Kotlin Coroutine 教學

Photo by Hans-Jurgen Mager on Unsplash
Photo by Hans-Jurgen Mager on Unsplash
Kotlin 的 coroutine 是用來取代 thread。它不會阻塞 thread,而且還可以被取消。Coroutine core 會幫你管理 thread 的數量,讓你不需要自行管理,這也可以避免不小心建立過多的 thread。

Kotlin 的 coroutine 是用來取代 thread。它不會阻塞 thread,而且還可以被取消。Coroutine core 會幫你管理 thread 的數量,讓你不需要自行管理,這也可以避免不小心建立過多的 thread。

Coroutine 套件

Kotlin coroutine 並不是直接由 Kotlin 語言提供的功能,而是以套件(package)的方式提供。所以,你必須先要在 build.gradle.kts 中引入 kotlin-coroutines-core 套件。

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2")
}

建立第一個 Coroutine

首先,先來執行我們第一個 Coroutine 程式。

fun main() = runBlocking {
    launch {
        println("Hello Coroutine!")
    }

    println("Complete")
}

其輸出如下。

Complete
Hello Coroutine!

launch() – 建立一個 Coroutine

launch() 會建立一個 coroutine,然後馬上返回並繼續執行。而且,這個新的 coroutine 會以非同步的方式執行區塊(block)裡面的程式碼。所以,會先看到輸出 Complete,之後才是 Hello Coroutine!

我們也可以先等待新的 coroutine 執行結束後再繼續,如下面的程式碼。

fun main() = runBlocking {
    val job = launch {
        println("Hello Coroutine!")
    }

    job.join()
    println("Complete")
}

這會輸出以下。

Hello Coroutine!
Complete

launch() 會回傳一個 Job 型態的物件,而這個 Job 就是代表 coroutine 了。之後,呼叫 Job.join()當前 runBlocking() 的 coroutine 會被暫停(suspend),等待 job 的 coroutine 執行結束後,再繼續執行。當前的 coroutine 被暫停後會釋放當前的 thread,所以當前的 thread 可以去執行其他的 coroutine。這也就是為何 coroutine 的效能優於 thread,因為它比 thread 更加地輕量。

runBlocking() – 阻塞(Blocking)與非阻塞(Non-Blocking)世界間的橋樑

fun <T> runBlocking(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): T

我們一開始有提到 Kotlin 並不是直接提供 coroutine 功能,而是透過引入套件的方式。也就是說,Kotlin 程式在一開始執行時,它還是在以往的 thread 世界裡。和 launch() 一樣,runBlocking() 實際上就是建立一個新個 coroutine。

launch() 也可以不用在 runBlocking() 裡面使用,如下。

fun main() {
    GlobalScope.launch {
        println("Hello Coroutine!")
    }

    println("Complete")
}

這會輸出以下。

Complete

你會發現 launch() 的 block 沒有被執行。這是因為 main thread 沒有等待新建立的 coroutine 執行完成,就直接結束了。

和本文章中第一個程式碼比較,我們會發現這兩個程式碼幾乎一樣,就只有差在有沒有使用 runBlocking()。有用 runBlocking() 的版本中,我們可以看到 launch() 回傳的 coroutine 會被執行完成。

所以,runBlocking() 會建立一個新的 coroutine。而且,這個 coroutine 會等待它裡面所有的 child coroutines 執行完成後,才會結束。此外,runBlocking() 會阻塞(block)當前的 thread。也就是說,當前的 thread 會被阻塞,要一直等到 runBlocking() 結束後,才會繼續執行。

suspend 函式

suspend 是一個 Kotlin 語言提供的關鍵字。它是用來修飾 function 或是 lambda。一個 suspend 函式可以被暫停(paused),而且稍後可以被重啟(resumed)。Suspend 函式只可以在 coroutine 裡或其他的 suspend 函式裡被呼叫。

我們剛剛使用到的 Job.join()、Deferred<T>.await()、以及 delay() 都是 suspend 函式。所以,它們所在的 coroutine 可以被暫停,thread 會去執行其他的 coroutine。最後,等它們要重啟時,thread 會從上次暫停的地方繼續執行。

