Kotlin coroutine flow 包含 cold flow 和 hot flow 兩種。而,SharedFlow 和 StateFlow 是其中兩種 hot flow。它們可以 broadcast 值給數個接收者,所以可以被用來實作 publisher-subscriber 模式。
Table of Contents
Flow
SharedFlow 和 StateFlow 都是 hot flow,而 flow 又是 Kotlin coroutine 的其中一種功能。所以在開始閱讀本文章之前,必須要了解 Kotlin coroutine 和 flow。如果你不熟悉 Kotlin coroutine 和 flow 的話,可以先閱讀下面的文章。
SharedFlow & MutableSharedFlow
Publisher-Subscriber 模式
SharedFlow 可以 broadcast 值給多個接收者。發送值的那一方稱之為 publisher,接收值的那一方則稱之為 subscriber。所以我們可以用 SharedFlow 來取代 RxJava 的 PublishSubject。
Publisher 呼叫 emit() 來發送值,而 subscriber 呼叫 onEach() 來接收值。另外,我們不需要呼叫 collect() 來 terminate 目前的 flow,因為 SharedFlow 永遠不會 complete。不過,subscriber 可以呼叫 cancel() 來取消接收值。
Relay Cache
SharedFlow 會將最近發送的值存到 relay cache。如,relay cache 的大小是 3 的話,那 relay cache 會存儲最近 3 個已經發送過的值。新的 subscriber 一開始就會先接收到 relay cache 裡面所有的值,所以才會稱為 relay。
範例:Relay Cache = 0
以下是一個 relay cache 為 0 的範例。在範例中,我們宣告的不是 SharedFlow,而是 MutableSharedFlow。這是因為 SharedFlow 是 readonly,只有 MutableSharedFlow 才可以呼叫 emit() 來發送值。
在範例中,我們會先發送 -3、-2、-1,然後在建立兩個 subscriber。
val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
suspend fun main(args: Array<String>) {
val sharedFlow = MutableSharedFlow<Int>()
sharedFlow.emit(-3)
sharedFlow.emit(-2)
sharedFlow.emit(-1)
val subscriber1 = sharedFlow
.onEach {
println("#1 got $it (${currentCoroutineContext()})")
}
.launchIn(scope)
val subscriber2 = sharedFlow
.onEach {
println("#2 got $it (${currentCoroutineContext()})")
}
.launchIn(scope)
val publisher2 = scope.launch {
repeat(100) {
println("Emitting $it (${currentCoroutineContext()})")
sharedFlow.emit(it)
delay(1000)
}
}
println("Enter any key to cancel subscriber1")
readln()
subscriber1.cancel()
println("Enter any key to cancel subscriber2")
readln()
subscriber2.cancel()
println("Enter any key to stop emitting values")
readln()
publisher2.cancel()
}此範例會產生以下的輸出。我們可以看到 subscriber1 和 subscriber2 並沒有收到在它們 subscribe 之前就發送出來的值,因為 relay cache 設為 0。
Enter any key to cancel subscriber1
Emitting 0 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@6edf1369, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])
Enter any key to cancel subscriber2
Emitting 2 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@ec121f5, Dispatchers.IO])
Enter any key to stop emitting values
Emitting 3 ([StandaloneCoroutine{Active}@587082ff, Dispatchers.IO])範例:Relay Cache = 2
以下的範例是 relay cache 為 2。
val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
suspend fun main(args: Array<String>) {
val sharedFlow = MutableSharedFlow<Int>(replay = 2)
sharedFlow.emit(-3)
sharedFlow.emit(-2)
sharedFlow.emit(-1)
val subscribe1 = sharedFlow
.onEach {
println("#1 got $it (${currentCoroutineContext()})")
}
.launchIn(scope)
val subscribe2 = sharedFlow
.onEach {
println("#2 got $it (${currentCoroutineContext()})")
}
.launchIn(scope)
val publisher1 = scope.launch {
repeat(100) {
println("Emitting $it (${currentCoroutineContext()})")
sharedFlow.emit(it)
delay(1000)
}
}
println("Enter any key to cancel subscribe1")
readln()
subscribe1.cancel()
println("Enter any key to cancel subscribe2")
readln()
subscribe2.cancel()
println("Enter any key to stop emitting values")
readln()
publisher1.cancel()
}從以下的輸出中,我們可以看到,subscriber1 和 subscriber2 有接收到 -2 和 -1,但是沒有接收到 -3。這是因為 relay cache 為 2。
Enter any key to cancel subscribe1
Emitting 0 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#1 got -2 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got -2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
#1 got -1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got -1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@36c7ba04, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
Enter any key to cancel subscribe2
Emitting 2 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@32a30cca, Dispatchers.IO])
Enter any key to stop emitting values
Emitting 4 ([StandaloneCoroutine{Active}@5bc02d70, Dispatchers.IO])用 SharedFlow 來取代 BroadcastChannel
BroadcastChannel 與 SharedFlow 非常地相似。重要的是 SharedFlow 是被設計出來取代 BroadcastChannel。如果你想知道它們之間的不同之處,可以在這裡找到。
StateFlow & MutableStateFlow
StateFlow 像是一個 replay cache 為 1 的 SharedFlow。而且,在初始化一個 StateFlow 時,就要先給定一個初始值到 replay cache。另外,如果發送的值和目前在 replay cache 裡的值一樣時,subscribes 不會被通知。所以我們用 StateFlow 來取代 RxJava 的的 BehaviorSubject。
下面的程式碼顯示,如何用 SharedFlow 來製作出和 StateFlow 一樣的行為。
val sharedFlow = MutableSharedFlow(
replay = 1,
onBufferOverflow = BufferOverflow.DROP_OLDEST
)
shared.tryEmit(initialValue)
val stateFlow = sharedFlow.distinctUntilChanged()讓我們來看一下要如何使用 StateFlow。
val scope = CoroutineScope(SupervisorJob() + Dispatchers.IO)
suspend fun main(args: Array<String>) {
val stateFlow = MutableStateFlow(-100)
val subscriber1 = stateFlow
.onEach {
println("#1 got $it (${currentCoroutineContext()})")
}
.launchIn(scope)
val subscriber2 = stateFlow
.onEach {
println("#2 got $it (${currentCoroutineContext()})")
}
.launchIn(scope)
val job = scope.launch {
delay(1000)
repeat(100) {
val value = it / 2
println("Emitting $value (${currentCoroutineContext()})")
stateFlow.emit(value)
delay(1000)
}
}
println("Enter any key to cancel subscriber1")
readln()
subscriber1.cancel()
println("Enter any key to cancel subscriber2")
readln()
subscriber2.cancel()
println("Enter any key to stop emitting values")
readln()
job.cancel()
}從以下的輸出可以看到,只有當發送的值和目前在 StateFlow 裡面的值不一樣的時候,subscribers 才會被通知。
Enter any key to cancel subscriber1
#2 got -100 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
#1 got -100 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 0 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 0 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 0 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 1 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 1 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 1 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 2 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 2 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 2 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#1 got 3 ([StandaloneCoroutine{Active}@3915fbe4, Dispatchers.IO])
#2 got 3 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Enter any key to cancel subscriber2
Emitting 3 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
#2 got 4 ([StandaloneCoroutine{Active}@5a117245, Dispatchers.IO])
Emitting 4 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])
Enter any key to stop emitting values
Emitting 5 ([StandaloneCoroutine{Active}@400474e9, Dispatchers.IO])用 StateFlow 來取代 ConflatedBroadcastChannel
ConflatedBroadcastChannel 與 StateFlow 非常地相似。重要的是 StateFlow 是被設計出來取代 ConflatedBroadcastChannel。如果你想知道它們之間的不同之處,可以在這裡找到。
結語
在寫程式的時候,我們有時會需要監聽某個狀態。SharedFlow 和 StateFlow 提供了一個比 BroadcastChannel 和 ConflatedBroadcastChannel 更為簡單的方式,讓我們可以使用 publisher-subscriber pattern。








