In this article, let’s learn the basics of RxJava Disposables.
Reactive Android Programming includes RxJava, RxAndroid and RxKotlin.
RxJava is the most used Reactive Programming library in the Android Development world. It supports better multithreading management and makes the code more intuitive.
RxAndroid is a lightweight module, linking specific Android components to the RxJava classes. RxKotlin makes writing RxJava methods easier by providing convenient extension functions.
Disposables
What is disposables ? Disposables in English implies short-term convenience. This also means that they exist for a short time or are removed after use. The same idea was conveyed in RxJava’s Disposables.
When an Observer subscribes to an Emitter or Observables , you create a stream. This stream takes up resources (resources) which later becomes disposable “solid waste”. You need to handle it or the stream will run for a long time.
Observable
has a method called onComplete()
will do the processing for you when called. However, many times, you will find it more beneficial and more convenient to be able to cancel your subscription easily and anytime.
Now, you will solve a more complex case. Streams will run endlessly and you will use Disposables
to handle them to avoid memory leaks .
Disposable
is a stream or a link between an Observable
and an Observer
. A quick check of the document shows that it has two main methods, dispose()
and isDisposed()
. The former handles the link, while the latter checks if the link has been processed.
Testing Disposables
When you set up the registry between Observable and Observer, it returns a Disposable. For example, see the following code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 | import android.util.Log import io.reactivex.Observable import java.util.concurrent.TimeUnit object DisposableTester { @JvmStatic fun main() { val seconds = Observable.interval(1, TimeUnit.SECONDS) val disposable = seconds.subscribe({ l -> logData(l) }) //sleep 10 seconds sleep(10000) //Dispose and stop emissions disposable.dispose() Log.d("Test","Disposed!") //Sleep 10 seconds to prove //There are no more emissions sleep(10000) } private fun logData(l: Long) { Log.d("Test","Received: " + l) } private fun sleep(millis:Int) { try { Thread.sleep(millis.toLong()) } catch (e:InterruptedException) { e.printStackTrace() } } } |
An Observable
runs and emits every second. After ten seconds of release, the Disposable
resource returned from subscribe()
is processed by explicitly calling dispose()
. Then another ten-second timer emits to verify that the resource has been processed.
Disposal of external Disposables
is a way to dispose of resources no longer needed. Since RxJava 2.0 , Observer
has the ability to remove registrations at any time. For example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 | import io.reactivex.Observer import io.reactivex.disposables.Disposable object DisposableTester { var myObserver: Observer<Int> = object: Observer<Int> { private var disposable: Disposable? = null override fun onSubscribe(disposable: Disposable) { this.disposable = disposable } override fun onNext(value:Int) { //Has access to Disposable } override fun onError(e:Throwable) { //Has access to Disposable } override fun onComplete() { //Has access to Disposable } } } |
If at any time emissions are no longer needed in onNext()
, onError()
or onComplete()
, you can stop them.
CompositeDisposables
When developing an application, there are scenarios where you need more than one subscription. Getting data directly from multiple sources in a travel app for hotels, tours and airline tickets is a great example.
You need to use CompositeDisposables to manage resources. It implements Disposable and then keeps a collection of disposables. See the following example:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 | import android.util.Log import io.reactivex.Observable import io.reactivex.disposables.CompositeDisposable import java.util.concurrent.TimeUnit object DisposableTester { private val disposables = CompositeDisposable() @JvmStatic fun main() { val seconds = Observable.interval(1, TimeUnit.SECONDS) //Subscribe and capture disposables val disposable1 = seconds.subscribe({ l -> logData(l, 1) }) val disposable2 = seconds.subscribe({ l -> logData(l, 2) }) //Put both disposables into CompositeDisposable disposables.addAll(disposable1, disposable2) //Sleep 10 seconds sleep(10000) //Dispose all disposables disposables.dispose() Log.d("Test", ("All Disposed!")) //Sleep 10 seconds to prove //there are no more emissions sleep(10000) } private fun logData(l: Long, observerNumber: Int) { Log.d("Test", ("Observer " + observerNumber + ": " + l)) } private fun sleep(millis:Int) { try { Thread.sleep(millis.toLong()) } catch (e:InterruptedException) { e.printStackTrace() } } } |
In this deployment, the code used on this simple utility helps you manage a collection of Disposables
. By calling add()
or addAll()
, you can remove them all at once when they are no longer needed.
Congratulations! Through this you have refreshed your knowledge of the basics of Reactive Programming and learned about Disposables.
ref: https://www.raywenderlich.com/3983802-working-with-rxjava-disposables-in-kotlin#toc-anchor-001