讓我們來看看如何建立一個 suspend 函式。

suspend fun printHello(count: Int) {
    println("Hello $count")
    delay(1000)
}

fun main() = runBlocking {
    launch {
        repeat(3) {
            printHello(it)
        }
    }

    repeat(3) {
        println("--- $it")
        delay(1000)
    }

    println("Complete")
}

上面的程式碼會輸出以下。

--- 0
Hello 0
--- 1
Hello 1
--- 2
Hello 2
Complete

Kotlin 編譯器會改寫每個 suspend 函式,其細節在 Deep Dive into Coroutines on JVM by Roman Elizarov 有被提及。怎麼改寫的細節我們不在這裡討論,但有一個很重要的事情是,每個 suspend function 都是一個 suspension point。當你要取消一個 coroutine 時,要等到這個 coroutine 執行到下一個 suspend function 時,才可以被暫停或是取消。

下方的程式碼顯示,當一個 coroutine 裡面沒有呼叫 suspend function 時,它是無法被暫停的。

fun main() = runBlocking {
    val job = launch {
        val start = System.currentTimeMillis()
        for (i in 1..999999) {
            println("$i")
        }
        val spent = System.currentTimeMillis() - start
        println("Coroutine spent $spent")
    }

    delay(1)
    job.cancel()

    println("Complete")
}

上面的程式碼會輸出以下。

Hello 1
...
Hello 999999
Coroutine spent 9265
Complete

Lazy Coroutines

當使用 coroutine builder(如 launch())建立一個 coroutine 時,這個 coroutine 預設是馬上開始。我們也可以稍後再開始這個 coroutine,這稱為 lazy coroutines。

其用法很簡單,就是設定 start 參數為 CoroutineStart.LAZY,如下。

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) {
        println("Hello World")
    }

    println("Start coroutine")
    job.start()

    println("Complete")
}

以上程式碼會輸出以下。

Start coroutine
Complete
Hello World

Coroutine Context

每一個 coroutine 都有一個型態為 CoroutineContext 的 context,這 context 包含著一些元素(element)。

public interface CoroutineContext {
    /**
     * Returns the element with the given [key] from this context or `null`.
     */
    public operator fun <E : Element> get(key: Key<E>): E?

    /**
     * Accumulates entries of this context starting with [initial] value and applying [operation]
     * from left to right to current accumulator value and each element of this context.
     */
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    /**
     * Returns a context containing elements from this context and elements from  other [context].
     * The elements from this context with the same key as in the other one are dropped.
     */
    public operator fun plus(context: CoroutineContext): CoroutineContext

    /**
     * Returns a context containing elements from this context, but without an element with
     * the specified [key].
     */
    public fun minusKey(key: Key<*>): CoroutineContext

    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     */
    public interface Key<E : Element>
}

此外,operator func plus 是一個非常重要的函式,他可以合併兩個 context。在下方的程式碼中,launch(this.coroutineContext + Dispatchers.Default) 等同於 launch(Dispatchers.Default)

fun main() = runBlocking {
    launch(this.coroutineContext + Dispatchers.Default) {
        println("child job: ${Thread.currentThread().name}")
    }
    println("parent job: ${Thread.currentThread().name}")
}

Coroutine context 實際上會被包含在一個 scope 裡面,這個 scope 的型態為 CoroutineScope

public interface CoroutineScope {
    /**
     * The context of this scope.
     * Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
     * Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
     *
     * By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
     */
    public val coroutineContext: CoroutineContext
}

每一個 coroutine builder(如 launch())都是 CoroutineScope 的 extension。這些 coroutine builder 會繼承當前 scope 裡的 context,並且將參數中帶進來的 context 來 override 繼承的 context(使用 operator fun plus 來合併),如下。其實 launch() 的第一個參數預設就是 EmptyCoroutineContext。此外,在這些 coroutine builder 的 block 裡,this 就是當前 coroutine 的 coroutine scope。

