Size constraint operator
Limit-size intermediate operators such as take cancel the execution of flow when the corresponding limit is reached. Cancellation in coroutines is always done by throwing an exception, so that all resource management functions (like try {…} finally {…}) function normally in the event of a cancellation. revoke:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | fun numbers(): Flow<Int> = flow { try { emit(1) emit(2) println("This line will not execute") emit(3) } finally { println("Finally in numbers") } } fun main() = runBlocking<Unit> { numbers() .take(2) // take only the first two .collect { value -> println(value) } } |
The output of this code clearly shows that the execution of the body of flow {…} in the numbers () function stopped after emitting the second number:
1 2 3 4 | 1 2 Finally in numbers |
Terminal flow operators
Terminal operators on threads are suspending functions that will launch a thread collection. The collection operators are one of the most basic, but there are other terminal operators that can make it easier:
- Convert to different collections like toList and toSet .
- Operators to get the first value (first operator) and to ensure that a stream emitted a single value (single).
- Reduce the flow to one value by reduce and fold . For example:
1 2 3 4 5 | val sum = (1..5).asFlow() .map { it * it } // squares of numbers from 1 to 5 .reduce { a, b -> a + b } // sum them (terminal operator) println(sum) |
Print a single number:
55
The streams are sequential
Each individual set of a thread is performed sequentially unless special operators that operate on multiple threads are used. Collections that operate directly in the registration process call a terminal operator. No new processes are launched by default. Each output value is processed by all intermediate operators from upstream to downstream and then passed to the terminal operator afterwards.
See the following example to filter for even integers and map them to a string:
1 2 3 4 5 6 7 8 9 10 11 12 | (1..5).asFlow() .filter { println("Filter $it") it % 2 == 0 } .map { println("Map $it") "string $it" }.collect { println("Collect $it") } |
Output:
1 2 3 4 5 6 7 8 9 10 | Filter 1 Filter 2 Map 2 Collect string 2 Filter 3 Filter 4 Map 4 Collect string 4 Filter 5 |
Context of the flow
Collection of a thread always occurs in the context of the calling process. For example, if there is a simple thread, then the following code will run in the context specified by this code’s author, regardless of the implementation details of the simple thread:
1 2 3 4 5 6 | withContext(context) { simple().collect { value -> println(value) // run in the specified context } } |
This property of a thread is called context conservation.
So, by default, the code in the flow {…} generator runs in the context provided by the collector of the respective stream. For example, consider implementing a threaded simple function where it is called and emits three numbers:
1 2 3 4 5 6 7 8 9 10 11 | fun simple(): Flow<Int> = flow { log("Started simple flow") for (i in 1..3) { emit(i) } } fun main() = runBlocking<Unit> { simple().collect { value -> log("Collected $value") } } |
Running this code generates:
1 2 3 4 5 | [main @coroutine#1] Started simple flow [main @coroutine#1] Collected 1 [main @coroutine#1] Collected 2 [main @coroutine#1] Collected 3 |
Because simple (). Collect is called from the main thread, the simple thread’s body is also called on the main thread. This is the perfect default for fast or asynchronous code regardless of execution context and does not block callers.
Wrong withContext
However, long-running CPU-consuming code may need to be executed in the Dispatchers context. Default code and UI updates may need to be executed in the Dispatchers.Main context. Usually, withContext is used to change the context in code using Kotlin coroutines, but the code in the flow generator {…} must respect the context-conserving property and cannot be emitted from another context.
Try running the following code:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | fun simple(): Flow<Int> = flow { // The WRONG way to change context for CPU-consuming code in flow builder kotlinx.coroutines.withContext(Dispatchers.Default) { for (i in 1..3) { Thread.sleep(100) // pretend we are computing it in CPU-consuming way emit(i) // emit next value } } } fun main() = runBlocking<Unit> { simple().collect { value -> println(value) } } |
This code generates the following exception:
1 2 3 4 5 6 | Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated: Flow was collected in [CoroutineId(1), "coroutine#1":BlockingCoroutine{Active}@5511c7f8, <a class="__cf_email__" href="/cdn-cgi/l/email-protection">[email protected]</a> ], but emission happened in [CoroutineId(1), "coroutine#1":DispatchedCoroutine{Active}@2dae0000, Dispatchers.Default]. Please refer to 'flow' documentation or use 'flowOn' instead at ... |
The flowOn operator
An exception refers to the flowOn function that will be used to change the context in which the stream is generated. The correct way to change the context of the stream is shown in the example below, which also prints the names of the corresponding streams to show how it all works:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | fun simple(): Flow<Int> = flow { for (i in 1..3) { Thread.sleep(100) // pretend we are computing it in CPU-consuming way log("Emitting $i") emit(i) // emit next value } }.flowOn(Dispatchers.Default) // RIGHT way to change context for CPU-consuming code in flow builder fun main() = runBlocking<Unit> { simple().collect { value -> log("Collected $value") } } |
Notice how the flow {…} works in the background thread, while the collection takes place in the main thread:
Another thing to observe here is that the flowOn operator has changed the default serialization nature of the stream. Now the collection happens in one coroutine (“coroutine # 1”) and the emission happens in another coroutine (“coroutine # 2”) running in another thread at the same time as the collecting thread. The flowOn operator creates another coroutine for an upstream flow when it has to change the CoroutineDispatcher in its context.
Source https://kotlinlang.org/docs/reference/coroutines/flow.html#size-limiting-operators (continued)