Kotlin Coroutine Flow 教學

Photo by Hans-Jurgen Mager on Unsplash
Photo by Hans-Jurgen Mager on Unsplash
Flow 是 Kotlin Coroutine 的其中一個功能。它可以讓一個 suspending 函式回傳多個值。此外,它的設計受到 ReactiveX 啟發,所以它的用法與函式名稱都和 ReactiveX 很相似。

Flow 是 Kotlin Coroutine 的其中一個功能。它可以讓一個 suspending 函式回傳多個值。此外,它的設計受到 ReactiveX 啟發,所以它的用法與函式名稱都和 ReactiveX 很相似。Flow 不但可以用來取代 ReactiveX 套件,還和 Kotlin Coroutine 其他的功能一起運作良好。

Flow

Flow 是 Kotlin coroutine 的其中一個功能。為了能夠完全地了解 flow,最好先了解一下 coroutine。如果你不熟悉 coroutine 的話,可以先閱讀下面的文章。

建立第一個 Flow

首先,先來執行我們第一個 flow 程式。

fun getSequence(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        emit(i)
    }
}

fun main() = runBlocking {
    getSequence()
        .collect { value ->
            println(value)
        }
}

上面程式碼會輸出以下。

1
2
3

flow() 是一個 flow builder,它會建立一個 Flow<T>。而且,flow 是一個 Reactive stream,就如同我們一開始所提到,flow 受到 ReactiveX 的啟發。

emit() 會將結果發送到 flow 裡。我們知道 flow 是可以回傳多個值,所以你可以多次地呼叫 emit() 來發送多的值。如同範例裡,它呼叫了 emit() 三次,分別發送 1、2、和 3。

collect() 接收 emit() 發送出來的值。每呼叫一次 emit() 來發送一個值,collect() 的 block 就會被執行一次來接收這個值。因此,在範例中,emit() 被呼叫了三次,所以 collect() 的 block 也會被執行三次。

Flow Builders

Flow 提供了一些的 flow builders,我們接下來會介紹三個常用的 flow builders。

flow()

fun <T> flow(
    block: suspend FlowCollector<T>.() -> Unit
): Flow<T>

我們在一開始就有介紹到 flow(),它會建立一個 cold flow。所謂的 cold flow 就是,每次當你呼叫 collect() 時,它才會執行 flow() 的 block。然後,我們在 block 裡利用 emit() 來發送值到 upstream。

下面的程式碼顯示,flow() 的 block 是呼叫 collect() 之後才被執行的。

fun getSequence(): Flow<Int> = flow {
    for (i in 1..3) {
        delay(100)
        println("Emit $i")
        emit(i)
    }
}

fun main() = runBlocking {
    val f = getSequence()
    println("Start to collect")
    f.collect { value ->
        println("Collected $value")
    }
}

其輸出以下。

Start to collect
Emit 1
Collected 1
Emit 2
Collected 2
Emit 3
Collected 3

asFlow()

@FlowPreview fun <T> BroadcastChannel<T>.asFlow(): Flow<T>

asFlow() 會建立一個 hot flow。hot flow 是指,資料在 flow 建立時就已經準備好在 stream 裡了;而 cold flow 是當呼叫 collect() 時,才會去執行 flow() 的 block,並且在 block 裡呼叫 emit() 將資料發送到 stream 裡面。所以,hot flow 像是一個 hot data source。

下面的程式碼顯示如何用 asFlow() 建立一個 flow。

fun main() = runBlocking {
    val f = (1..3).asFlow()
    println("1. Start to collect")
    f.collect { value ->
        println("Collected $value")
    }
    println("2. Start to collect")
    f.collect { value ->
        println("Collected $value")
    }
}

其輸出以下。

1. Start to collect
Collected 1
Collected 2
Collected 3
2. Start to collect
Collected 1
Collected 2
Collected 3

flowOf()

fun <T> flowOf(vararg elements: T): Flow<T>

flowOf() 建立一個 flow,並將 flowOf() 的所有參數值放到 stream 裡。其範例如下。

fun main() = runBlocking {
    val f = flowOf(1, 2, 3)
    println("1. Start to collect")
    f.collect { value ->
        println("Collected $value")
    }
    println("2. Start to collect")
    f.collect { value ->
        println("Collected $value")
    }
}

其輸出以下。

1. Start to collect
Collected 1
Collected 2
Collected 3
2. Start to collect
Collected 1
Collected 2
Collected 3

Operators