fun main() = runBlocking {
    launch(CoroutineName("child")) {
        println("Hello World")
    }
}

這個新的 context 會是 child coroutine 的 parent context。這個 parent context 會被用來建立 child coroutine。然後,parent context 加上 child coroutine 的 job(也就是 child job),就會變成 child coroutine 的 context(也就是 child context)。這部分的細節在 Coroutine Context and Scope 一文中有詳細的解說。

Jobs

一個 coroutine 是用一個 Job 物件來表示,而 Job 也是 CoroutineContext 中的一個元素。Job 負責 coroutine 的 lifecycle、cancellation、以及 parent-child 關係。

下面程式碼顯示如何存取 scope 中的 job。程式碼中的 Job 是元素的 key。

fun main() = runBlocking {
    println("Current job is ${coroutineContext[Job]}")
}

上方的程式碼會輸出以下。

Current job is "coroutine#1":BlockingCoroutine{Active}@13b6d03

Dispatchers

Dispatcher 也是 context 中其中一個重要的元素。它決定了用什麼樣的 thread 來執行它關聯的 coroutine。Dispatchers 物件預先定義了以下的 dispatcher。

  • Dispatchers.Default:預設的 dispatcher。
  • Dispatchers.IO:設計用來執行 IO 工作。
  • Dispatchers.Main:在 main thread 上執行 coroutine。
  • Dispatchers.Unconfined:在當前的 thread 上執行 coroutine。稍後被重啟(resume)時,直接在呼叫 resume 的 thread 上繼續執行 coroutine。

Default 和 IO dispatcher 都是有一個 thread pool,所以 dispatcher 可以同時執行多的 coroutine。

在沒有指定 dispatcher 的情況話,大部分的 coroutine builder 會繼承當前的 dispatcher。之前,有提及 coroutine builder 會繼承當前的 context,而 context 也包含了一個 dispatcher。下面的程式碼會輸出各個 coroutine 所在的 thread。

fun main() = runBlocking {
    launch {
        println("parent job: ${Thread.currentThread().name}")

        launch {
            println("child job: ${Thread.currentThread().name}")
        }
    }
    println("runBlocking: ${Thread.currentThread().name}")
}

上方程式碼會輸出以下。

runBlocking: main @coroutine#1
parent job: main @coroutine#2
child job: main @coroutine#3

runBlocking() 會直接拿當前的 thread,也就是 main thread,所以是 Dispatchers.Main。之後,launch() 會繼承當前的 context,所以也就繼承使用 Dispatchers.Main。

在下面的程式碼中,我們指定 dispatcher。

fun main() = runBlocking {
   launch(Dispatchers.Default) {
        println("parent job: ${Thread.currentThread().name}")

        launch {
            println("child job: ${Thread.currentThread().name}")
        }
    }
    println("runBlocking: ${Thread.currentThread().name}")
}

下面的輸出顯示,第一個 launch() 跑在指定的 Dispatchers.Default 上,而第二個 launch() 則繼承使用了第一個 launch() 的 dispatcher。

runBlocking: main @coroutine#1
parent job: DefaultDispatcher-worker-1 @coroutine#2
child job: DefaultDispatcher-worker-2 @coroutine#3

Coroutine Builders

launch()

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

在本文章一開始已經有介紹 launch()。launch() 會建立一個 coroutine,然後馬上返回並繼續執行。而且,這個新的 coroutine 會以非同步的方式執行 block。

fun main() = runBlocking {
    val job = launch {
        println("Hello Coroutine!")
    }

    job.join()
    println("Complete")
}

上方的程式碼會輸出以下。

Hello Coroutine!
Complete

async()

fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

有時候你會希望 coroutine 執行結束後回傳一個值。async() 和 launch() 一樣會建立一個新的 coroutine,但是 async() 會回傳一個值,如下程式碼。

fun main() = runBlocking {
    val job = async {
        println("Hello Coroutine!")
        "Complete"
    }

    val result = job.await()
    println(result)
}

