Tìm hiểu về Functional Reactive Programing và RxJS

Tram Ho

Giới thiệu

Khi làm việc với Angular nói riêng cũng như xử lý bất đồng bộ (ASYNC) trong Javascript nói chung, hẳn bạn đã từng nghe tới khái niệm Fucntional Reactive Programing, ObservableRxJS. Vậy Fucntional Reactive Programing là gì ?. Observable là gì ?. RxJS là gì ?. Chúng có quan hệ gì với Angular, làm thế nào để sử dụng chúng hiệu quả trong ứng dụng của bạn .Trong bài viết này chúng ta sẽ đi tìm hiểu về chúng để cùng có cái nhìn tổng quát, cũng như có những ví dụ cụ thể về chúng. Bắt đầu thôi nào !!!

1. Reactive Programing

Trong các ứng dụng web và mobile, người dùng có thể tương tác với dữ liệu thông qua UI (User Interface). 10 năm trước, việc này với các trang web đơn thuần là việc submit 1 form lên server backend rồi dựng 1 trang giao diện cho frontend. Chẳng vui vẻ gì với việc cứ ngồi bấm F5 mãi xem nội dung đã được cập nhật chưa. Ngày nay, cả nhu cầu của người dùng lẫn ứng dụng đều phát triển theo hướng real-time: Việc thay đổi 1 form data, like 1 bài viết hay kết bạn,… đều ngay lập tức được phản ánh theo thời gian thực (real-time), điều này mang lại trải nghiệm người dùng tuyệt vời hơn bao giờ hết. Đỉnh cao của Real-time chắc hẳn là những ứng dụng mạng xã hội như Facebook, Twitter,…

Đó là về phía user, còn về phía những developers, việc xử lý lý những sự kiện real-time này chúng ta cần đến Reactive Programing. Reactive Programing không phải con đường duy nhất nhưng nó là cách tốt nhất hiện nay để tiếp cận với ứng dụng real-time. Và mặc dù Reactive Program không chỉ là để lập trình real-time program mà còn nhiều ứng dụng khác nữa ví dụ như protocols, system drivers.

Hiện tại, có cả tá định nghĩa về Reactive Programing, nhưng mình thấy định nghĩa sau đây là bao quát tốt vấn đề Reactive programming is programming with asynchronous data streams. Là phương pháp lập trình xoay quanh data streams và nó deal với các vấn đề của asynchronous.

Bạn có thể tưởng tượng data streams như hình sau, với data được gửi đến trong suốt dòng thời gian của một stream (over time), giống như một array có các phần tử được gửi đến lần lượt theo thời gian. Và chúng ta có thể coi mọi thứ là stream: single value, array, event, ….

Không những thế, khi thao tác với stream, chúng ta có thể có value, error, hay complete signals. Đây là điều mà các API trước đây của các hệ thống event trong Javascript còn thiếu, chúng có qua nhiều interface khác nhau cho các loại event khác nhau, Observable sinh ra để tổng quát hóa các interface đó lại.

2. RxJS

RxJS là một thư viện cho chương trình soạn bất đồng bộ và dựa trên sự kiện sử dụng trình tự có thể quan sát (observable). Nó cung cấp 1 kiểu lõi ( Observable), các kiểu vệ tinh (Observer, Schedulers, Subjects) và toán tử lấy cảm hứng từ mảng (map, filter, reducer, every,…) cho phép thao tác các sự kiện bất đồng bộ như tập hợp (collections) .

Hãy nghĩ về RxJS giống như Lodash cho các sự kiện.

Các khái niệm cơ bản trong RxJS giải quyết sự quản lý sự kiện không đồng bộ là:

  1. Observable: đại diện cho khái niệm về một tập hợp các giá trị hoặc các sự kiện trong tương lai. Khi các giá trị hoặc sự kiện phát sinh trong tương lai, Observable sẽ điều phối nó đến Observer.
  2. Observer: là một tập hợp các callbacks tương ứng cho việc lắng nghe các giá trị (next, error, hay complete) được gửi đến bởi Observable.
  3. Subscription: là kết quả có được sau khi thực hiện một Observable, nó thường dùng cho việc hủy việc tiếp tục xử lý.
  4. Operators: là các pure functions cho phép lập trình functional với Observable.
  5. Subject: để thực hiện việc gửi dữ liệu đến nhiều Observers (multicasting).
  6. Schedulers: một scheduler sẽ điều khiển khi nào một subscription bắt đầu thực thi, và khi nào sẽ gửi tín hiệu đi. (Trong bài này chúng ta sẽ không nói về phần này).