Flow 是循序的(sequential)。在 flow 建立之後,並且在呼叫 collect() 之前,我們可以呼叫各種 flow operators 來處理或轉換 flow 裡的值,而且這個執行順序是循序的。Flow 提供了很多不同的 operators,接下來我們將介紹一些常用的 operators。

Intermediate Flow Operators

map()

inline fun <T, R> Flow<T>.map(
    crossinline transform: suspend (value: T) -> R
): Flow<R>

map()transform 對 flow 裡的每一個值做轉換,並回傳一個新值到新的 flow。最後回傳新的 flow。其範例如下。

fun main() = runBlocking {
    (1..3).asFlow()
        .map { "Hello $it" }
        .collect { println(it) }
}

其輸出以下。

Hello 1
Hello 2
Hello 3

filter() or filterNot()

inline fun <T> Flow<T>.filter(
    crossinline predicate: suspend (T) -> Boolean
): Flow<T>

filter() 會去過濾 flow 裡的值。只有當 predicate 回傳 true 時,才會將值放進新的 flow。

以下的範例顯示如何用 filter() 來過濾掉奇數。

fun main() = runBlocking {
    (1..10).asFlow()
        .filter { it % 2 == 0 }
        .collect { println(it) }
}

其輸出如下。

2
4
6
8
10

transform()

