Introduction to RxKotlin and commonly used operators

Tram Ho

I. Brief Introduction to Reactive Programming

The term reactive is no longer unfamiliar to us. It has become a trend, almost a rule, in software development. Every day, new blog posts, frameworks, and libraries appear that discuss and build on reactive programming.

New programmers may have questions about reactive programming. Why are people so interested in it? What exactly is reactive programming? What are its benefits? Should we learn it? If so, how? Let's find out together.

As you know, Kotlin was created to improve on Java and to solve many of its most important problems. With Kotlin we can build great applications, and combining a reactive programming style with Kotlin makes it even easier to build better ones. So what is this reactive programming that everyone talks about?

What is reactive programming?

Reactive programming is an asynchronous programming model that revolves around data streams and the propagation of change. The example below makes this clearer.
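
A minimal sketch of such an example, assuming a helper named isEven (the helper body and the printed messages are illustrative assumptions, not the original listing):

```kotlin
// Assumed helper: true when the number is even.
fun isEven(n: Int): Boolean = n % 2 == 0

fun main() {
    var number = 4
    var isEven = isEven(number)   // computed once, while number == 4
    println("The number is " + (if (isEven) "Even" else "Odd"))

    number = 9                    // isEven is NOT recomputed here
    println("The number is " + (if (isEven) "Even" else "Odd"))
}
// Prints "The number is Even" twice.
```

In plain imperative code the second line of output is stale, because nothing links isEven to later changes of number.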

You will see that the program above still reports an even number even though number = 9, because isEven was computed earlier and is never updated. If isEven = isEven(number) were evaluated again right after number = 9, the value would be false. Reactive programming works in this spirit: it tracks a value and reacts whenever the value changes.

In this article I cannot cover everything about RxKotlin, but I will share the most basic things, the ones you are most likely to encounter.

II. RxKotlin

1. RxKotlin & Coroutines?

RxKotlin is a lightweight library that adds extension functions to RxJava. It was created to make RxJava more convenient to use from Kotlin and to standardize RxJava usage conventions in Kotlin.

One of the most exciting features in Kotlin is coroutines. With coroutines we can write asynchronous, non-blocking code more simply than with threads. A coroutine is sometimes called a lightweight thread.

Note that RxKotlin does not use coroutines. The reason is simple: coroutines and the Schedulers in RxKotlin serve quite similar purposes. Coroutines are relatively new, whereas Schedulers have existed for a long time in RxJava, RxJS, RxSwift, …

Coroutines are suitable for implementing concurrency when we cannot use RxKotlin Schedulers.

2. Observables, Observers

Observables and subscribers are at the base of reactive programming.

We can say that they are building blocks of reactive programming.

The three pillars of reactive programming are Observables, Observers, and Subjects. This section covers:

  • How to convert data sources into Observable instances
  • The types of Observables
  • How to use Observer instances and subscriptions

2.1. Observables

In reactive programming, an Observable is a basic computation that produces values to be consumed by a consumer (Observer).

It is important to note that the consumer (Observer) does not pull values; the Observable pushes values to the consumer.

In other words, an Observable is push-based: it emits values, which pass through operators and arrive at the Observer:

  • The Observer subscribes to the Observable
  • The Observable starts emitting the values it contains
  • The Observer reacts to each item the Observable emits

How does an Observable work?

An Observable has three important events / methods:

  • onNext: The Observable passes all items one by one to onNext.
  • onComplete: When all items have passed through onNext, the Observable calls onComplete.
  • onError: When the Observable encounters an error, it calls onError.

Note that both onError and onComplete are terminal events: if onError is called, onComplete will never be called, and vice versa.
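
The three events can be seen with a small Observer. This is a sketch assuming RxJava 2 (package io.reactivex) on the classpath; the printed labels are arbitrary:

```kotlin
import io.reactivex.Observable
import io.reactivex.Observer
import io.reactivex.disposables.Disposable

fun main() {
    val observer = object : Observer<String> {
        override fun onSubscribe(d: Disposable) = println("Subscribed")
        override fun onNext(item: String) = println("Next: $item")
        override fun onError(e: Throwable) = println("Error: ${e.message}")
        override fun onComplete() = println("Completed")
    }

    // just() emits its arguments in order and then completes.
    Observable.just("a", "b", "c").subscribe(observer)
}
// Prints: Subscribed, Next: a, Next: b, Next: c, Completed (one per line).
```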

Understanding the Observable.create method

You can create an Observable with the Observable.create method at any time.

Observable.create is quite useful when you are working with a custom data structure and want control over which values are emitted. You can also emit values to the Observer from another thread.
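
A minimal sketch of Observable.create, assuming RxJava 2; the function name words is hypothetical and stands in for any custom data source:

```kotlin
import io.reactivex.Observable

// Hypothetical custom source: emits each given word, then completes.
fun words(vararg items: String): Observable<String> =
    Observable.create<String> { emitter ->
        for (item in items) {
            if (emitter.isDisposed) return@create   // stop once the consumer is gone
            emitter.onNext(item)
        }
        emitter.onComplete()
    }

fun main() {
    words("Alpha", "Beta").subscribe(
        { println("Next: $it") },
        { println("Error: $it") },
        { println("Completed") }
    )
}
// Prints: Next: Alpha, Next: Beta, Completed (one per line).
```

The emitter decides when onNext, onError, and onComplete are called, which is what gives you control over the emissions.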

Understanding the Observable.from methods

The Observable.from methods are simpler than Observable.create. You can create Observable instances from most Kotlin structures using the from methods.

Note that in RxKotlin 1 you use Observable.from; however, from RxKotlin 2.0 (as well as RxJava 2.0), the method was renamed with a suffix, such as fromArray, fromIterable, fromFuture, …
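
A short sketch of the RxJava 2 variants, plus the toObservable extension that RxKotlin 2 provides for the same purpose (the sample values are arbitrary):

```kotlin
import io.reactivex.Observable
import io.reactivex.rxkotlin.toObservable

fun main() {
    val list = listOf("one", "two", "three")

    // RxJava 2 factory methods
    Observable.fromIterable(list).subscribe { println("fromIterable: $it") }
    Observable.fromArray(1, 2, 3).subscribe { println("fromArray: $it") }

    // RxKotlin extension function wrapping fromIterable
    list.toObservable().subscribe { println("toObservable: $it") }
}
```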

Understanding the Observable.just method

Observable.just also creates an Observable, but it differs slightly from Observable.from: if you pass an Iterable instance, Observable.just treats it as a single argument and emits the whole list as one item, whereas Observable.from emits each item in the list separately.
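
The difference is easy to see side by side (a sketch assuming RxJava 2, where the from variant for lists is fromIterable):

```kotlin
import io.reactivex.Observable

fun main() {
    val list = listOf(1, 2, 3)

    // One emission: the list itself.
    Observable.just(list).subscribe { println("just: $it") }

    // Three emissions: one per element.
    Observable.fromIterable(list).subscribe { println("fromIterable: $it") }
}
// Prints "just: [1, 2, 3]", then "fromIterable: 1", "fromIterable: 2", "fromIterable: 3".
```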

2.2. Subscribing and disposing

The subscribe operator connects an Observable to an Observer. We can either pass one to three callbacks (onNext, onError, onComplete) to the subscribe operator, or pass an instance of the Observer interface, so that the Observable is connected to the Observer.

When you subscribe, if you pass callbacks instead of an Observer instance, the subscribe operator returns an instance of Disposable. If you pass an Observer instance, you obtain the Disposable in its onSubscribe method instead.

You can use an instance of the Disposable interface to stop emissions at any time.
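
A small sketch of disposing, assuming RxJava 2. A PublishSubject is used here only as a source we can push values into by hand; the function name is hypothetical:

```kotlin
import io.reactivex.subjects.PublishSubject

// Returns the items that arrived before dispose() was called.
fun receivedBeforeDispose(): List<Int> {
    val source = PublishSubject.create<Int>()
    val received = mutableListOf<Int>()

    val disposable = source.subscribe { received.add(it) }
    source.onNext(1)
    source.onNext(2)

    disposable.dispose()      // stop receiving
    source.onNext(3)          // no longer delivered to our callback
    return received
}

fun main() {
    println(receivedBeforeDispose())   // [1, 2]
}
```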

The above covers the basics you need when approaching RxJava or RxKotlin. At this point we should understand the concepts of Observables, Observers, subscribing, and disposing.

3. Useful and common operators

First, what is an operator here?

When we start learning programming, we learn about operators: symbols or short sequences of characters that perform a task on operands and return a result.

In reactive programming the concept is similar: the operands are Observables / Flowables, and an operator transforms them and returns the result as an Observable / Flowable.

There are 5 types of operators:

  • Filtering / suppressing operators
  • Transforming operators
  • Reducing operators
  • Error handling operators
  • Utility operators

However, covering all of them would take too long, so I only list a few operators that I encounter often. I may cover the rest in detail in a later post.

3.1. Filtering / suppressing operators

  • The debounce operator

debounce waits for a given period after each emission and drops that emission if another one arrives within the period, at which point the delay restarts. Only an emission followed by a quiet period of that length reaches the downstream.
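
A deterministic sketch of debounce, assuming RxJava 2. It uses a TestScheduler so that time is advanced by hand; the 200 ms window and the input strings are arbitrary choices:

```kotlin
import io.reactivex.schedulers.TestScheduler
import io.reactivex.subjects.PublishSubject
import java.util.concurrent.TimeUnit

fun debouncedInput(): List<String> {
    val scheduler = TestScheduler()                 // virtual clock
    val input = PublishSubject.create<String>()
    val out = mutableListOf<String>()

    input.debounce(200, TimeUnit.MILLISECONDS, scheduler)
        .subscribe { out.add(it) }

    input.onNext("R")
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)   // less than 200 ms
    input.onNext("Rx")                                    // replaces "R"
    scheduler.advanceTimeBy(100, TimeUnit.MILLISECONDS)
    input.onNext("RxK")                                   // replaces "Rx"
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS)   // quiet period: "RxK" is emitted
    input.onNext("RxKotlin")
    scheduler.advanceTimeBy(200, TimeUnit.MILLISECONDS)   // quiet period: "RxKotlin" is emitted
    return out
}

fun main() {
    println(debouncedInput())   // [RxK, RxKotlin]
}
```

A typical use is a search box, where only the text the user paused on should trigger a request.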

  • The distinct operators

This operator is quite simple: it filters out duplicate emissions from the upstream.
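
A minimal sketch, assuming RxJava 2 (the helper name distinctItems is hypothetical):

```kotlin
import io.reactivex.Observable

// Collects the distinct items of the stream into a list.
fun distinctItems(items: List<Int>): List<Int> =
    Observable.fromIterable(items)
        .distinct()          // remembers what it has seen and drops repeats
        .toList()
        .blockingGet()

fun main() {
    println(distinctItems(listOf(1, 2, 2, 3, 1, 3)))   // [1, 2, 3]
}
```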

  • Filtering emissions – filter operator

The filter operator is arguably the most used in the filtering / suppressing operator group. It allows you to implement custom logic to filter emissions.
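
For instance, keeping only even numbers (a sketch assuming RxJava 2; the helper name evens is hypothetical):

```kotlin
import io.reactivex.Observable

// Emits 1..upTo and keeps only the even values.
fun evens(upTo: Int): List<Int> =
    Observable.range(1, upTo)
        .filter { it % 2 == 0 }   // custom predicate: true keeps the item
        .toList()
        .blockingGet()

fun main() {
    println(evens(10))   // [2, 4, 6, 8, 10]
}
```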

Besides the operators above, you can also look into the distinctUntilChanged, elementAt, first, last, and ignoreElements operators.

3.2. Transforming operators

  • The map operator

The map operator applies a function to each item emitted from upstream and emits the result downstream. It converts each emitted item of type T into type R by applying its lambda. Put simply, it converts one item into another.
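
A minimal sketch converting Int items to String, assuming RxJava 2 (the helper name labels is hypothetical):

```kotlin
import io.reactivex.Observable

// T = Int, R = String
fun labels(count: Int): List<String> =
    Observable.range(1, count)
        .map { n -> "Item $n" }
        .toList()
        .blockingGet()

fun main() {
    println(labels(3))   // [Item 1, Item 2, Item 3]
}
```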

  • The flatMap operator

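The flatMap operator applies the function you pass to each emission of the source producer. That function returns a new producer for each item, and flatMap merges the items of those producers into a single stream. In other words, flatMap converts each item emitted by one Observable into another Observable and flattens the results.

If the function returns an Observable that emits just one transformed item, the output is the same as with the map operator.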
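
A sketch of both usages, assuming RxJava 2; the helper names are hypothetical. The inner Observables here are synchronous, so the order of the merged items is predictable, which is not guaranteed when the inner sources run asynchronously:

```kotlin
import io.reactivex.Observable

// One inner item per source item.
fun flatMapOneToOne(count: Int): List<String> =
    Observable.range(1, count)
        .flatMap { n -> Observable.just("Item $n") }
        .toList()
        .blockingGet()

// Several inner items per source item, flattened into one stream.
fun flatMapOneToMany(count: Int): List<String> =
    Observable.range(1, count)
        .flatMap { n -> Observable.just("$n-a", "$n-b") }
        .toList()
        .blockingGet()

fun main() {
    println(flatMapOneToOne(3))    // [Item 1, Item 2, Item 3]
    println(flatMapOneToMany(2))   // [1-a, 1-b, 2-a, 2-b]
}
```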

If you look closely, you will see that the main difference between map and flatMap is that the function passed to flatMap itself returns an Observable.

In addition to map and flatMap, you can look into the cast, defaultIfEmpty, switchIfEmpty, startWith, sorted, and scan operators.

3.3. Reducing operators

  • Counting emissions (count operator)

The count operator subscribes to the producer, counts the emissions, and emits a Single containing the number of items emitted by the producer.
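
A minimal sketch, assuming RxJava 2, where count() returns a Single of Long (the helper name is hypothetical):

```kotlin
import io.reactivex.Observable

fun countItems(items: List<String>): Long =
    Observable.fromIterable(items)
        .count()          // Single<Long>
        .blockingGet()

fun main() {
    println(countItems(listOf("a", "b", "c")))   // 3
}
```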

  • Accumulating emissions – reduce operator

reduce is an interesting accumulating operator. It combines all emissions from the producer into a single value by repeatedly applying an accumulator function.
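
A sketch that sums a range, assuming RxJava 2. Without a seed value, reduce returns a Maybe, because an empty source produces no result (the helper name is hypothetical):

```kotlin
import io.reactivex.Observable

// Sums 1..upTo; assumes upTo >= 1 so the Maybe is not empty.
fun sumTo(upTo: Int): Int =
    Observable.range(1, upTo)
        .reduce { acc, n -> acc + n }   // Maybe<Int>
        .blockingGet()

fun main() {
    println(sumTo(5))   // 15
}
```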

3.4. Error handling operators

We all know about the onError event in Subscriber / Observer. The problem, however, is that the onError event is only delivered to the downstream consumer, and the subscription is terminated immediately. So what if we want to handle an error where it occurs upstream? Fortunately, we have the error handling operators.

  • onErrorReturn – return a default value on error

onErrorReturn emits a default value to the downstream when an error occurs upstream.
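
A sketch assuming RxJava 2; the helper name and the fallback value -1 are arbitrary. Note that the stream still ends after the fallback, so items after the failing one are never processed:

```kotlin
import io.reactivex.Observable

fun parseWithDefault(inputs: List<String>): List<Int> =
    Observable.fromIterable(inputs)
        .map { it.toInt() }        // throws NumberFormatException for "x"
        .onErrorReturn { -1 }      // emit -1 instead of the error, then complete
        .toList()
        .blockingGet()

fun main() {
    println(parseWithDefault(listOf("1", "2", "x", "4")))   // [1, 2, -1]
}
```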

  • The onErrorResumeNext operator

The onErrorResumeNext operator lets you subscribe to another producer when an error occurs.
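
A sketch assuming RxJava 2, where onErrorResumeNext accepts a fallback Observable directly (RxJava 3 names this overload onErrorResumeWith). The helper name and fallback values are arbitrary:

```kotlin
import io.reactivex.Observable

fun parseWithFallback(inputs: List<String>): List<Int> =
    Observable.fromIterable(inputs)
        .map { it.toInt() }                           // fails on "x"
        .onErrorResumeNext(Observable.just(-1, -2))   // switch to the fallback producer
        .toList()
        .blockingGet()

fun main() {
    println(parseWithFallback(listOf("1", "x", "3")))   // [1, -1, -2]
}
```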

  • Retrying on error

The retry operator is another error handling operator; it lets you retry / resubscribe to the producer when an error occurs. You can provide the maximum number of retries and, optionally, a retry condition.
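
A sketch assuming RxJava 2. The source is a hypothetical task that fails a given number of times before succeeding; retry(3) with a predicate resubscribes up to three times, and only for IllegalStateException:

```kotlin
import io.reactivex.Observable
import java.util.concurrent.atomic.AtomicInteger

fun fetchWithRetry(failures: Int): String {
    val attempts = AtomicInteger(0)
    return Observable.fromCallable<String> {
        // Each (re)subscription runs this block again.
        val attempt = attempts.incrementAndGet()
        if (attempt <= failures) throw IllegalStateException("attempt $attempt failed")
        "ok after $attempt attempts"
    }
        .retry(3) { e -> e is IllegalStateException }
        .blockingFirst()
}

fun main() {
    println(fetchWithRetry(2))   // ok after 3 attempts
}
```

If the task fails more often than the retry limit allows, the last error is passed downstream as usual.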

3.5. Utility operators

Below is a list of utility operators:

  • doOnNext, doOnComplete, and doOnError
  • doOnSubscribe, doOnDispose, and doOnSuccess
  • serialize
  • cache

They will help us perform a variety of utility operations such as action on emission (doOnNext), recording timestamps, caching, etc.
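
A sketch of the doOn… side-effect operators, assuming RxJava 2; the helper name and log messages are arbitrary. These operators observe events without changing the stream:

```kotlin
import io.reactivex.Observable

fun traceEvents(): List<String> {
    val log = mutableListOf<String>()
    Observable.just(1, 2)
        .doOnSubscribe { log.add("subscribe") }
        .doOnNext { log.add("next $it") }
        .doOnComplete { log.add("complete") }
        .subscribe()
    return log
}

fun main() {
    println(traceEvents())   // [subscribe, next 1, next 2, complete]
}
```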

4. Schedulers

  • What is a scheduler? We said that RxKotlin helps with asynchronous handling, yet so far nothing in this article has been asynchronous. The scheduler is what makes that possible. A Scheduler can be thought of as a thread pool: ReactiveX takes a thread from the pool and executes the task on that thread.
  • Types of scheduler
    • Schedulers.io(): Intended for I/O-bound work that needs little CPU, such as network calls, reading disks / files, database access, … It maintains a thread pool.
    • Schedulers.computation(): Intended for CPU-intensive work such as large data processing, bitmap processing, … The number of threads created by this scheduler depends on the number of CPU cores.
    • Schedulers.newThread(): Creates a new thread each time a task is scheduled. It is generally not recommended unless the work is very long-running. Threads created via newThread() are not reused.
    • Schedulers.single(): Executes all tasks sequentially on a single thread, in the order they were added. This is useful where strict sequencing is required.
    • Schedulers.trampoline(): Executes tasks in First In, First Out order on the current thread. Scheduled tasks are queued and run one by one after the current task finishes, without starting a background thread.
    • Schedulers.from(): Creates a scheduler from an Executor, which lets you limit the number of threads that can be created. When the thread pool is full, tasks are queued.
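
A sketch of how schedulers are typically applied, assuming RxJava 2: subscribeOn chooses the thread on which the source runs, and observeOn switches the thread for the operators after it. The helper name is hypothetical, and the exact thread names depend on the library, so they are printed rather than assumed:

```kotlin
import io.reactivex.Observable
import io.reactivex.schedulers.Schedulers

// Returns (thread that produced the item, thread that consumed it).
fun threadNames(): Pair<String, String> {
    var producerThread = ""
    val consumerThread = Observable.fromCallable<String> {
        producerThread = Thread.currentThread().name   // runs on an io() thread
        "payload"
    }
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        .map { Thread.currentThread().name }           // runs on a computation() thread
        .blockingFirst()                               // wait for the result on the caller
    return producerThread to consumerThread
}

fun main() {
    val (producer, consumer) = threadNames()
    println("produced on $producer, consumed on $consumer")
}
```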

This article is already quite long, so I will stop here. I hope this post has given everyone an overview of RxKotlin. Thanks for reading.

Source : Viblo