Asynchronous Flow in Coroutines (Part II)

Tram Ho

Size-limiting operators

Size-limiting intermediate operators such as take cancel the execution of the flow when the corresponding limit is reached. Cancellation in coroutines is always performed by throwing an exception, so that all resource-management constructs (like try {…} finally {…}) operate normally in case of cancellation.
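A minimal sketch of this behaviour, written along the lines of the Kotlin Flow documentation that this article follows. It assumes the kotlinx-coroutines-core library is on the classpath; the numbers() function name is taken from the text.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun numbers(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2) // take(2) cancels the flow here by throwing an exception
        println("This line will not execute")
        emit(3)
    } finally {
        // Still runs on cancellation, so resources can be released here
        println("Finally in numbers")
    }
}

fun main() = runBlocking<Unit> {
    numbers()
        .take(2) // take only the first two values
        .collect { value -> println(value) }
}
// Prints:
// 1
// 2
// Finally in numbers
```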

The output of this code clearly shows that the execution of the flow {…} body in the numbers() function stopped after emitting the second number.

Terminal flow operators

Terminal operators on flows are suspending functions that start the collection of the flow. The collect operator is the most basic one, but there are other terminal operators that can make things easier:

  • Conversion to various collections with toList and toSet.
  • Operators to get the first value (first) and to ensure that a flow emits a single value (single).
  • Reducing a flow to a single value with reduce and fold. For example:
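A short sketch of reduce as a terminal operator, following the example in the Kotlin Flow documentation. The helper name sumOfSquares is hypothetical and only there to keep the snippet self-contained; kotlinx-coroutines-core is assumed to be available.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: sums the squares of 1..5 with a terminal operator
suspend fun sumOfSquares(): Int =
    (1..5).asFlow()
        .map { it * it }          // squares of the numbers from 1 to 5
        .reduce { a, b -> a + b } // sum them (terminal operator)

fun main() = runBlocking<Unit> {
    println(sumOfSquares())
}
```

The same result can be obtained with fold(0) { acc, value -> acc + value }, which takes an explicit initial value and therefore also works on an empty flow.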

This prints a single number:

55

Flows are sequential

Each individual collection of a flow is performed sequentially unless special operators that operate on multiple flows are used. The collection works directly in the coroutine that calls a terminal operator. No new coroutines are launched by default. Each emitted value is processed by all the intermediate operators from upstream to downstream and is then delivered to the terminal operator.

Consider an example that filters the even integers and maps them to strings.

The output shows that each value goes through filter first, and only the even values (2 and 4) continue to map and collect before the next value is processed.
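A minimal sketch of such a pipeline, based on the example in the Kotlin Flow documentation. The function name evenStrings is hypothetical, and kotlinx-coroutines-core is assumed to be on the classpath.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Hypothetical helper: keeps the even numbers of 1..5 and maps them to strings
fun evenStrings(): Flow<String> =
    (1..5).asFlow()
        .filter {
            println("Filter $it")
            it % 2 == 0
        }
        .map {
            println("Map $it")
            "string $it"
        }

fun main() = runBlocking<Unit> {
    evenStrings().collect { println("Collect $it") }
}
// Prints:
// Filter 1
// Filter 2
// Map 2
// Collect string 2
// Filter 3
// Filter 4
// Map 4
// Collect string 4
// Filter 5
```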

Flow context

Collection of a flow always happens in the context of the calling coroutine. For example, if there is a simple flow, then the code that collects it runs in the context specified by the author of that code, regardless of the implementation details of the simple flow.
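A small sketch of this idea, assuming kotlinx-coroutines-core is available. The body of simple() and the choice of Dispatchers.Default as the context are assumptions made for illustration only.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative flow; its implementation details do not matter to the collector
fun simple(): Flow<Int> = flow {
    for (i in 1..3) emit(i)
}

fun main() = runBlocking<Unit> {
    val context = Dispatchers.Default // assumed context, chosen by the caller
    withContext(context) {
        simple().collect { value ->
            // Runs in the context specified by the caller
            println("${Thread.currentThread().name}: $value")
        }
    }
}
```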

This property of a flow is called context preservation.

So, by default, code in the flow {…} builder runs in the context that is provided by the collector of the corresponding flow. For example, consider the implementation of a simple function that prints the thread it is called on and emits three numbers.
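A sketch of such a function, modelled on the Kotlin Flow documentation and assuming kotlinx-coroutines-core is on the classpath. The log helper is an illustrative utility that prefixes each message with the current thread name.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper: prints the message together with the current thread name
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    log("Started simple flow")
    for (i in 1..3) {
        emit(i)
    }
}

fun main() = runBlocking<Unit> {
    simple().collect { value -> log("Collected $value") }
}
```

The exact thread label depends on the JVM settings: with the kotlinx.coroutines debug mode enabled, the coroutine name is appended to the thread name.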

Running this code prints every message, both from the flow body and from the collector, with the name of the main thread.

Because simple().collect is called from the main thread, the body of the simple flow is also called on the main thread. This is the perfect default for fast-running or asynchronous code that does not care about the execution context and does not block the caller.

Wrong emission withContext

However, long-running CPU-consuming code might need to be executed in the context of Dispatchers.Default, and UI-updating code might need to be executed in the context of Dispatchers.Main. Usually, withContext is used to change the context in code that uses Kotlin coroutines, but code in the flow {…} builder has to honor the context preservation property and is not allowed to emit from a different context.

Try running code that emits from inside withContext in the flow builder.
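A sketch of the incorrect pattern, following the example in the Kotlin Flow documentation and assuming kotlinx-coroutines-core is available. withContext is called by its fully qualified name, as in the documentation's example; Thread.sleep only stands in for CPU-consuming work.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

fun simple(): Flow<Int> = flow {
    // The WRONG way to change the context for CPU-consuming code in the flow builder
    kotlinx.coroutines.withContext(Dispatchers.Default) {
        for (i in 1..3) {
            Thread.sleep(100) // pretend we are computing the value in a CPU-consuming way
            emit(i)           // emitting from a different context violates context preservation
        }
    }
}

fun main() = runBlocking<Unit> {
    // Fails with IllegalStateException on the first emission
    simple().collect { value -> println(value) }
}
```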

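This code fails with an IllegalStateException stating that the flow invariant is violated, because the flow was collected in one context but the emission happened in another.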

The flowOn operator

The exception refers to the flowOn function, which should be used to change the context of the flow emission. The correct way to change the context of a flow is to apply flowOn to it; printing the names of the corresponding threads shows how it all works.

Notice how flow {…} works in the background thread, while the collection happens in the main thread.

Another thing to observe here is that the flowOn operator has changed the default sequential nature of the flow. Now the collection happens in one coroutine ("coroutine#1") and the emission happens in another coroutine ("coroutine#2") that runs in another thread concurrently with the collecting coroutine. The flowOn operator creates another coroutine for an upstream flow when it has to change the CoroutineDispatcher in its context.
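A minimal sketch of the flowOn approach, modelled on the Kotlin Flow documentation and assuming kotlinx-coroutines-core is on the classpath. The log helper is illustrative, and Thread.sleep only stands in for CPU-consuming work.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

// Illustrative helper: prints the message together with the current thread name
fun log(msg: String) = println("[${Thread.currentThread().name}] $msg")

fun simple(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) // pretend we are computing the value in a CPU-consuming way
        log("Emitting $i")
        emit(i)
    }
}.flowOn(Dispatchers.Default) // the RIGHT way to change the context of the flow builder

fun main() = runBlocking<Unit> {
    simple().collect { value ->
        log("Collected $value")
    }
}
```

With this setup the "Emitting" messages are logged from a Dispatchers.Default worker thread and the "Collected" messages from the main thread; the coroutine labels appear only when the kotlinx.coroutines debug mode is enabled.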

Source https://kotlinlang.org/docs/reference/coroutines/flow.html#size-limiting-operators (continued)


Source: Viblo