inline fun <T, R> Flow<T>.transform(
    crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R>

transform() 是最基本的 operator。你可以用它來客製化一個 operator。在 transform() 的 transform 參數裡,你必須要用 emit() 來將新值放到新的 flow 裡。而且,你可以呼叫 emit() 多次。

以下程式碼中,我們用 transform() 來重新實作過濾掉奇數,並且重複偶數。

fun main() = runBlocking {
    (1..10).asFlow()
        .transform {
            if (it % 2 == 0) {
                emit(it)
                emit(it)
            }
        }
        .collect { println(it) }
}

其輸出以下。

2
2
4
4
6
6
8
8
10
10

Terminal Flow Operators

collect()

suspend fun Flow<*>.collect(): Unit

inline suspend fun <T> Flow<T>.collect(
    crossinline action: suspend (value: T) -> Unit
): Unit

我們在上面很多範例中已經看過 collect() 很多次了。collect() 會結束(terminal)flow,並且接收 flow 裡的值。

reduce()

suspend fun <S, T : S> Flow<T>.reduce(
    operation: suspend (accumulator: S, value: T) -> S
): S

reduce() 會結束 flow。它的 operation 有兩個參數。accumulator 一開始會是 flow 裡第一個值,而 value 一開始會是 flow 裡第二個值。之後,accumulator 會是上一次 operation 的回傳值,而 value 會是下一個 flow 裡的值。

以下顯示如何用 reduce() 來計算 1 到 5 的總和。

fun main() = runBlocking {
    val sum = (1..5).asFlow()
        .reduce { accumulator, value ->
            println("accumulator=$accumulator, value=$value")
            accumulator + value
        }
    println("Sum of 1..3 is $sum")
}

其輸出以下。

accumulator=1, value=2
accumulator=3, value=3
accumulator=6, value=4
accumulator=10, value=5
Sum of 1..5 is 15

fold()

inline suspend fun <T, R> Flow<T>.fold(
    initial: R,
    crossinline operation: suspend (accumulator: R, value: T) -> R
): R

fold() 與 reduce() 幾乎一樣,只差必須在 initial 參數指定 accumulator 的初始值。所以 accumulator 一開始會是 initial

以下用 fold() 來重新實作 1 至 5 的總和,且初始值設為 100。

fun main() = runBlocking {
    val result = (1..5).asFlow()
        .fold(100) { accumulator, value ->
            println("accumulator=$accumulator, value=$value")
            accumulator + value
        }
    println("Result is $result")
}

其輸出如下。

accumulator=100, value=1
accumulator=101, value=2
accumulator=103, value=3
accumulator=106, value=4
accumulator=110, value=5
Result is 115

single()

suspend fun <T> Flow<T>.single(): T

single() 和 collect() 很像,但不同在於他只會接收一個值。如果 flow 裡有多的值,它會丟出 IllegalStateException。一個典型例子是,我們發出 Rest API 要求後,相較於 collect(),我們可以用 single() 來等待回傳值。因為 Rest API 總是只回傳一次。

Flattening Flows

flatMapConcat()

@FlowPreview fun <T, R> Flow<T>.flatMapConcat(
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

flatMapConcat()transform 會對每一個值回傳一個 flow。然後,flatMapConcat() 會依序地執行這些 flow,並將這些 flow 裡 emit() 發出的值,依序地全部集合起來變成一個新的 flow。其範例如下。

fun double(value: Int) = flow {
    emit(value)
    delay(100)
    emit(value)
}

fun main() = runBlocking {
    (1..3).asFlow()
        .flatMapConcat { double(it) }
        .collect { println(it) }
}

其輸出如下。

1
1
2
2
3
3

flatMapMerge()

@FlowPreview fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R>

與 flatMapConcat() 不同的是,flatMapMerge() 會同時地執行所有回傳的 flow。然後,依照 emit() 的順序,將值收集起來放到新的 flow。

將上一個範例程式改用 flatMapMerge() 來實作,看看會有什麼不同。

fun double(value: Int) = flow {
    emit(value)
    delay(100)
    emit(value)
}

fun main() = runBlocking {
    (1..3).asFlow()
        .flatMapMerge { double(it) }
        .collect { println(it) }
}

其輸出以下。

1
2
3
1
2
3

Composing Multiple Flows

zip()

fun <T1, T2, R> Flow<T1>.zip(
    other: Flow<T2>,
    transform: suspend (T1, T2) -> R
): Flow<R>

zip()transform 將兩個 flows 的值,成對地轉變成一個新的值。當兩個 flow 的值的數量不同時,當其中一個 flow 的值都處理完後,就會結束。其範例如下。

fun main() = runBlocking {
    val nums = (1..3).asFlow()
        .onEach { delay(29) }
    val strs = flowOf('a', 'b', 'c', 'd')
        .onEach { delay(37) }
    nums.zip(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

其輸出如下。

1 -> a
2 -> b
3 -> c

combine()

@JvmName("flowCombine") fun <T1, T2, R> Flow<T1>.combine(
    flow: Flow<T2>,
    transform: suspend (a: T1, b: T2) -> R
): Flow<R>

combine() 會結合多個 flows。與 zip() 很像,但它們從 flow 取值的機制不相同。只要有一個 flow 有 emit() 值時,它會拿其它 flow 最後的值,即使那些值有被處理過。

用 combine() 來改寫上面的程式碼,讓我們看看會有什麼不同。

fun main() = runBlocking {
    val nums = (1..3).asFlow()
        .onEach { delay(29) }
    val strs = flowOf('a', 'b', 'c', 'd')
        .onEach { delay(37) }
    nums.combine(strs) { a, b -> "$a -> $b" }
        .collect { println(it) }
}

其輸出如下。

1 -> a
2 -> a
2 -> b
3 -> b
3 -> c
3 -> d

Other Operators

onCompletion()

fun <T> Flow<T>.onCompletion(
    action: suspend FlowCollector<T>.(cause: Throwable?) -> Unit
): Flow<T>

onCompletion() 會在 flow 完成後被呼叫,不管是成功地完成、被取消,或是發生錯誤,都會被呼叫。其範例如下。

fun main() = runBlocking {
    (1..5).asFlow()
        .onCompletion { e -> println("Completion: $e") }
        .collect {
            println("Value is $it")
            if (it == 3) {
                throw Exception("Error")
            }
        }
}

其輸出如下。

Value is 1
Value is 2
Value is 3
Completion: java.lang.Exception: Error

onEach()

fun <T> Flow<T>.onEach(action: suspend (T) -> Unit): Flow<T>

onEach() 讓你可以在某個 upstream 到下個 downstream 中,讓你處理一些事情,如對每個值印出一些 debug 訊息。範例如下。

fun main() = runBlocking {
    ('a'..'c').asFlow()
        .onEach { println("Check: $it") }
        .map { it.toInt() }
        .collect {
            println("Value is $it")
        }
}

其輸出如下。

Check: a
Value is 97
Check: b
Value is 98
Check: c
Value is 99

catch()

fun <T> Flow<T>.catch(
    action: suspend FlowCollector<T>.(cause: Throwable) -> Unit
): Flow<T>

在 flow 中,當有個 exception 被丟出時,catch() 會被呼叫,但我們無法在 catch() 中恢復這個 exception。

fun main() = runBlocking {
    (1..5).asFlow()
        .onEach { if (it > 3) throw Exception("Value should not larger than 3") }
        .catch { e -> println("Caught exception: $e") }
        .onCompletion { println("Completion") }
        .collect { println("Value is $it") }
}

其輸出如下。

Value is 1
Value is 2
Value is 3
Caught exception: java.lang.Exception: Value should not larger than 3
Completion

Flow Context

在沒有指定 context 的形況下,flow 會被跑在呼叫 collect() 時的 context 下。如下程式碼中,collect() 是在 Dispatchers.Main 下被呼叫。

fun main() = runBlocking {
    (1..2).asFlow()
        .onEach { println("onEach1: $it is on ${Thread.currentThread().name}") }
        .onEach { println("onEach2: $it is on ${Thread.currentThread().name}") }
        .onEach { println("onEach3: $it is on ${Thread.currentThread().name}") }
        .collect { println("collect: $it is on ${Thread.currentThread().name}") }
}

其輸出如下。

onEach1: 1 is on main @coroutine#1
onEach2: 1 is on main @coroutine#1
onEach3: 1 is on main @coroutine#1
collect: 1 is on main @coroutine#1
onEach1: 2 is on main @coroutine#1
onEach2: 2 is on main @coroutine#1
onEach3: 2 is on main @coroutine#1
collect: 2 is on main @coroutine#1

flowOn()

fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>

flowOn() 可以讓它之前那些沒有被指定 context 的 operators 跑在它指定的 context 下。

讓我們在上一個程式碼中,加入 flowOn()。來看看各個 onEach() 的 thread。

fun main() = runBlocking {
    (1..2).asFlow()
        .onEach { println("onEach1: $it is on ${Thread.currentThread().name}") }
        .onEach { println("onEach2: $it is on ${Thread.currentThread().name}") }
        .flowOn(Dispatchers.IO)
        .onEach { println("onEach3: $it is on ${Thread.currentThread().name}") }
        .collect { println("collect: $it is on ${Thread.currentThread().name}") }
}

其輸出如下。

onEach1: 1 is on DefaultDispatcher-worker-2 @coroutine#2
onEach2: 1 is on DefaultDispatcher-worker-2 @coroutine#2
onEach1: 2 is on DefaultDispatcher-worker-2 @coroutine#2
onEach2: 2 is on DefaultDispatcher-worker-2 @coroutine#2
onEach3: 1 is on main @coroutine#1
collect: 1 is on main @coroutine#1
onEach3: 2 is on main @coroutine#1
collect: 2 is on main @coroutine#1

callbackFlow()

@ExperimentalCoroutinesApi fun <T> callbackFlow(
    block: suspend ProducerScope<T>.() -> Unit
): Flow<T>

callbackFlow() 可以將 callback-style 的程式碼轉換成 flow。如,我們可以將 Rest API 的函式包裝成 flow。

下方程式碼展示如何將 callback-style 的程式碼轉換成 flow。

fun getUser(): Flow<String> = callbackFlow {
    val callback = object : GetUserCallback {
        override fun onNextValue(value: String) {
            offer(value)
        }

        override fun onApiError(cause: Throwable) {
            close(cause)
        }

        override fun onCompleted() {
            close()
        }
    }
    getUserApi.register(callback)
    awaitClose { getUserApi.unregister(callback) }
}

callbackFlow() 是用 offer() 來發送值到 flow 的,相對於 flow 中是用 emit()。因為 flow 是可以回傳多的值的,所以我們可以多次呼叫 offer() 來發送值到 flow。不需要再發送值時,就要呼叫 close() 來關閉。如果發生錯誤時,也要用 close() 來關閉,並且帶入一個 exception 參數。在你呼叫 close() 之後,awaitClose() 就會被呼叫。

Exceptions

flow 中的任一個 operators 丟出一個 exception 時,整個 flow 就會被取消。而且,這個 exception 也會暴露給呼叫者,所以我們就可以用 try-catch 來接住 exception。如下方程式碼。

fun main() = runBlocking {
    val nums = (1..2).asFlow()
        .onEach { check(it < 2) }

    try {
        nums.collect { println(it) }
    } catch (e: Exception) {
        println("Caught exception: $e")
    }
}

其輸出如下。

1
Caught exception: java.lang.IllegalStateException: Check failed.

結論

如果你有使用過 reactive 來處理非同步程式的話,你一定不想再回去用 callback 的方式來處來非同步。因為用 callback 的方式處理非同步,很容易寫出 callback-hell 的程式碼。也就是,一個 callback 裡面包含著另一個 callback,而它又包含著另外一個 callback。而且,用 flow 的話,我們就不需要再引入其他 reactive 的套件。

發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

You May Also Like
Photo by Gemma Evans on Unsplash
Read More

Android:Hilt 依賴注入

Hilt 是基於 Dagger 且設計在 Android 上使用的 dependency injection library。所以在開發 Android 時,使用 Hilt 會比使用 Dagger 更加地方便。本文章將介紹如何使用 Hilt。
Read More