Ví dụ:

Bình thường bạn đăng ký lắng nghe 1 sự kiện:

Sử dụng RxJS bạn tạo ra 1 Observable thay thế như đoạn mã bên dưới:

3. Array Trong Javascript

Trước khi bắt đầu với Observable, chúng ta sẽ ôn lại một số kiến thức về Array sẽ giúp ích trong việc tiếp cận Observable.

3.1 Array forEach

Array forEach là một trong các cách để ta có thể lặp qua lần lượt từng phần tử trong mảng.

Kết quả nhận được của chúng ta như sau:

Ngoại trừ một điểm chúng ta cần lưu ý khi các phần tử là kiểu reference thay vì kiểu primitive, thì forEach có thể khiến các phần tử của array ban đầu thay đổi giá trị.

3.2 Array map

Array map cho phép chúng ta lặp qua tất cả các phần tử trong mảng, áp dụng một function nào đó lên các phần tử để biến đổi, sau đó trả về một mảng các giá trị sau khi thực hiện function.

3.3 Array filter

Array filter cho phép chúng ta lặp qua tất cả các phần tử trong mảng, áp dụng một function nào đó lên các phần tử để kiểm tra, sau đó trả về một mảng các giá trị sau khi thực hiện hàm kiểm tra mà thỏa mãn điều kiện (return true) và giữ nguyên mảng cũ không bị ảnh hưởng.

3.4 Array reduce

Method reduce cho phép chúng ta lặp qua tất cả các phần tử và áp dụng một function nào đó vào mỗi phần tử, function này có các tham số:

  • accumulator: giá trị trả về từ các lần call callback trước.
  • currentValue: giá trị của phần tử hiện tại trong array.
  • currentIndex: index của phần tử hiện tại.
  • array: chính là mảng hiện tại.

Ngoài ra, chúng ta còn có thể cung cấp giá trị ban đầu initialValue sau tham số function đầu tiên.

3.5 Flatten Array

Trong nhiều tình huống, chúng ta có các array, bên trong mỗi phần tử có thể là các array khác, lúc này chúng ta có nhiệm vụ làm giảm số chiều (flatten) đi chẳng hạn, chúng ta có thể có đoạn code xử lý sau trong Javascript.

Như ở ví dụ trên, chúng ta flat mảng con 2 chiều thành 1 chiều, và chúng ta có thể flat nhiều lần để mỗi lần sẽ giảm đi 1 chiều.

Điều này các bạn sẽ hay gặp khi làm việc với Observable trả về Observable trong các phần tiếp theo.

4. Producer vs Consumer, Push vs Pull

Producer: là nguồn sản sinh ra data.

Consumer: là nơi chế biến data sản sinh từ Producer.

Pull systems: Consumer sẽ quyết định khi nào lấy data từ Producer. Producer không quan tâm khi nào data sẽ được gửi đến cho Consumer. Các function trong Javascript là một Pull system. Khi nào lời gọi hàm thì khi đó mới xử lý. Gọi n lần thì xử lý n lần.

Lưu ý: function chỉ trả về 1 giá trị sau khi lời gọi hàm được thực hiện. (một mảng cũng chỉ coi là 1 giá trị, vì nó được trả về 1 lần).

Push systems: Producer sẽ quyết định khi nào gửi dữ liệu cho Consumer. Consumer không quan tâm khi nào nhận được data. Promise, DOM events là các Push systems. Chúng ta register các callbacks và khi event phát sinh, các callbacks sẽ được gọi với dữ liệu từ Producer truyền vào.

Ví dụ:

Pull

Push

5. Observable

