Kotlin Coroutine Flow Tutorial

Photo by Hans-Jurgen Mager on Unsplash
Photo by Hans-Jurgen Mager on Unsplash
Flow is one of the features of Kotlin Coroutines. It allows a suspending function to return multiple values.

Flow is one of the features of Kotlin Coroutines. It allows a suspending function to return multiple values. In addition, its design is inspired by ReactiveX, so its usage and function names are very similar to ReactiveX. Not only can flow be used to replace ReactiveX kit, it also works well with other Kotlin Coroutine functions.

Flow

Flow is one of the features of Kotlin coroutine. In order to fully understand flow, it is better that you understand coroutine first. If you are not familiar with coroutine, you can read the following article first.

Creating Your First Flow

First, let’s run our first flow program.

fun getSequence(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    getSequence()
        .collect { value ->
            println(value)
        }
}

The above code will output the following.

1
2
3

flow() is a flow builder and will create a Flow<T>. Moreover, a flow is a reactive stream, as we mentioned at the beginning, flow is inspired by ReactiveX.

emit() will send a result to the flow. We know that flow can return multiple values, so you can call emit() multiple times to send multiple values. As in the example, it calls emit() three times, sending 1, 2, and 3 respectively.

collect() receives the value sent by emit(). Every time emit() is called to send a value, the block of collect() will be executed once to receive the value. Therefore, in the example, emit() is called three times, so the block of collect() will also be executed three times.

Flow Builders

Flow provides some flow builders. We will introduce three commonly used flow builders.

flow()

fun <T> flow(
    block: suspend FlowCollector<T>.() -> Unit
): Flow<T>

We introduced flow() at the beginning. It creates a cold flow. Cold flow means every time you call collect(), it will execute the block of flow(). Then, we use emit() in the block to send the value to upstream.

The following code shows that the block of flow() is executed after calling collect().

fun getSequence(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emit $i")
        emit(i)
    }
}

fun main() = runBlocking {
    val f = getSequence()
    println("Start to collect")
    f.collect { value ->
        println("Collected $value")
    }
}

Its output is below.

Start to collect
Emit 1
Collected 1
Emit 2
Collected 2
Emit 3
Collected 3

asFlow()

@FlowPreview fun <T> BroadcastChannel<T>.asFlow(): Flow<T>

asFlow() will create a hot flow. Hot flow means that the data is ready to be in the stream when the flow is created; while cold flow is when collect() is called, the flow() block will be executed, and emit() is called in the block to transfer the data to the stream. Therefore, hot flow is like a hot data source.

The following code shows how to create a flow with asFlow().

fun main() = runBlocking {
    val f = (1..3).asFlow()
    println("1. Start to collect")
    f.collect { value ->
        println("Collected $value")
    }
    println("2. Start to collect")
    f.collect { value ->
        println("Collected $value")
    }
}

Its output is below.

1. Start to collect
Collected 1
Collected 2
Collected 3
2. Start to collect
Collected 1
Collected 2
Collected 3

flowOf()

fun <T> flowOf(vararg elements: T): Flow<T>

flowOf() creates a flow, and put all the parameters ​​into the stream. The example is as follows.

fun main() = runBlocking {
    val f = flowOf(1, 2, 3)
    println("1. Start to collect")
    f.collect { value ->
        println("Collected $value")
    }
    println("2. Start to collect")
    f.collect { value ->
        println("Collected $value")
    }
}

Its output is below.

1. Start to collect
Collected 1
Collected 2
Collected 3
2. Start to collect
Collected 1
Collected 2
Collected 3

Operators

Flow is sequential. After a flow is created, and before calling collect(), we can call various flow operators to process or convert values ​​in the flow, and the execution order is sequential. Flow provides many different operators. We will introduce some commonly used operators.

Intermediate Flow Operators

map()

inline fun <T, R> Flow<T>.map(
    crossinline transform: suspend (value: T) -> R
): Flow<R>

map() uses transform to convert each value in the flow, and return a new value to the new flow. Finally, the new flow is returned. The example is as follows.

fun main() = runBlocking {
    (1..3).asFlow()
        .map { "Hello $it" }
        .collect { println(it) }
}

Its output is below.

Hello 1
Hello 2
Hello 3

filter() or filterNot()

inline fun <T> Flow<T>.filter(
    crossinline predicate: suspend (T) -> Boolean
): Flow<T>

filter() filters values in flow. Only when predicate returns true, it puts the value into the new flow.

The following example shows how to use filter() to filter out odd numbers.

fun main() = runBlocking {
    (1..10).asFlow()
        .filter { it % 2 == 0 }
        .collect { println(it) }
}

The output is as follows.

2
4
6
8
10

transform()

inline fun <T, R> Flow<T>.transform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

transform() is the basic operator. You can use it to customize an operator.  Within transform, you need to use emit() send a new value into the new flow, and you can call emit() more than once.

In the following code, we use transform() to reimplement to filter out odd numbers and repeat even numbers.

fun main() = runBlocking {
    (1..10).asFlow()
        .transform {
            if (it % 2 == 0) {
                emit(it)
                emit(it)
            }
        }
        .collect { println(it) }
}