上方的程式碼會輸出以下。

Hello Coroutine!
Complete

async() 回傳一個 Deferred<T> 型態的物件,而 Deferred<T> 是繼承 Job 型態。所以,Deferred<T> 是指擱置的工作(deferred job)。當 async() 建立的 coroutine 執行結束時,它會回傳 block 中最後一個 statement,在範例中就是 Complete 字串。

與 Job.join() 相似,我們可以呼叫 Deferred<T>.await() 來等待 coroutine 執行結束。不同的是,它會回傳 block 中的回傳值。

Scoping Functions

withContext()

suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T

withContext() 建立一個新的 scope,並暫停執行當前的 block。如果參數中有指定與當前 scope 不同的 dispatcher 的話,那它會在不同的 thread 上執行它的 block。執行完畢後,再返回原本被暫停的 block,而且它還可以回傳一個值。所以,withContext() 主要的目的是讓當前的 coroutine 可以暫時地切換 context。此外,因為它會暫停執行當前的 block,所以它是一個 suspend 函式。

讓我們來看看以下的範例。

fun main() = runBlocking {
    val result = withContext(Dispatchers.Default) {
        println("job: ${Thread.currentThread().name}")
        "Complete"
    }
    println("runBlocking: ${Thread.currentThread().name}")
    println(result)
}

上方的程式碼會輸出以下。

job: DefaultDispatcher-worker-1 @coroutine#1
runBlocking: main @coroutine#1
Complete

coroutineScope()

suspend fun <R> coroutineScope(
    block: suspend CoroutineScope.() -> R
): R

coroutineScope() 建立一個新的 scope 並暫停執行當前的 block。當它的 block 執行完成後,再返回原本被暫停的 block。看起來與 withContext() 非常相似。如同 withContext(),它會回傳一個值,不過無法指定新的 context 給 coroutineScope() ,而且也會回傳一個值。此外,因為它會暫停執行當前的 block,所以它是一個 suspend 函式。

看起來在很多的情況下,coroutineScope() 和 withContext() 可以互相取代。這是沒有錯的事情。但是,其實它們的目的完全不同。前面有提到 withContext() 是被設計用來暫時切換到不同的 context 來執行 block。而,coroutineScope() 則是被設計用來執行一個有包含數個並行的 child coroutines 的 block。等 block 中所有的 child coroutines 執行完成後,才回返回到原本的 block。

coroutineScope() 的範例如下。

fun main() = runBlocking {
    coroutineScope {
        launch {
            println("job 1: ${Thread.currentThread().name}")
        }

        launch {
            println("job 2: ${Thread.currentThread().name}")
        }
    }
    println("main job: ${Thread.currentThread().name}")
}

其輸出如下。

job 1: main @coroutine#2
job 2: main @coroutine#3
main job: main @coroutine#1

suspendCancellableCoroutine() & suspendCoroutine()

inline suspend fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T

suspend inline fun <T> suspendCoroutine(
    crossinline block: (Continuation<T>) -> Unit
): T

suspendCancellableCoroutine()suspendCoroutine() 都是用來將 callback-style 的程式碼轉換成 suspend 函式。不同的是,suspendCancellableCoroutine() 是可以被取消的,但是 suspendCoroutine() 則不行。

下方程式碼展示如何將 callback-style 的程式碼轉換成 suspend 函式。

suspend fun fun1(): String = suspendCancellableCoroutine { continuation ->
    val callback = object : GetUserCallback {
        override fun onCompleted(value: String) {
            continuation.resume(value)
        }

        override fun onError(cause: Throwable) {
            continuation.resumeWithException(cause)
        }
    }
    getUserApi.register(callback)
    continuation.invokeOnCancellation { getUserApi.unregister(callback) }
    getUserApi.requestGet(url)
}

suspend fun fun2(): String = suspendCoroutine { continuation ->
    val callback = object : GetUserCallback {
        override fun onCompleted(value: String) {
            continuation.resume(value)
        }

        override fun onError(cause: Throwable) {
            continuation.resumeWithException(cause)
        }
    }
    getUserApi.register(callback)
    getUserApi.requestGet(url)
}

