Hello friends. This article continues the series on learning Rx in Android programming, specifically RxJava. Today I would like to look in more detail at two components that form the core of RxJava: Observable and Observer.
1. Observable
Observable in RxJava is an important component for handling data streams in Android application development. Observable represents a data stream that can emit events or data values over time.
In RxJava, you can create an Observable from various data sources such as lists, collections, UI events, database query results, network API calls, and many more.
We will have the following 5 types of Observables:
- Observable
- Single
- Maybe
- Flowable
- Completable
To create an Observable in RxJava, you can use factory methods such as:
- Observable.create() : Creates an Observable from custom logic. Inside the callback you call the emitter's methods (onNext(), onError(), onComplete()) to emit values or signal termination.
- Observable.just() : Creates an Observable from one or more specified values. This Observable will emit these values and complete afterwards.
- Observable.interval() : Creates an Observable that emits an increasing sequence of Long values (0, 1, 2, ...) at a fixed time interval.
- Observable.fromIterable() : Creates an Observable from a list, a set, or an iterable.
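- Observable.fromCallable() : Creates an Observable from a Callable. The Callable runs only when an Observer subscribes, and its return value is emitted as the single item. An exception thrown inside it is delivered through onError().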
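The factory methods listed above can be tried side by side. The following is a minimal sketch, assuming RxJava 3 (the `io.reactivex.rxjava3` packages); with RxJava 2 the imports would be `io.reactivex.*` instead. The helper function names are made up for illustration, and `toList().blockingGet()` is used only to collect the emitted items for a quick check, not as something you would do on the Android main thread.

```kotlin
import io.reactivex.rxjava3.core.Observable
import java.util.concurrent.TimeUnit

// Each helper builds an Observable with one factory method and collects
// everything it emits into a List (blocking, for demonstration only).

fun fromJust(): List<String> =
    Observable.just("Android", "RxJava").toList().blockingGet()

fun fromIterable(): List<Int> =
    Observable.fromIterable(listOf(1, 2, 3)).toList().blockingGet()

// The lambda is the Callable; it runs when the Observable is subscribed to.
fun fromCallable(): List<Int> =
    Observable.fromCallable { 6 * 7 }.toList().blockingGet()

// interval() never completes on its own, so take(3) limits it to 3 items.
fun fromInterval(): List<Long> =
    Observable.interval(10, TimeUnit.MILLISECONDS).take(3).toList().blockingGet()

// create() hands us an emitter that we drive manually.
fun fromCreate(): List<String> =
    Observable.create<String> { emitter ->
        emitter.onNext("a")
        emitter.onNext("b")
        emitter.onComplete()
    }.toList().blockingGet()

fun main() {
    println(fromJust())      // [Android, RxJava]
    println(fromIterable())  // [1, 2, 3]
    println(fromCallable())  // [42]
    println(fromInterval())  // [0, 1, 2]
    println(fromCreate())    // [a, b]
}
```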
Once you have created the Observable, you can use operators to transform, filter, and process its data according to your needs. You can then subscribe an Observer to the Observable to receive and handle the events and values it emits.
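As a small illustration of chaining operators before subscribing, here is a sketch that filters and then transforms a list of names. It assumes RxJava 3 imports, and the function name `rxNamesUppercased` is hypothetical. The result is collected with `toList().blockingGet()` purely to make it easy to inspect.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Keeps only the names that start with "Rx", then upper-cases them.
fun rxNamesUppercased(names: List<String>): List<String> =
    Observable.fromIterable(names)
        .filter { it.startsWith("Rx") }   // drop items that do not match
        .map { it.uppercase() }           // transform each remaining item
        .toList()
        .blockingGet()

fun main() {
    println(rxNamesUppercased(listOf("Android", "RxJava", "RxAndroid")))
    // [RXJAVA, RXANDROID]
}
```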
Observables in RxJava allow you to dynamically process data, perform asynchronous tasks, and interact with other Android components in Android application development.
2. Observer
Observer in RxJava is an important component for receiving and handling events or values from an Observable in Android application development. Observer subscribes to an Observable to receive notifications about events and values emitted from that Observable.
We will have the following 4 types of Observers:
- Observer
- SingleObserver
- MaybeObserver
- CompletableObserver
In RxJava, you can create an Observer by implementing the Observer<T> interface. This interface defines the methods that you need to implement to handle events and values from the Observable.
The main methods in the Observer interface include:
- onSubscribe(Disposable d) : This method is called once when the Observer is subscribed. The Disposable it receives can be used to cancel the subscription.
- onNext(T value) : This method is called each time the Observable emits a new value. You can define how to handle the received value here.
- onError(Throwable throwable) : This method is called when an error occurs while the Observable is emitting values. You can handle and report errors in this method.
- onComplete() : This method is called when the Observable finishes emitting values. You can perform cleanup or final actions in this method.
Once you have implemented the Observer interface, you can register the Observer with an Observable using the subscribe() method on the Observable. Upon successful registration, the Observer will receive events and values from the Observable and perform corresponding handler actions.
For example:
```kotlin
// Imports assume RxJava 3; with RxJava 2 use the io.reactivex.* packages instead.
import io.reactivex.rxjava3.core.Observable
import io.reactivex.rxjava3.core.Observer
import io.reactivex.rxjava3.disposables.Disposable

val observable: Observable<String> = Observable.just("Android", "RxJava", "RxAndroid")

val observer: Observer<String> = object : Observer<String> {
    override fun onSubscribe(d: Disposable) {}

    override fun onError(e: Throwable) {
        // Handle an error that occurs while values are being emitted
    }

    override fun onComplete() {
        // Handle the Observable finishing its emissions
    }

    override fun onNext(t: String) {
        // Handle a value received from the Observable
    }
}

observable.subscribe(observer)
```
Above is how to use an Observer in RxJava in Android application development. The Observer lets you receive and handle the events and values from an Observable.
Next, I will go through each pairing of Observable and Observer types and give an example for each.
3. Observable/ Observer types and implementations
As mentioned above, there are 5 types of Observable and 4 types of Observer. Flowable is the exception: it is consumed by a Reactive Streams Subscriber rather than by an Observer. The following table describes the correspondence between the two as well as the number of emissions for each type:
Observable | Observer | Number of emissions |
---|---|---|
Observable | Observer | Multiple or None |
Single | SingleObserver | One |
Maybe | MaybeObserver | One or None |
Flowable | Subscriber | Multiple or None |
Completable | CompletableObserver | None |
3.1. Observable & Observer
Observable is the most commonly used type. It can emit zero, one, or many items. I will implement an illustrative example below:
First, we’ll create an Observable:
```kotlin
val observableList = arrayListOf("RxJava", "RxAndroid", "Coroutine")

val observable: Observable<String> = Observable.create { emitter ->
    // emit each item
    for (item in observableList) {
        Log.i("PhongPN3", "emitter: $item - ${Thread.currentThread().name}")
        emitter.onNext(item)
    }

    // all items are emitted
    emitter.onComplete()
}
```
We use the onNext() function to emit each item. When all items have been emitted, we call the onComplete() function. Next, we define an Observer to handle the emitted items.
```kotlin
val observer: Observer<String> = object : Observer<String> {
    override fun onSubscribe(d: Disposable) {
        Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
    }

    override fun onNext(t: String) {
        Log.i("PhongPN3", "onNext: $t - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
    }

    override fun onComplete() {
        Log.i("PhongPN3", "onComplete - ${Thread.currentThread().name}")
    }
}
```
Finally, subscribe to listen for data from an Observable.
```kotlin
observable.subscribe(observer)
```
The result will be:
```
onSubscribe - main
emitter: RxJava - main
onNext: RxJava - main
emitter: RxAndroid - main
onNext: RxAndroid - main
emitter: Coroutine - main
onNext: Coroutine - main
onComplete - main
```
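When a full Observer object feels heavy, the same callbacks can be passed to `subscribe()` as lambdas. The sketch below is one possible way to do this, assuming RxJava 3. The function name `collectEvents` is hypothetical, and the events are recorded in a list instead of being logged so that the snippet runs outside Android.

```kotlin
import io.reactivex.rxjava3.core.Observable

// Subscribes with three lambdas (onNext, onError, onComplete) and records
// each callback in order. subscribe() returns a Disposable in this form.
fun collectEvents(): List<String> {
    val events = mutableListOf<String>()
    val disposable = Observable.just("RxJava", "RxAndroid", "Coroutine")
        .subscribe(
            { item -> events.add("onNext: $item") },
            { error -> events.add("onError: ${error.message}") },
            { events.add("onComplete") }
        )
    // just() emits synchronously, so everything has already been delivered;
    // disposing here is simply good hygiene.
    disposable.dispose()
    return events
}

fun main() {
    collectEvents().forEach(::println)
}
```

The Disposable returned by this overload plays the same role as the one delivered to onSubscribe() in the Observer version.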
3.2. Single & SingleObserver
Single always either emits exactly one item or signals an error.
```kotlin
val s = "RxJava"

val singleObservable: Single<String> = Single.create { emitter ->
    emitter.onSuccess(s)
}
```
SingleObserver differs from the normal Observer: it has no onNext() and onComplete() functions, and uses a single onSuccess() function instead.
```kotlin
val singleObserver: SingleObserver<String> = object : SingleObserver<String> {
    override fun onSubscribe(d: Disposable) {
        Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
    }

    override fun onSuccess(t: String) {
        Log.i("PhongPN3", "onSuccess: $t - ${Thread.currentThread().name}")
    }
}
```
Finally, subscribe to listen for data from the Single.
```kotlin
singleObservable.subscribe(singleObserver)
```
The result will be:
```
onSubscribe - main
onSuccess: RxJava - main
```
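The example above only exercises the success path. To see the error path of a Single, here is a minimal sketch, assuming RxJava 3. `parseNumber` and `describe` are hypothetical helpers written for this illustration: parsing a string either yields one value or throws, which maps onto onSuccess and onError.

```kotlin
import io.reactivex.rxjava3.core.Single

// Hypothetical helper: wraps a parse that may throw inside a Single.
// fromCallable routes a thrown exception to onError.
fun parseNumber(text: String): Single<Int> =
    Single.fromCallable { text.toInt() }

// Subscribes with two lambdas (onSuccess, onError) and reports which one fired.
fun describe(text: String): String {
    var outcome = "no signal"
    parseNumber(text).subscribe(
        { value -> outcome = "onSuccess: $value" },
        { error -> outcome = "onError: ${error.javaClass.simpleName}" }
    )
    return outcome
}

fun main() {
    println(describe("42"))   // onSuccess: 42
    println(describe("abc"))  // onError: NumberFormatException
}
```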
3.3. Maybe & MaybeObserver
Maybe is an Observable type that emits either one item or nothing at all. It suits cases where the value we want may or may not be present. For example, when we query a note by Id in the database, the note may or may not exist.
```kotlin
val s = "RxJava"

val maybeObservable = Maybe.create { emitter: MaybeEmitter<String> ->
    emitter.onSuccess(s)
}
```
To emit the item we call onSuccess(), and to signal that there is no item we call onComplete(). This is the main difference from Single.
```kotlin
val maybeObserver: MaybeObserver<String> = object : MaybeObserver<String> {
    override fun onSubscribe(d: Disposable) {
        Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
    }

    override fun onSuccess(t: String) {
        Log.i("PhongPN3", "onSuccess: $t - ${Thread.currentThread().name}")
    }

    override fun onComplete() {
        Log.i("PhongPN3", "onComplete - ${Thread.currentThread().name}")
    }
}
```
Finally, subscribe to listen for data from the Maybe.
```kotlin
maybeObservable.subscribe(maybeObserver)
```
The result will be:
```
onSubscribe - main
onSuccess: RxJava - main
```
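The note lookup mentioned earlier can be sketched to show both outcomes of a Maybe. This assumes RxJava 3, and everything named here (`notes`, `findNote`, `describeNote`) is hypothetical: a plain in-memory map stands in for the database.

```kotlin
import io.reactivex.rxjava3.core.Maybe

// Hypothetical in-memory stand-in for a notes table.
val notes = mapOf(1 to "Buy milk", 2 to "Learn RxJava")

// Emits the note through onSuccess if it exists, otherwise completes empty.
fun findNote(id: Int): Maybe<String> = Maybe.create<String> { emitter ->
    val note = notes[id]
    if (note != null) emitter.onSuccess(note) else emitter.onComplete()
}

// Subscribes with three lambdas (onSuccess, onError, onComplete)
// and reports which callback fired.
fun describeNote(id: Int): String {
    var outcome = "no signal"
    findNote(id).subscribe(
        { note -> outcome = "onSuccess: $note" },
        { error -> outcome = "onError: ${error.message}" },
        { outcome = "onComplete" }
    )
    return outcome
}

fun main() {
    println(describeNote(2))   // onSuccess: Learn RxJava
    println(describeNote(99))  // onComplete
}
```

Note that onSuccess and onComplete are mutually exclusive for a Maybe: a found note triggers only onSuccess, and a missing one triggers only onComplete.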
3.4. Completable & CompletableObserver
Completable is an Observable type that does not emit any items. It just executes a task and reports whether the task completed or failed.
Initialize the Completable:
```kotlin
val completableObservable = Completable.create { emitter: CompletableEmitter ->
    // do something
    emitter.onComplete()
}
```
Observer definition:
```kotlin
val completeObserver: CompletableObserver = object : CompletableObserver {
    override fun onSubscribe(d: Disposable) {
        Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
    }

    override fun onComplete() {
        Log.i("PhongPN3", "onComplete - ${Thread.currentThread().name}")
    }
}
```
Finally, subscribe to listen for the result from the Completable.
```kotlin
completableObservable.subscribe(completeObserver)
```
The result will be:
```
onSubscribe - main
onComplete - main
```
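A Completable can also report failure. The following sketch, assuming RxJava 3, shows both outcomes. `runTask` and its `shouldFail` flag are hypothetical and exist only to trigger each path. `Completable.fromAction` is used here as a shorter alternative to `Completable.create`.

```kotlin
import io.reactivex.rxjava3.core.Completable

// Runs a task that either finishes normally or throws.
// An exception thrown inside fromAction is delivered to onError.
fun runTask(shouldFail: Boolean): String {
    val task = Completable.fromAction {
        if (shouldFail) throw IllegalStateException("task failed")
    }
    var outcome = "no signal"
    task.subscribe(
        { outcome = "onComplete" },
        { error -> outcome = "onError: ${error.message}" }
    )
    return outcome
}

fun main() {
    println(runTask(shouldFail = false))  // onComplete
    println(runTask(shouldFail = true))   // onError: task failed
}
```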
3.5. Flowable & SingleObserver
Flowable is used when a source produces a large number of events (many docs suggest around 10k+ events) faster than the Observer can consume them. Flowable supports backpressure, which lets the consumer control how much data it receives and helps avoid MissingBackpressureException and OutOfMemoryError.
In this example, we will sum the numbers from 1 to 10. Because reduce() with a seed value turns the Flowable into a Single, the result is reported to a SingleObserver.
```kotlin
val flowable = Flowable.range(1, 10)

val singleObserver: SingleObserver<Int> = object : SingleObserver<Int> {
    override fun onSubscribe(d: Disposable) {
        Log.i("PhongPN3", "onSubscribe - ${Thread.currentThread().name}")
    }

    override fun onError(e: Throwable) {
        Log.i("PhongPN3", "onError: ${e.message} - ${Thread.currentThread().name}")
    }

    override fun onSuccess(t: Int) {
        Log.i("PhongPN3", "onSuccess: $t - ${Thread.currentThread().name}")
    }
}

flowable.reduce(0) { sum: Int, item: Int -> sum + item }
    .subscribe(singleObserver)
```
The reduce function processes each item that the flowable emits and returns a value that is the sum of all items.
The result will be:
```
onSubscribe - main
onSuccess: 55 - main
```
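The summing example does not show backpressure itself, since reduce() consumes the whole stream on our behalf. As a rough sketch of what consumer-controlled demand can look like, the code below subscribes to a Flowable directly with a Reactive Streams Subscriber and requests items in small batches. It assumes RxJava 3, and the function name `consumeInBatches` and the batching policy are illustrative choices, not the only way to apply backpressure.

```kotlin
import io.reactivex.rxjava3.core.Flowable
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription

// Consumes Flowable.range(1, 5), asking for `batchSize` items at a time.
// Every request and callback is recorded so the flow of demand is visible.
fun consumeInBatches(batchSize: Long): List<String> {
    val log = mutableListOf<String>()
    Flowable.range(1, 5).subscribe(object : Subscriber<Int> {
        private lateinit var subscription: Subscription
        private var receivedInBatch = 0L

        override fun onSubscribe(s: Subscription) {
            subscription = s
            log.add("request $batchSize")
            s.request(batchSize)          // nothing is emitted until we ask
        }

        override fun onNext(item: Int) {
            log.add("onNext: $item")
            receivedInBatch++
            if (receivedInBatch == batchSize) {
                // Current batch handled, so ask for the next one.
                receivedInBatch = 0
                log.add("request $batchSize")
                subscription.request(batchSize)
            }
        }

        override fun onError(t: Throwable) {
            log.add("onError: ${t.message}")
        }

        override fun onComplete() {
            log.add("onComplete")
        }
    })
    return log
}

fun main() {
    consumeInBatches(2).forEach(::println)
}
```

The source only emits as many items as have been requested, so a slow consumer can keep the amount of in-flight data bounded.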
Note: I leave the Log calls in the sample code so that you can run it yourself and compare your output with the output I presented.
Summary
Above are the Observable types and their corresponding Observer types, with an implementation of each. I hope the article helps you understand the most basic usage of these two RxJava components.
In the next article I will continue with Operators in RxJava. See you all in the next post.