Its output is below.

2
2
4
4
6
6
8
8
10
10

Terminal Flow Operators

collect()

suspend fun Flow<*>.collect(): Unit

inline suspend fun <T> Flow<T>.collect(
    crossinline action: suspend (value: T) -> Unit
): Unit

We have seen collect() many times in the above examples. collect() terminates a flow and receives the values from the flow.

reduce()

suspend fun <S, T : S> Flow<T>.reduce(
    operation: suspend (accumulator: S, value: T) -> S
): S

reduce() terminates a flow. Its operation has two parameters where accumulator starts with the first value in the flow, and value starts with the second value in the flow. After that, accumulator will be the return value of last operation, and value will be the next value in the flow.

The following shows how to use reduce() to calculate the sum of 1 to 5.

fun main() = runBlocking {
    val sum = (1..5).asFlow()
        .reduce { accumulator, value ->
            println("accumulator=$accumulator, value=$value")
            accumulator + value
        }
    println("Sum of 1..3 is $sum")
}

Its output is below.

accumulator=1, value=2
accumulator=3, value=3
accumulator=6, value=4
accumulator=10, value=5
Sum of 1..5 is 15

fold()

inline suspend fun <T, R> Flow<T>.fold(
    initial: R,
    crossinline operation: suspend (accumulator: R, value: T) -> R
): R

fold() is almost the same as reduce(), except that the initial accumulator needs to be specified in initial parameter, so accumulator starts with initial.

The following uses fold() to reimplement the sum of 1 to 5, and the initial value is set to 100.

fun main() = runBlocking {
    val result = (1..5).asFlow()
        .fold(100) { accumulator, value ->
            println("accumulator=$accumulator, value=$value")
            accumulator + value
        }
    println("Result is $result")
}

The output is as follows.

accumulator=100, value=1
accumulator=101, value=2
accumulator=103, value=3
accumulator=106, value=4
accumulator=110, value=5
Result is 115

single()

suspend fun <T> Flow<T>.single(): T

single() is similar to collect(), but the difference is that it only receives one value. If there are multiple values ​​in the flow, it will throw an IllegalStateException. A typical example is that after we issue a Rest API request, we can use single() to wait for the return value instead of collect() because Rest API always returns only once.

Flattening Flows

flatMapConcat()

