Giới thiệu về RxKotlin và các operator thường dùng

Tram Ho

I. Giới thiệu ngắn gọn về Reactive Programming

Thuật ngữ reactive hiện nay đã không còn xa lạ với chúng ta. Hiện nay nó đang là xu thế và gần như là rule trong lĩnh vực phát triển phần mềm. Mỗi ngày đều có rất nhiều các blogs, presentations, emerging frameworks and libraries đang nói về và phát triển xoay quanh reactive programing .

Với những bạn programer mới có thể sẽ thắc mắc về reactive programing. Tại sao mọi người lại hứng thú với nó đến vậy? Reactive programing chính xác là cái gì? Lợi ích của việc áp dụng reactive programing là gì? Chúng ta có nên học nó không? Nếu có thì sau đó như nào?
Vậy chúng ta sẽ cùng nhau tìm hiểu xem nó là gì nhé

Như các bạn đã biết, Kotlin được sinh ra giúp cải tiến và giải quyết nhiều vấn đề quan trọng của Java. Với Kotlin chúng ta có thể tạo ra 1 application tuyệt vời, và nếu bạn kết hợp với reactive programming style với Kotlin thì lại càng dễ dàng hơn trong việc tạo ra 1 application tốt hơn thế nữa. Vậy reactive programming là cái gì mà nói lắm thế?

What is reactive programming?

Reactive programming là một mô hình lập trình không đồng bộ xoay quanh các luồng dữ liệu và streams của sự thay đổi. Tức là sao, bạn hãy xem ví dụ bên dưới để hiểu rõ hơn nhé.

Bạn sẽ thấy rằng output của chương trình trên return true cho dù number = 9. Nếu thực hiện tracking isEven = isEven(number) ngay sau number = 9 thì value sẽ return false. Reactive programing cũng hoạt động tương tự. Nó sẽ thực hiện tracking khi có 1 sự thay đổi nào đó.

Ở bài viết này, mình không thể chia sẻ hết về RxKotlin được, nhưng mình sẽ chia sẻ những gì cơ bản nhất và bạn sẽ dễ gặp phải nhất thôi nhé.

II. RxKotlin

1. RxKotlin & Coroutines?

RxKotlin là một lightweight library, là extension functions cho RxJava. RxKotlin được sinh ra với mục đích là tối ưu hóa việc sử dụng và huẩn hóa các quy ước sử dụng RxJava với Kotlin.

The most exciting feature trong Kotlincoroutines. Với việc sử dụng coroutines chúng ta có thể xử lý bất đồng bộ, non-blocking code giống như threads 1 cách đơn giản hơn thread.
Coroutine còn được gọi là 1 lightweight thread.

Lưu ý rằng RxKotlin không sử dụng coroutines. Lý do khá đơn giản là cả coroutines và Schedulers trong RxKotlin đều hoạt động khá giống nhau. Trong khi coroutines mới được sinh ra, Schedulers đã có từ trước đó rất lâu cùng với RxJava, RxJs, RxSwift, …

Coroutines phù hợp với việc phát triển áp dụng concurrency khi mà chúng ta không thể sử dụng RxKotlin Schedulers.

2. Observables, Observers

Observables and subscribers are at the base of reactive programming

Chúng ta có thể nói rằng chúng là building blocks của reactive programming.

3 thành phần trụ cột của reactive programming—Observables, Observers, and subjects:

  • Chúng ta sẽ đi vào chi tiết của việc biến đổi các loại data sources thành observable instances
  • Bạn sẽ tìm hiểu về các types của Observables
  • Cách sử dụng Observer instances và subscriptions và cuối cùng

2.1. Observables

Trong reactive programming, Observable là một tính toán cơ bản mà produces các value sẽ được consumed bởi 1 comsumer (Observer).

Điều quan trọng và cần chú ý ở đây và consumer (Observer) không pull values, mà Observable pushes the value tới consumer.

Hoặc chúng ta có thể nói rằng Observable là 1 pushbased, emit các value thông qua các các operators và tới Observer:

  • Observer subscribes với Observable
  • Observable bắt đầu emit các values nó đang chứa
  • Observer phản hồi với bất kì item nào được Observable emit

How Observable works?

Observable có 3 events/methods quan trọng nhất:

  • onNext: Observable passes all items one by one tới onNext.
  • onComplete: Khi tất cả items đã đi qua onNext method, Observable gọi the onComplete method.
  • onError: Khi Observable gặp bất kì error nào, it gọi the onError method.

