1. Introducing Flow in Kotlin Coroutine
Flow
basically similar to Sequences
in Kotlin but different in that Sequences
handle synchronously while Flow
processes asynchronously. If you do not know about Sequences
, this concept makes you quite difficult to understand right . So first I will talk a little bit about Collections
and Sequences
in Kotlin.
Collections vs Sequences vs Flow
I will use Collections
vs Sequences
vs Flow
to give a solution for a problem: Build foo()
function prints out 3 numbers 1, 2, 3
with the delay time and measures the execution time of the foo
function. Thereby you will easily see the difference between Collections
vs Sequences
vs Flow
.
Starting with Collections
, the representation in this example is List
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | suspend fun foo(): List<Int> { val list = mutableListOf<Int>() for (i in 1..3) { delay(1000) list.add(i) } return list } fun main() = runBlocking { val time = measureTimeMillis { foo().forEach { value -> println(value) } } println(time) } |
Output (gif image):
These are when using Sequences
1 2 3 4 5 6 7 8 9 10 11 12 13 14 | fun foo(): Sequence<Int> = sequence { // sequence builder for (i in 1..3) { Thread.sleep(1000) yield(i) } } fun main() = runBlocking { val time = measureTimeMillis { foo().forEach { value -> println(value) } } println(time) } |
Output (gif image):
Have you seen the difference yet . 2 Output printed are the same and the time taken is equal, is 3 seconds, but differs in boys List
it forward to add
finished all 3 elements then printed out, also in the example Sequence
, then every second then the element is yield
and the element is immediately printed without waiting for the yield
all 3 elements.
And this is Flow
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | fun foo(): Flow<Int> = flow { // flow builder for (i in 1..3) { delay(1000) emit(i) // emit next value } } fun main() = runBlocking { // Collect the flow val a = measureTimeMillis { foo().collect { value -> println(value) } } println(a) } |
Output (gif image):
Basically, Flow
quite similar to Sequence
right, instead of using yield
function, Flow
uses emit
function and receives values via collect
function. You do not need to understand the code above about Flow
because I will explain at the bottom in this article.
At the beginning of the article, I said: ” Flow
basically quite similar to Sequences
in Kotlin but differs in that Sequences
handle synchronously and Flow
processes asynchronously”. Now we will go to clarify this difference offline.
Flow vs Sequences
Sequence
block main thread:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 | fun foo(): Sequence<Int> = sequence { // sequence builder for (i in 1..3) { Thread.sleep(1000) yield(i) // yield next value } } fun main() = runBlocking { // Launch a concurrent coroutine to check if the main thread is blocked launch { println(Thread.currentThread().name) for (k in 1..3) { delay(1000) println("I'm blocked $k") } } val time = measureTimeMillis { foo().forEach { value -> println(value) } } println("$time s") } |
Output (gif image):
I have launch
a coroutine on the main thread to check if the main thread is blocked. I have used Thread.currentThread().name
to print the main
word to make sure coroutine runs on the main thread. Have you noticed that coroutine runs on the main thread but it does not block the main thread, this is the characteristic of coroutine that I introduced in part 2 . So coroutine and foo
will run in parallel. And the result shows that the foo
function containing Sequence
has blocked the main thread, so the three I'm blocked
lines have to wait for Sequence
print all three values first before it is in turn to be printed.
So, when using Flow:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 | fun foo(): Flow<Int> = flow { // flow builder for (i in 1..3) { delay(1000) emit(i) // emit next value } } fun main() = runBlocking { // Launch a concurrent coroutine to check if the main thread is blocked launch { println(Thread.currentThread().name) for (k in 1..3) { delay(900) println("I'm not blocked $k") } } // Collect the flow val time = measureTimeMillis { foo().collect { value -> println(value) } } println("$time s") } |
Output (gif image):
Similar to the code example Sequence
, I also launch a coroutine on the main thread to check if the main thread is blocked. And the results show that Flow
does not block the main thread, as evidenced by the numbers 1, 2, 3
printed in parallel with I'm not blocked
.
In summary: Sequence
synchronous processing. It uses Iterator
and block main thead while waiting for the next item to yield
. Flow
handles asynchronously. It uses a collect
suspend function to not block the main thread while waiting for the next item to be emit
.
Flow
Now, I will explain the lines of code that I have used to example Flow
:
flow { }
blockflow { }
is a builder function that helps us create aFlow
object.- The code inside
flow { ... }
can be suspended, which means we can call suspend functions in theflow { }
blockflow { }
. So the functionfoo()
calls theflow { }
blockflow { }
no longer need to be a suspend function. - The
emit
function is used to emit values fromFlow
. This function is suspend function - The
collect
function is used to get the emit value from theemit
function. This function is also a suspend function.
2. Flow is a cold data source
Flow
streams are cold streams similar to Sequences
. That means the code inside flow { }
will not run until Flow
calls the collect
function.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 | fun foo(): Flow<Int> = flow { println("Flow started") for (i in 1..3) { delay(100) emit(i) } } fun main() = runBlocking<Unit> { println("Calling foo...") val flow = foo() println("Calling collect...") flow.collect { value -> println(value) } println("Calling collect again...") flow.collect { value -> println(value) } } |
Output:
1 2 3 4 5 6 7 8 9 10 11 12 | Calling foo... Calling collect... Flow started 1 2 3 Calling collect again... Flow started 1 2 3 |
We can see that despite calling foo()
, the code in Flow
still doesn’t run. Until Flow
calls the collect
function, the code in Flow
will run and that code will run again when we call the collect
function again.
3. Flow cancellation
Flow
complies with coroutines’ general cancellation principles (see section 4 ). The collect
of the flow
can only be canceled when and only when the flow
is suspended (such as a delay
function) and vice versa the flow
cannot be canceled.
The code below will show you the flow is canceled when timeout is over. We use the withTimeoutOrNull function
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 | fun foo(): Flow<Int> = flow { for (i in 1..3) { delay(2000) println("Emitting $i") emit(i) } } fun main() = runBlocking { withTimeoutOrNull(5000) { // Timeout after 5s foo().collect { value -> println(value) } } println("Done") } |
Output:
1 2 3 4 5 6 | Emitting 1 1 Emitting 2 2 Done |
In the first 4 seconds, số 1
and số 2
are printed. By the 5 second, the timeout flow has expired and the flow is being suspended because the delay(2000)
function delay(2000)
(1 second is left until the 6th second, the flow will stop suspending) so the flow is canceled and the số 3
not printed.
Now I will replace the delay
function with Thread.sleep
function to check whether flow cannot be canceled when it does not suspend?
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 | fun foo(): Flow<Int> = flow { for (i in 1..3) { Thread.sleep(2000) println("Emitting $i") emit(i) } } fun main() = runBlocking { withTimeout(1000) { // Timeout after 1s foo().collect { value -> println(value) } } println("Done") } |
Output:
1 2 3 4 5 6 7 8 | Emitting 1 1 Emitting 2 2 Emitting 3 3 Done |
As you can see, the flow still prints all 3 numbers 1, 2, 3
even though the timeout timeout is 1 second. So, the flow cannot be canceled while running or otherwise it is not in suspend state.
4. Ways to create Flow
In addition to using the flow { }
block flow { }
as the code above I used to create a Flow
, there are other ways to create Flow
object such as:
FlowOf function
1 2 | public fun <T> flowOf(vararg elements: T): Flow<T> |
Example code:
1 2 3 4 5 | fun main() = runBlocking { val data = flowOf(1,"abc", 3.4, "def") data.collect { println(it) } } |
Output:
1 2 3 4 5 | 1 abc 3.4 def |
.asFlow () extension function
Collections
, Arrays
, Sequences
or any type of T
can be converted to Flow
via the asFlow()
extension function. The following image lists the full asFlow()
extension asFlow()
Example code:
1 2 3 4 | fun main() = runBlocking { listOf(1, "abc", 3.4, "def").asFlow().collect { println(it) } } |
Output:
1 2 3 4 5 | 1 abc 3.4 def |
Conclude
Flow
is really a very powerful thing in Kotlin Coroutine. Hopefully through this article, you have some understanding of Flow
. In the next section, I will introduce its true power – that is the operators (operators). Flow
has a lot of operators not inferior to Rx
. Thank you for following this article. Hope you will keep watching the next sections.
Reference source:
https://kotlinlang.org/docs/reference/coroutines/flow.html
Read the previous sections:
Learn Kotlin Coroutine, part 1: Introduce Kotlin Coroutine and asynchronous programming techniques
Learn Kotlin Coroutine, part 2: Build first coroutine with Kotlin
Learn Kotlin Coroutine, part 3: Coroutine Context and Dispatcher
Study Kotlin Coroutine, part 4: Job, Join, Cancellation and Timeouts
Learn Kotlin Coroutine, part 5: Async & Await