fun main() = runBlocking {
    val result1 = fun1()
    val result2 = fun2()
    println("$result1 & $result2")
}

我們可以看到 suspendCancellableCoroutine()suspendCoroutine() 的 block 被執行完後,coroutine 就會被暫停。一直等到 API 回來後,呼叫 Continuation.resume() ,coroutine 才會被重啟。

Cancellation

有時候你會有一個長時間在背景執行的 coroutine。當你不再需要它時,你可以呼叫 Job.cancel() 來取消它。此外,我們之前有提及到,Job 是有 parent-child 階層關係的。當你取消一個 Job 時,它所有的 child jobs 也都會被取消。

下面的範例顯示,當取消一個 Job 時,不論是 launch()、async()、coroutineScope()、以及 withContext(),它們 block 裡的所有的 child 都會被取消。

fun main() = runBlocking {
    val job = launch {
        coroutineScope {
            launch {
                repeat(10) { i ->
                    println("launch child: $i")
                    delay(500)
                }
            }

            async {
                repeat(10) { i ->
                    println("async child: $i")
                    delay(500)
                }
            }

            repeat(10) { i ->
                println("coroutineScope: $i")
                delay(500)
            }
        }
    }

    delay(1200)
    job.cancel()
    println("Complete")
}

其輸出如下。

coroutineScope: 0
launch child: 0
async child: 0
coroutineScope: 1
launch child: 1
async child: 1
coroutineScope: 2
launch child: 2
async child: 2
Complete

Exceptions

傳播 Exceptions

當一個 coroutine 丟出一個 exception 時,它 child coroutines 都會被取消,並且一層一層地將 exception 往 parent coroutine 丟出去,直到 root coroutine 為止,如下方的程式碼。

fun main() = runBlocking { // root coroutine
    val job1 = async {
        val job2 = launch {
            throw Exception("From job2")
        }

        job2.join()
        println("job1 is done")
    }

    job1.await()
    println("runBlocking is done")
}

其輸出如下。

Exception in thread "main" java.lang.Exception: From job2

我們可以看到 job2 coroutine 到 root coroutine(runBlocking)都被取消了。

接下來我們們在 job1 中接住 exception,看看會發生什麼變化。

fun main() = runBlocking { // root coroutine
    val job1 = async {
        val job2 = launch {
            throw Exception("From job2")
        }

        try {
            job2.join()
        } catch (e: Exception) {
            println("Caught an exception: $e")
        }
        println("job1 is done")
    }

    job1.await()
    println("runBlocking is done")
}

其輸出如下。

Caught an exception: java.lang.Exception: From job2
job1 is done
Exception in thread "main" java.lang.Exception: From job2

我們發現 exception 回被傳播到所有的 parent coroutine 直到 root coroutine,即使途中 job1 有接住 exception,還是會繼續往上傳播。不過,job1 沒有被取消,可以繼續執行 。

向呼叫者曝露 Exceptions

接下的範例中,我們加上一個 root coroutine,看看會有什麼變化。

fun main() = runBlocking { // root coroutine
    val rootJob = GlobalScope.launch { // root coroutine
        val job1 = async {
            val job2 = launch {
                throw Exception("From job2")
            }

            job2.join()
            println("job1 is done")
        }

        job1.await()
        println("rootJob is done")
    }

    rootJob.join()
    println("runBlocking is done")
}

其輸出如下。

runBlocking is done
Exception in thread "DefaultDispatcher-worker-1 @coroutine#2" java.lang.Exception: From job2

可以看到 exception 會取消整個 parent 和 child coroutines,直到 root coroutine。所以,runBlocking()rootJob 取消後,還會繼續執行。

再看看下方的程式碼,我們改用 async() 來建立 root coroutine,看看會有什麼不同。