@FlowPreview fun <T, R> Flow<T>.flatMapConcat(
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

The transform of flatMapConcat() will return a flow for each value. Then, flatMapConcat() will execute these flows in sequence, and sequentially gather all the values ​​emitted by emit() in these flows into a new flow. The example is as follows.

fun double(value: Int) = flow {
    emit(value)
    delay(100)
    emit(value)
}

fun main() = runBlocking {
    (1..3).asFlow()
        .flatMapConcat { double(it) }
        .collect { println(it) }
}

The output is as follows.

1
1
2
2
3
3

flatMapMerge()

@FlowPreview fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

Different from flatMapConcat(), flatMapMerge() will execute all returned flows simultaneously. Then, according to the order of emit(), collect the values ​​and put them in a new flow.

Reimplement the previous example with flatMapMerge(), and see what is different.

fun double(value: Int) = flow {
    emit(value)
    delay(100)
    emit(value)
}

fun main() = runBlocking {
    (1..3).asFlow()
        .flatMapMerge { double(it) }
        .collect { println(it) }
}

Its output is below.

1
2
3
1
2
3

Composing Multiple Flows

zip()

fun <T1, T2, R> Flow<T1>.zip(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R>

zip() converts each pair of values in the 2 flows to a new value with transform. When the number of values ​​of two flows is different, it will end when all values of one of the flows are all processed. The example is as follows.

fun main() = runBlocking {
    val nums = (1..3).asFlow()
        .onEach { delay(29) }
    val strs = flowOf('a', 'b', 'c', 'd')
        .onEach { delay(37) }
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

The output is as follows.

1 -> a
2 -> b
3 -> c

combine()

@JvmName("flowCombine") fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>,
    transform: suspend (a: T1, b: T2) -> R
): Flow<R>

combine() combines multiple flows. It is similar to zip(), but they have different mechanisms for obtaining values ​​from flows. As long as one flow has called emit(), it will take the last value of other flows, even if those values ​​have been processed.

Let’s use combine() to rewrite the above code, and see what is different.

fun main() = runBlocking {
    val nums = (1..3).asFlow()
        .onEach { delay(29) }
    val strs = flowOf('a', 'b', 'c', 'd')
        .onEach { delay(37) }
    nums.combine(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

The output is as follows.

1 -> a
2 -> a
2 -> b
3 -> b
3 -> c
3 -> d

Other Operators

onCompletion()

fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>

onCompletion() will be called after a flow is completed, whether it is successfully completed, cancelled, or an error occurs, it will be called. The example is as follows.

fun main() = runBlocking {
    (1..5).asFlow()
        .onCompletion { e -> println("Completion: $e") }
        .collect {
            println("Value is $it")
            if (it == 3) {
                throw Exception("Error")
            }
        }
}

The output is as follows.

Value is 1
Value is 2
Value is 3
Completion: java.lang.Exception: Error

onEach()

fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

onEach() allows you to process some things between one upstream and the next downstream, such as printing out some debug messages for each value. The example is as follows.

fun main() = runBlocking {
    ('a'..'c').asFlow()
        .onEach { println("Check: $it") }
        .map { it.toInt() }
        .collect {
            println("Value is $it")
        }
}

The output is as follows.

Check: a
Value is 97
Check: b
Value is 98
Check: c
Value is 99

catch()

fun <T> Flow<T>.catch(
    action: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T>

When an exception is thrown out from a flow, catch() will be called, but we cannot recover the exception in catch().

fun main() = runBlocking {
    (1..5).asFlow()
        .onEach { if (it > 3) throw Exception("Value should not larger than 3") }
        .catch { e -> println("Caught exception: $e") }
        .onCompletion { println("Completion") }
        .collect { println("Value is $it") }
}

The output is as follows.

Value is 1
Value is 2
Value is 3
Caught exception: java.lang.Exception: Value should not larger than 3
Completion

Flow Context

In the case where no context is specified, flows will be run under the context where collect() is called. In the following code, collect() is called under Dispatchers.Main.

fun main() = runBlocking {
    (1..2).asFlow()
        .onEach { println("onEach1: $it is on ${Thread.currentThread().name}") }
        .onEach { println("onEach2: $it is on ${Thread.currentThread().name}") }
        .onEach { println("onEach3: $it is on ${Thread.currentThread().name}") }
        .collect { println("collect: $it is on ${Thread.currentThread().name}") }
}

The output is as follows.

onEach1: 1 is on main @coroutine#1
onEach2: 1 is on main @coroutine#1
onEach3: 1 is on main @coroutine#1
collect: 1 is on main @coroutine#1
onEach1: 2 is on main @coroutine#1
onEach2: 2 is on main @coroutine#1
onEach3: 2 is on main @coroutine#1
collect: 2 is on main @coroutine#1

flowOn()

fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

flowOn() allows operators that have not been assigned a context to run under the context it specifies.

Let us add flowOn() to the previous code. Let’s take a look at the threads of each onEach().

fun main() = runBlocking {
    (1..2).asFlow()
        .onEach { println("onEach1: $it is on ${Thread.currentThread().name}") }
        .onEach { println("onEach2: $it is on ${Thread.currentThread().name}") }
        .flowOn(Dispatchers.IO)
        .onEach { println("onEach3: $it is on ${Thread.currentThread().name}") }
        .collect { println("collect: $it is on ${Thread.currentThread().name}") }
}

The output is as follows.

onEach1: 1 is on DefaultDispatcher-worker-2 @coroutine#2
onEach2: 1 is on DefaultDispatcher-worker-2 @coroutine#2
onEach1: 2 is on DefaultDispatcher-worker-2 @coroutine#2
onEach2: 2 is on DefaultDispatcher-worker-2 @coroutine#2
onEach3: 1 is on main @coroutine#1
collect: 1 is on main @coroutine#1
onEach3: 2 is on main @coroutine#1
collect: 2 is on main @coroutine#1

callbackFlow()

@ExperimentalCoroutinesApi fun <T> callbackFlow(
    block: suspend ProducerScope<T>.() -> Unit
): Flow<T>

callbackFlow() can convert callback-style code into a flow. For example, we can wrap Rest API code into a flow.

The code below shows how to convert callback-style code into flow.

fun getUser(): Flow<String> = callbackFlow {
    val callback = object : GetUserCallback {
        override fun onNextValue(value: String) {
            offer(value)
        }

        override fun onApiError(cause: Throwable) {
            close(cause)
        }

        override fun onCompleted() {
            close()
        }
    }
    getUserApi.register(callback)
    awaitClose { getUserApi.unregister(callback) }
}

callbackFlow() uses offer() to send values ​​to a flow, as opposed to using emit() in a flow. Because flow can return multiple values, we can call offer() multiple times to send values ​​to a flow. When you don’t need to send any more values, call close() to close it. If an error occurs, also use close() to close it with an exception. After you call close(), awaitClose() will be called.

Exceptions

When any operator in a flow throws an exception, the entire flow will be cancelled. Moreover, this exception will also be exposed to the caller, so we can use try-catch to catch the exception. The following equation code.

fun main() = runBlocking {
    val nums = (1..2).asFlow()
        .onEach { check(it < 2) }

    try {
        nums.collect { println(it) }
    } catch (e: Exception) {
        println("Caught exception: $e")
    }
}

The output is as follows.

1
Caught exception: java.lang.IllegalStateException: Check failed.

Conclusion

If you have used reactive to handle asynchronous calls, you definitely don’t want to go back to use callbacks to handle asynchronous calls. It is easy to write callback-hell code because it uses callbacks to handle asynchronous. That is, a callback contains another callback in which contains another callback. Also, with flow, we don’t need to introduce other reactive packages.

Leave a Reply

Your email address will not be published. Required fields are marked *

You May Also Like