Chú ý là cả onError and onComplete đều là terminal events, nếu onError được gọi, thì onComplete sẽ không bao giờ được gọi, và ngược lại.

Output:

Understanding the Observable.create method

Bạn có thể tạo 1 Observable với Observable.create method bất kì lúc nào.

Observable.create method khá là hữu dụng trong trường hợp bạn làm việc với custom data structure và muốn control over values được emitted. Bạn có thể emit values tới Observer từ một thread khác.

Output:

Understanding the Observable.from methods

Observable.from methods đơn giản hơn so với the Observable.create method. Bạn có thể create Observable instances từ hầu hết Kotlin structure bằng from methods.

Chú ý rằng ở RxKotlin 1, bạn sử dụng Observale.from; tuy nhiên, từ RxKotlin 2.0 (cũng như RxJava2.0), operator được đổi tên với 1 postfix, như là fromArray, fromIterable, fromFuture, …

Understanding the Observable.just method

Observable.just cũng được sử dụng để create Observable. Tuy nhiên có hơi khác Observable.from 1 chút là:
ví dụ bạn pass 1 Iterable instance, Observable.just sẽ coi đó là single parameter, nó sẽ emit tất cả item trong list như là 1 single item, trong khi đó Observable.from sẽ pass từng item trong list.

2.2. Subscribing and disposing

Subscribe operato được sử dụng để connect 1 Observable tới Observer. Chúng tá có thể pass 1 trong 3 methods (onNext, onComplete, onError) tới subscribe operator, hoặc pass 1 instance của Observer interface tới subscribe operator để Observable connect với Observer.

Khi bạn subscribe, nếu bạn pass methods thay vì Observer instance, subscribe operator sẽ trả về 1 instance của Disposable. Nếu bạn pass instance của Observer, bạn sẽ nhận được instance của Disposable ở trong onSubscribe method.

Bạn có thể sử dụng instance của Disposable interface to để dừng các emissions bất ký lúc nào

Output:

Trên kia là những gì cơ bản nhất khi mới tiếp cận RxJava hay RxKotlin. Đến đây chúng ta cần hiểu các khái niệm Observables, Observers, Subscribingdisposing là gì đã.

3. Những operator hữu ích và hay gặp

Trước tiên, operator ở đây là gì?

Khi bắt đầu học lập trình, chúng ta học về các operators là chuỗi các ký tự được viết ra để thực hiện 1 nhiệm vụ dựa trên toán hạng và trả về kết quả.

Đối với reactive, khái niệm về operator cũng tương tự, các toán hạng ở đây như là Observable/Flowable, chúng ta biến đổi chúng và trả về kết quả là Observable/Flowable.

Có 5 loại operators:

  • Filtering/suppressing operators
  • Transforming operators
  • Reducing operators
  • Error handling operators
  • Utility operators

Tuy nhiên để đi hết thì khá là dài nên mình chỉ liệt kê một số operator mà hay gặp thôi nhé. Có thể sau này có thời gian thì mình sẽ tách chi tiết ra sau.

3.1. Filtering/suppressing operators

  • The debounce operator ()

Trì hoãn 1 khoảng thời gian sau mỗi emission, và loại bỏ emission trước đó nếu có 1 emission khác đc bắn ra trong khoảng thời gian trì hoãn và restart value trì hoãn

Output

  • The distinct operators

Operator này khá đơn giản, nó giúp bạn lọc những emission bị duplicated từ upstream

Output:

  • Filtering emissions – filter operator

Filter operator được cho là được sử dụng nhiều nhất trong nhóm filtering/suppressing operator. Nó cho phép bạn implement custom logic để lọc emissions.

Output:

Ngoài ra các operators trên bạn có thể tìm hiểu thêm distinctUntilChanged, elementAt, first, last, ignoreElements operator

3.2. Transforming operators

  • The map operator

map operator thực hiện 1 task trên từng emitted items từ upstream và emits chúng tới downstream
map operator sẽ biến đổi emitted item type T -> type R bằng việc apply lambda function của nó. Dễ hiểu hơn thì nó dùng để chuyển đối 1 item thành 1 item khác.

Output:

  • The flatMap operator

flatMap operator tạo mới 1 producer, apply function bạn pass đối với từng emission của source producer. FlatMap sẽ chuyển đổi các item phát ra bởi một Observable thành các Observable khác