Observable chỉ là một function (class) mà nó có một số yêu cầu đặc biệt. Nó nhận đầu vào là một Function, mà Function này nhận đầu vào là một Observer và trả về một function để có thể thực hiện việc cancel quá trình xử lý. Thông thường (Rxjs 5) chúng ta đặt tên function đó là unsubscribe.

Observer: một object có chứa các phương thức next, error và complete để xử lý dữ liệu tương ứng với các signals được gửi từ Observable.

Như vậy, chúng ta có thể thấy Observable là lazy computation, giống như function, nếu chúng ta tạo chúng ra mà không gọi, thì không có gì thực thi cả.

Tạm thời bỏ qua các chi tiết về hàm create dưới đây, chúng ta sẽ gặp lại nó khi tìm hiểu về Operator:

Đến đây, nếu chúng ta không gọi hàm getDetail, hoặc không invoke đến observable thì chẳng có gì xảy ra cả.

Để thực thi, chúng ta sẽ làm như sau:

Và sau đây là kết quả chúng ta nhận được:

Observable có thể deal với cả sync và async.

6. Làm Quen Với Observable

Với Observable chúng ta sẽ quan tâm đến các thao tác như sau:

  • Creating Observables
  • Subscribing to Observables
  • Executing the Observable
  • Disposing Observables

6.1 Creating Observables

Rx.Observable.create là một operator, nó chỉ là một alias cho Observable constructor, chúng ta hoàn toàn có thể thay thế tương ứng bằng cách gọi constructor cũng cho kết quả tương tự.

Đầu vào của constructor yêu cầu một hàm gọi là subscribe mà hàm này có đầu vào là một observer object.

Bạn hoàn toàn có thể sử dụng new hoặc Rx.Observable.create.

Ngoài operator như create, Rxjs mang đến cho bạn nhiều lựa chọn khác nhau để tạo mới một Observable như các operators: of, from, interval, etc. Chúng được đặt trong nhóm creation operators.

Ví dụ, bạn muốn tạo Observable cho một mảng các giá trị, lúc này bạn không cần dùng Rx.Observable.create rồi lặp qua các phần tử, xong gọi next nữa. Rxjs có cách dùng khác, vì đây là một usecase rất hay dùng và create là một low-level API.

6.2 Subscribing to Observables

Sau khi đã tạo xong một Observable, chúng ta cần invoke bằng cách subscribe vào như sau:

Vậy nên chúng ta call một function n lần, chúng ta sẽ có n lần thực thi. Tương tự như thế, khi chúng ta subscribe vào một Observable m lần, thì có m lần thực thi, một lời gọi subscribe giống như một cách để Observable bắt đầu thực thi.

6.3 Executing the Observable

Phần code khi chúng ta khởi tạo Observable Rx.Observable.create(function subscribe(observer) {…}) chính là “Observable execution”. Giống như khai báo một function, phần code này để thực hiện một số hành động, xử lý nào đó; chúng là lazy computation – chỉ thực thi khi Observer thực hiện subscribe.

Có ba kiểu giá trị mà một Observable Execution có thể gửi đi:

  • “Next” notification: gửi đi một giá trị, có thể là bất kỳ kiểu dữ liệu nào như Number, a String, an Object, etc.
  • “Error” notification: gửi đi một JavaScript Error hoặc exception.
  • “Complete” notification: không gửi đi một giá trị nào, nhưng nó gửi đi một tín hiệu để báo rằng stream này đã completed, mục đích để Observer có thể thực hiện một hành động nào đó khi stream completed.

Next notifications thường được sử dụng rộng rãi, nó cực kỳ quan trọng, vì nó gửi đi dữ liệu cần thiết cho một Observer.

Error và Complete notifications có thể chỉ xảy ra duy nhất một lần trong một Observable Execution. Lưu ý rằng, chỉ có 1 trong 2 loại tín hiệu trên được gửi đi, nếu đã complete thì không có error, nếu có error thì không có complete. (Chúng không thuộc về nhau 😄). Và nếu đã gửi đi complete, hoặc error signal, thì sau đó không có dữ liệu nào được gửi đi nữa. Tức là stream đã close.

Ví dụ:

Khi Subscribe vào observable được tạo ở trên, bạn có thể thấy được kết quả như sau:

