Introduction
I wrote this series to share what I know about the Bloc pattern in Flutter. Because Bloc builds on asynchronous programming concepts such as Stream, I decided to start from the foundations and work upward.
This series has three parts:
Part 1: Stream and related concepts such as StreamController and StreamSubscription. Once you understand Stream, RxDart and the Bloc pattern become much easier to follow. If you already understand Stream, feel free to skip ahead to the later parts of the series.
Part 2: RxDart (requires the Stream knowledge from part 1)
Part 3: Bloc pattern (requires the Stream knowledge from part 1)
Okay, let's start with part 1: getting to know Stream.
1. What is a Stream? What is an Event?
You can picture a Stream as a conveyor belt.
Definition, verbatim from the docs:
A stream is a sequence of asynchronous events
The blocks sliding along the belt are the events. An event can be a data value, an error, or a done signal (the Create Stream section below explains these in more detail). Events are processed asynchronously. So a Stream is a sequence of events that are processed asynchronously.
In reality, a stream does not run forever like the conveyor belt in the gif. It can be created and it can be closed, and we can also stop the belt temporarily (pause) and start it running again (resume).
On its way to the consumer, data can pass through intermediate processing steps. For example, lowercase input can go through a processing function called map and come out as uppercase output. Dart's Stream class, however, offers only a limited set of such functions. That is why we need RxDart, a library that adds extra power to Stream.
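As a minimal sketch of the map step described above, using only Dart's built-in Stream class (the helper name upperCased is made up for this illustration):

```dart
// Hypothetical helper: sends each word through map and collects the results.
Future<List<String>> upperCased(List<String> words) {
  return Stream.fromIterable(words)
      .map((word) => word.toUpperCase()) // intermediate processing step
      .toList(); // completes when the stream is done
}

Future<void> main() async {
  print(await upperCased(['a', 'b', 'c'])); // prints [A, B, C]
}
```

The consumer only ever sees the uppercase values; the lowercase input never reaches it.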
If you have used RxJava, RxJS, or a similar library, a Stream corresponds to an Observable in Rx.
2. Create Stream, listen to Stream and explain terminology
What is emit
When a Stream sends out an Event, we say that it emits the event.
Create Stream
There are many ways to create a stream. For now I will show the simplest one so that I can explain the terminology.
```dart
var stream = Stream.value('teddy bear'); // creates a stream that emits one data event: 'teddy bear'
```
Here I created a Stream that emits one data event, 'teddy bear', but when the program runs, nothing happens. That is because I handed out a 'teddy bear' and nobody was there to accept it. To receive the teddy bear, you must subscribe to the Stream so that you get it when it is emitted. Next, we will subscribe to the Stream.
Subscribe a Stream
We use the listen method to subscribe to events from the stream.
```dart
var stream = Stream.value('teddy bear'); // creates a stream that emits one data event: 'teddy bear'
stream.listen((event) { // subscribe to receive events from the stream
  print('I received $event');
});
```
Output:
```
I received teddy bear
```
In RxJava and similar libraries, the lambda (event) { print('I received $event'); } is called an Observer.
The listen method also takes optional parameters such as onDone and onError. When a Stream has emitted all of its events, it emits a done event, which onDone handles. If an error occurs, the Stream emits an error event, which onError handles.
```dart
var stream = Stream.value('teddy bear'); // creates a stream that emits one data event, 'teddy bear', and then done
stream.listen((event) {
  print('I received $event');
}, onDone: () => print('Done'), onError: (error) => print('Error: $error'));
```
Output: since there was no error, the output looks like this.
```
I received teddy bear
Done
```
More ways to create Stream
Here are a few more ways to create a Stream:
- Create a Stream that emits one error and then done right away.
```dart
Stream.error(FormatException('an error')).listen(print, onError: print, onDone: () => print('Done!'));
```
Output:
```
FormatException: an error
Done!
```
- Create a Stream that emits every item of a List and then done.
```dart
Stream.fromIterable([1, 2, 3]).listen(print, onError: print, onDone: () => print('Done!'));
```
Output:
```
1
2
3
Done!
```
- Create a Stream from a Future.
```dart
void main() {
  Stream.fromFuture(testFuture()).listen(print, onError: print, onDone: () => print('Done!'));
}

Future<int> testFuture() async {
  return 3;
}
```
Output:
```
3
Done!
```
- Create a Stream that emits an event every second. Note that an error is also an event, so when the Stream emits an error, the Stream does not stop.
```dart
Stream.periodic(Duration(seconds: 1), (int i) {
  if (i == 5) {
    throw Exception('error');
  }
  if (i % 2 == 0) {
    return '$i is even';
  } else {
    return '$i is odd';
  }
}).listen(print, onError: print, onDone: () => print('Done!'));
```
Output:
```
0 is even
1 is odd
2 is even
3 is odd
4 is even
Exception: error
6 is even
7 is odd
8 is even
.......
```
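You can also see that an error does not stop a Stream without waiting on a timer. The sketch below is only an illustration: collectEvents and the 'bad value' message are hypothetical names, and it relies on Stream.map turning an exception thrown in its callback into an error event.

```dart
import 'dart:async';

// Hypothetical helper: records every event the subscriber sees, in order.
Future<List<String>> collectEvents() {
  final log = <String>[];
  final done = Completer<List<String>>();
  Stream.fromIterable([1, 2, 3]).map((i) {
    if (i == 2) throw FormatException('bad value'); // becomes an error event
    return i;
  }).listen(
    (value) => log.add('data: $value'),
    onError: (Object error) => log.add('error: $error'),
    onDone: () => done.complete(log), // done arrives after the last event
  );
  return done.future;
}

Future<void> main() async {
  // prints [data: 1, error: FormatException: bad value, data: 3]
  print(await collectEvents());
}
```

The value 3 is still delivered after the error, and the done event comes last.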
This stream never ends on its own. Now I will use StreamSubscription to stop it.
3. What is StreamSubscription
The listen call above returns a StreamSubscription object. StreamSubscription lets us control our subscription to the stream: we can pause it, resume it, and cancel it.
```dart
void main() async {
  var subscription = Stream.periodic(Duration(seconds: 1), (int i) {
    if (i == 5) {
      throw Exception('error');
    }
    if (i % 2 == 0) {
      return '$i is even';
    } else {
      return '$i is odd';
    }
  }).listen(println, onError: println, onDone: () => println('Done!'));

  // 3 seconds after the program starts, pause the stream
  await Future.delayed(Duration(seconds: 3), () {
    println('pause stream');
    subscription.pause();
  });

  // 2 seconds after pausing, resume the stream
  await Future.delayed(Duration(seconds: 2), () {
    println('resume stream');
    subscription.resume();
  });

  // 3 seconds after resuming, cancel the stream
  await Future.delayed(Duration(seconds: 3), () {
    println('cancel stream');
    subscription.cancel();
  });
}

// I use this println function to print the current time so the output is easier to follow
void println(Object value) {
  print('${DateTime.now()}: $value');
}
```
Output:
```
2020-08-29 21:53:32.882979: 0 is even
2020-08-29 21:53:33.881757: 1 is odd
2020-08-29 21:53:34.880876: 2 is even
2020-08-29 21:53:34.884870: pause stream
2020-08-29 21:53:36.888422: resume stream
2020-08-29 21:53:37.890725: 3 is odd
2020-08-29 21:53:38.891863: 4 is even
2020-08-29 21:53:39.891114: cancel stream

Process finished with exit code 0
```
With the cancel() function we successfully stopped the never-ending stream above. This action is often called unsubscribing from a Stream.
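A subscription can also cancel itself from inside its own data callback. Here is a small sketch under my own assumptions (the function name firstThreeTicks and the 10 ms interval are chosen just for this example) that stops a periodic stream after three events:

```dart
import 'dart:async';

// Hypothetical helper: listens to a periodic stream and unsubscribes
// as soon as three data events have arrived.
Future<List<int>> firstThreeTicks() {
  final received = <int>[];
  final finished = Completer<List<int>>();
  late final StreamSubscription<int> subscription;
  subscription = Stream.periodic(Duration(milliseconds: 10), (int i) => i)
      .listen((value) {
    received.add(value);
    if (received.length == 3) {
      // No further events are delivered once cancel() has been called.
      subscription.cancel().then((_) => finished.complete(received));
    }
  });
  return finished.future;
}

Future<void> main() async {
  print(await firstThreeTicks()); // prints [0, 1, 2]
}
```

Note that cancelling does not trigger onDone, which is why this sketch uses a Completer to signal that it has finished.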
4. Stream Builder, async*, yield and yield*
In practice, people rarely create streams with the methods above. Instead they use a so-called stream builder (Dart calls this an asynchronous generator function), where they can emit data whenever they want.
To create a stream builder, we write a function marked async* that returns a Stream. To emit a single data event we use yield; to emit all the events of another stream we use yield*.
```dart
void main() {
  testStream().listen(println, onError: println, onDone: () => println('Done!'));
}

Stream<int> testStream() async* {
  yield 10; // emit 10
  await Future.delayed(Duration(seconds: 2)); // delay 2s
  yield* Stream.fromIterable([1, 2, 3]); // emit a whole stream
  await Future.delayed(Duration(seconds: 3)); // delay 3s
  throw FormatException('error');
  yield 13; // the function has already thrown, so 13 is never emitted
}

// I use this println function to print the current time so the output is easier to follow
void println(Object value) {
  print('${DateTime.now()}: $value');
}
```
Output:
```
2020-08-30 09:23:23.965913: 10
2020-08-30 09:23:25.975207: 1
2020-08-30 09:23:25.975207: 2
2020-08-30 09:23:25.975207: 3
2020-08-30 09:23:28.977575: FormatException: error
2020-08-30 09:23:28.978575: Done!
```
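To isolate the difference between yield and yield*, here is a shorter sketch without delays. The function names countTo and countTwice are made up for this example.

```dart
// Emits 1..n one value at a time with yield.
Stream<int> countTo(int n) async* {
  for (var i = 1; i <= n; i++) {
    yield i;
  }
}

// Emits 0 with yield, then forwards every event of two other streams with yield*.
Stream<int> countTwice(int n) async* {
  yield 0;
  yield* countTo(n);
  yield* countTo(n);
}

Future<void> main() async {
  print(await countTwice(2).toList()); // prints [0, 1, 2, 1, 2]
}
```

Each yield* waits for the inner stream to finish before the function continues, so the two counts appear one after the other rather than interleaved.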
Conclusion
I hope part 1 of this series has helped you grasp the important Stream terms and the basics of asynchronous programming in Dart. These terms are essential for the following parts of the series. In the next article, I will introduce Stream operators, StreamController, and Broadcast Stream. I hope you will read along.
Reference source: https://dart.dev/tutorials/language/streams