fun main() = runBlocking {
    val rootJob = GlobalScope.async {
        val job1 = async {
            val job2 = launch {
                throw Exception("From job2")
            }

            job2.join()
            println("job1 is done")
        }

        job1.await()
        println("rootJob is done")
    }

    rootJob.await()
    println("runBlocking is done")
}

其輸出以下。

Exception in thread "main" java.lang.Exception: From job2

我們可以看到 launch() 和 async() 建立的 root coroutine 會有不同的行為。當 launch() 建立的 root coroutine 有 exception 時, 它可以繼續執行。但是,async() 建立的 root coroutine 有 exception 時,它會將 exception 丟出來,讓呼叫方來處理,如下。

fun main() = runBlocking {
    val rootJob = GlobalScope.async {
        val job1 = async {
            val job2 = launch {
                throw Exception("From job2")
            }

            job2.join()
            println("job1 is done")
        }

        job1.await()
        println("rootJob is done")
    }

    try {
        rootJob.await()
    } catch (e: Exception) {
        println("Caught an exception: $e")
    }
    println("runBlocking is done")
}

其輸出如下。

Caught an exception: java.lang.Exception: From job2
runBlocking is done

處理 Uncaught Exceptions

剛剛我們討論到 root coroutines 向呼叫者暴露 exceptions,如果呼叫者沒有接住這些 exceptions 時,這些 exceptions 稱為 uncaught exceptions。Kotlin coroutine 的 CoroutineExceptionHandler 可以讓我們自定義如何輸出這些 uncaught exceptions 到 console。CoroutineExceptionHandler 必須在到 root coroutine 建立時傳入。

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception")
    }
    val job1 = GlobalScope.launch(handler) {
        val job2 = async {
            throw Exception("from job2")
        }

        job2.await()
        println("job1 is done")
    }

    job1.join()
    println("runBlocking is done")
}

其輸出如下。

CoroutineExceptionHandler got java.lang.Exception: from job2
runBlocking is done

值得注意的是,在 CoroutineExceptionHandler 裡,你是無法將恢復 exception 的。

Timeout

suspend fun <T> withTimeout(
    timeMillis: Long,
    block: suspend CoroutineScope.() -> T
): T

suspend fun <T> withTimeoutOrNull(
    timeMillis: Long,
    block: suspend CoroutineScope.() -> T
): T?

withTimeout() 可以設定 timeout 來執行 block,並回傳一個值。如果 timeout,則會丟出 TimeoutCancellationException。

fun main() = runBlocking {
    try {
        val result = withTimeout(1000) {
            repeat(10) { i ->
                println("withTimeout $i")
                delay(500)
            }
            "Complete"
        }
        println(result)
    } catch (e: TimeoutCancellationException) {
        println(e)
    }
}

上面的程式碼輸出如下。

withTimeout 0
withTimeout 1
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms

withTimeoutOrNull() 和 withTimeout() 一樣,但是 timeout 時,它不會丟出 TimeoutCancellationException,而是回傳 null。範例如下。

fun main() = runBlocking {
    val result = withTimeoutOrNull(1000) {
        repeat(10) { i ->
            println("withTimeout $i")
            delay(500)
        }
        "Complete"
    }

    println("Result is $result")
}

上面的程式碼輸出如下。

withTimeout 0
withTimeout 1
Result is null

結論

Kotlin coroutine 比 thread 更輕量、效能更好。Coroutine 被暫停時,不會阻塞 thread。除此之外,suspend 函式可以讓非同步程式寫起來像寫同步程式,這讓程式更加易讀。Coroutine 一開始不太好學習,但一但你熟悉它後,你會無法再回去用 threads 了。

2 comments
發佈留言

發佈留言必須填寫的電子郵件地址不會公開。 必填欄位標示為 *

You May Also Like
Photo by Gemma Evans on Unsplash
Read More

Android:Hilt 依賴注入

Hilt 是基於 Dagger 且設計在 Android 上使用的 dependency injection library。所以在開發 Android 時,使用 Hilt 會比使用 Dagger 更加地方便。本文章將介紹如何使用 Hilt。
Read More