Lưu ý rằng trong ví dụ trên, tôi đã invoke Observable bằng cách subscribe với 3 callbacks riêng biệt cho 3 loại signals tương ứng, về mặt xử lý phía sâu bên trong sẽ convert về Observer object có dạng.

6.4 Disposing Observables

Bởi vì quá trình thực thi Observable có thể lặp vô hạn, hoặc trong trường hợp nào đó bạn muốn thực hiện hủy việc thực thi vì việc này không còn cần thiết nữa – dữ liệu đã lỗi thời, có dữ liệu khác thay thế. Các bạn có thể liên tưởng tới việc close websocket stream, removeEvenListener cho một element nào đó đã bị loại bỏ khỏi DOM chẳng hạn.

Observable có cơ chế tương ứng, cho phép chúng ta hủy việc thực thi. Đó là khi subscribe được gọi, một Observer sẽ bị gắn với một Observable execution mới được tạo, sau đó nó sẽ trả về một object thuộc type Subscription. Kiểu dữ liệu này có một method unsubscribe khi chúng ta gọi đến, nó sẽ thực hiện cơ chế để hủy việc thực thi.

Lưu ý: nếu bạn tạo Observable bằng create hoặc new thì bạn phải tự thiết lập cơ chế để hủy.

Ví dụ:

7. Observer

Observer là một Consumer những dữ liệu được gửi bởi Observable. Observer là một object chứa một tập 3 callbacks tương ứng cho mỗi loại notification được gửi từ Observable: next, error, complete.

Một Observer có dạng như sau:

Observer được cung cấp là tham số đầu vào của subscribe để kích hoạt Observable execution.

Observe có thể chỉ có một số callbacks trong bộ 3 callbacks kể trên (có thể là một object không có callback nào trong bộ kể trên, trường hợp này ít dùng đến).

Như tôi đã đề cập từ trước, observable.subscribe sẽ chuẩn hóa các callbacks thành Observer object tương ứng, bạn có thể truyền vào các hàm rời rạc nhau, nhưng cần lưu ý truyền đúng thứ tự callback.

Lưu ý: Nếu bạn không muốn truyền error handler function vào, hãy truyền null/undefined:

8. Subscription

Subscription là một object đại diện cho một nguồn tài nguyên có khả năng hủy được, thông thường trong Rxjs là hủy Observable execution. Subscription có chứa một method quan trọng unsubscribe (từ Rxjs 5), khi method này được gọi, execution sẽ bị hủy.

Ví dụ: chúng ta có một đồng hồ đếm thời gian, mỗi giây sẽ gửi đi một giá trị, giả sử sau khi chạy 5s chúng ta cần hủy phần thực thi này.

Một Subscription có thể chứa trong nó nhiều Subscriptions con, khi Subscription unsubscribe, các Subscriptions con cũng sẽ unsubscribe.

Ở Subscription cha, chúng ta có thể gọi method add để thêm các Subscriptions con mà phụ thuộc Subscription cha này.

9. Cold Observables vs Hot Observables

9.1 Cold Observables

Một observable là “cold” nếu Producer được tạo ra và quản lý bên trong nó.

Ví dụ :

Kết quả là sau 1s thì lần subscribe đầu tiên có in ra 15, và lần subscribe thứ 2 in ra 5. Chúng không có cùng giá trị của x.

9.2 Hot Observables

Giờ thay đổi chút bằng việc move khai báo biến x ra ngoài create:

Lần này, sau 1s thì cả 2 execution đều in ra giá trị là 15.

Đây chính là ví dụ về Hot Observables.

Tổng kết

Bài viết của mình đến đây là kết thúc. Hi vọng bài viết của mình có thể giúp bạn hiểu và nắm được về Fucntional Reactive Programing, ObservableRxJS.

Bài viết cũng khó tránh khỏi những sai xót, mong mọi người thông cảm, và rất mong những ý kiến đóng góp của mọi người để bài viết được hoàn thiện hơn.

Cảm ơn các bạn đã dành thời gian cho bài viết của mình !!!

Chia sẻ bài viết ngay

Nguồn bài viết : Viblo