Nếu sử dụng như trên thì output giống với ví dụ của map operator.

Output:

Nếu các bạn để ý thì sẽ thấy điểm khác biệt chính giữa mapflatMapFlatMap bản thân nó sẽ trả về một Observable.

Ngoài map và flatMap ra thì các bạn có thể tìm hiểu thêm cast, defaultIfEmpty, switchIfEmpty, startWith, sorted, scan operator.

3.3. Reducing operators

  • Counting emissions (count operator)

count operator subscribes với producer, đếm the emissions, và emits 1 Single chứa số lượng của emissions được emit bởi producer.

Output:

  • Accumulating emissions – reduce operator

Reduce là một accumulating operator khá thú vị. Nó cộng dồn tất cả các emissions được emit từ producer.

Output:

3.4. Error handling operators

Chúng ta đều biết về onError event ở Subscriber/Observer. Tuy nhiên, vấn đề là onError event chỉ emitted tới downstream consumer và subscription ngay lập tức vị chấm dứt. Vì vậy, trong trường hợp chúng ta muốn handle error khi value đang được emit ở upstream thì sao? Thật may mắn là chúng ta có các handling operators.

  • onErrorReturn – return a default value on error

onErrorReturn sẽ return 1 default value tới downstream khi một error occurred in the upstream.

Output:

  • The onErrorResumeNext operator

onErrorResumeNext operator giúp bạn subscribe với một producer khác khi có error xảy ra.

Output:

  • Retrying on error

retry operator cũng là một error handling operator khác cho phép bạn retry/resubscribe tới producer khi 1 error xảy ra. Bạn chỉ cần cung cấp số lần retry và điều kiện retry.

Output:

3.5. Utility operators

Dưới đây là list utility operators:

  • doOnNext, doOnComplete, and doOnError
  • doOnSubscribe, doOnDispose, and doOnSuccess
  • serialize
  • cache

Chúng sẽ giúp chúng ta thực hiện đa dạng các utility operations như là thực hiện action trên các emission (doOnNext), ghi nhớ timestamps, caching, …

4. Schedulers

  • What is a scheduler?
    Nói là RxKotlin giúp xử lý bất đồng bộ, từ nãy giờ chưa thấy bất đồng bộ ở đâu nhỉ. Scheduler chính là cái giúp chúng ta làm điều đó.
    Scheduler có thể được hiểu giống như thread pool. ReactiveX có thể pool 1 thread và execute task trên thread đó.
  • Types of scheduler
    • Schedulers.io(): Khi dùng cái này thì sẽ không dùng đến CPU, nó thực hiện các công việc chuyên sâu như networks call, đọc đĩa/file, database, … Nó duy trì được pool của thread.
    • Schedulers.computation(): Có thể đòi hỏi đến đòi hỏi nhiều CPU như xử lý dữ liệu lớn, xử lý bitmap, … Số lượng các thread được tạo ra bằng cách sử dụng Scheduler này hoàn toàn phụ thuộc vào số lõi CPU.
    • Schedulers.newThread(): Sử dụng cái này thì mỗi thread sẽ được tạo ra mỗi lần nhiệm vụ được xếp lịch. Thường thì không khuyến cáo sử dụng cách này trừ khi công việc rất dài. Thread được tạo qua newThread() sẽ không được dùng lại.
    • Schedulers.single(): Scheduler này sẽ thực hiện tất cả các nhiệm vụ theo thứ tự tuần tự mà chúc được add vào. Việc này có thể cần thiết trong một số trường hợp cần tuần tự.
    • Schedulers.trampoline(): Nó thực hiện các nhiệm vụ theo Last In – First Out. Tất cả các nhiệm vụ được xếp lịch sẽ được thực hiện từng cái một bằng cách giới hạn số lượng các background thread thành một.
    • Schedulers.from(): Cách này cho phép tạo ra một Scheduler từ một Executor bởi giới hạn số lượng các thread được tạo ra. Khi thread pool bị full, các nhiệm vụ sẽ xếp hàng đợi.

Output:

Nội dung bài viết cũng tương đối dài rồi nên mình xin dừng lại tại đây. Mình mong là qua bài này mọi người có cái nhìn tổng quan về RxKotlin. Cảm ơn các bạn đã đọc bài viết.

Chia sẻ bài viết ngay

Nguồn bài viết : Viblo