Kotlin Coroutine Tutorial

Photo by Hans-Jurgen Mager on Unsplash
Kotlin’s coroutines are a lightweight alternative to working with threads directly. Suspending a coroutine does not block the thread it runs on, and coroutines can be cancelled. The coroutines library also manages the underlying threads for you, so you don’t need to manage them yourself, which helps avoid accidentally creating too many threads.

Dependency

Coroutine support is not built entirely into the Kotlin language; most of it is provided by a library. Therefore, you must first add the kotlinx-coroutines-core dependency in build.gradle.kts.

dependencies {
    implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:1.4.2")
}

Building Your First Coroutine

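First, let’s run our first coroutine. The snippets in this article assume that the kotlinx.coroutines package has been imported, as in the first line below.

import kotlinx.coroutines.*

fun main() = runBlocking {
    // launch() starts a new coroutine and returns immediately
    launch {
        println("Hello Coroutine!")
    }

    println("Complete")
}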

The output is as follows.

Complete
Hello Coroutine!

launch() – Creating a Coroutine

launch() creates a coroutine, then returns immediately, and the caller continues executing. The new coroutine executes the code in its block asynchronously. That is why Complete is printed first, followed by Hello Coroutine!.

We can also wait for the new coroutine to finish execution before continuing, as shown in the following code.

fun main() = runBlocking {
    val job = launch {
        println("Hello Coroutine!")
    }

    job.join()
    println("Complete")
}

The output is as follows.

Hello Coroutine!
Complete

launch() returns a Job object that represents the new coroutine. When Job.join() is called, the current coroutine (the one created by runBlocking()) is suspended until the coroutine behind job finishes, and then it continues. While the current coroutine is suspended, its thread is released and can execute other coroutines. This is one reason coroutines are more efficient than threads: a waiting coroutine does not tie up a thread.
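
To make the point about lightness concrete, here is a minimal sketch that starts ten thousand coroutines, each of which suspends for a moment. The helper name runMany and the use of AtomicInteger are illustrative choices and not part of the examples above.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.atomic.AtomicInteger

// Hypothetical helper: launches n coroutines that each suspend briefly,
// then returns how many of them finished.
fun runMany(n: Int): Int = runBlocking {
    val finished = AtomicInteger(0)
    val jobs = List(n) {
        launch {
            delay(100)                 // suspends the coroutine, not the thread
            finished.incrementAndGet()
        }
    }
    jobs.joinAll()                     // suspend until every job has completed
    finished.get()
}

fun main() {
    println(runMany(10_000))           // prints 10000
}
```

All of these coroutines share the single thread that runBlocking() is using, which would not be practical with ten thousand real threads.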

runBlocking() – The Bridge between Blocking and Non-Blocking Worlds

fun <T> runBlocking(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend CoroutineScope.() -> T
): T

We mentioned at the beginning that coroutines are provided by a library rather than built directly into the language. In other words, when a Kotlin program starts, it is still in the world of threads, and runBlocking() is the bridge from that world into the world of coroutines. Like launch(), runBlocking() creates a new coroutine.

launch() can also be used without runBlocking(), as follows.

fun main() {
    GlobalScope.launch {
        println("Hello Coroutine!")
    }

    println("Complete")
}

This will output the following.

Complete

Notice that the launch() block did not run. This is because main() returns, and the program exits, without waiting for the newly created coroutine.

Comparing this with the first example in this article, the two are almost the same; the only difference is whether runBlocking() is used. In the version with runBlocking(), the coroutine created by launch() does get executed.

So, runBlocking() creates a new coroutine, and this coroutine waits for all of its child coroutines to finish before it completes. In addition, runBlocking() blocks the current thread. In other words, the current thread is blocked and does not continue until runBlocking() returns.
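
The following sketch illustrates both properties: the statement after runBlocking() is reached only after the block and its child coroutine have finished. The function name runAndRecord and the event strings are made up for this illustration.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records the order in which things happen.
fun runAndRecord(): List<String> {
    val events = mutableListOf<String>()
    runBlocking {
        launch {
            delay(100)
            events.add("child finished")
        }
        events.add("block finished")
    }
    // Reached only after the block and all of its children have completed.
    events.add("after runBlocking")
    return events
}

fun main() {
    println(runAndRecord())
    // prints [block finished, child finished, after runBlocking]
}
```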

suspend functions

suspend is a keyword provided by the Kotlin language. It modifies a function or a lambda. A suspending function can be suspended and resumed later. Suspending functions can only be called from coroutines or from other suspending functions.

Job.join(), which we used earlier, is a suspending function, and so are Deferred<T>.await() and delay(), which appear later in this article. The coroutines that call them can therefore be suspended, leaving their threads free to execute other coroutines. When a suspended coroutine is resumed, it continues execution from the point where it was suspended.

Let’s take a look at how to create a suspending function.

suspend fun printHello(count: Int) {
    println("Hello $count")
    delay(1000)
}

fun main() = runBlocking {
    launch {
        repeat(3) {
            printHello(it)
        }
    }

    repeat(3) {
        println("--- $it")
        delay(1000)
    }

    println("Complete")
}

The above code will output the following.

--- 0
Hello 0
--- 1
Hello 1
--- 2
Hello 2
Complete

The Kotlin compiler rewrites every suspending function. The details are covered in Deep Dive into Coroutines on JVM by Roman Elizarov and are not discussed here, but one important consequence is that each call to a suspending function is a potential suspension point. Cancellation is cooperative: when you cancel a coroutine, it keeps running until it reaches its next suspension point, and only then can it be suspended or cancelled.

The code below shows that a coroutine that never calls a suspending function cannot be suspended or cancelled partway through.

fun main() = runBlocking {
    val job = launch {
        val start = System.currentTimeMillis()
        for (i in 1..999999) {
            println("$i")
        }
        val spent = System.currentTimeMillis() - start
        println("Coroutine spent $spent")
    }

    delay(1)
    job.cancel()

    println("Complete")
}

The above code will output the following.

1
...
999999
Coroutine spent 9265
Complete
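
For contrast, here is a hedged sketch of the same loop with a suspension point added. It assumes the loop may give up its thread on every iteration by calling yield(), a suspending function from kotlinx.coroutines; the name iterationsBeforeCancel is hypothetical.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns how many iterations ran before the
// cancellation was noticed.
fun iterationsBeforeCancel(): Int = runBlocking {
    var iterations = 0
    val job = launch {
        for (i in 1..999_999) {
            yield()          // suspension point: cancellation is checked here
            iterations = i
        }
    }

    delay(1)
    job.cancelAndJoin()      // cancel the job, then wait until it has stopped
    iterations
}

fun main() {
    println("Stopped after ${iterationsBeforeCancel()} iterations")
}
```

The exact count depends on timing, so no fixed output is shown. The loop should stop well before reaching 999999 because the next yield() after cancellation throws a CancellationException.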

Lazy Coroutines

When a coroutine is created with a coroutine builder such as launch(), it starts immediately by default. We can also create a coroutine that starts later; this is called a lazy coroutine.

To do so, set the start parameter to CoroutineStart.LAZY.

fun main() = runBlocking {
    val job = launch(start = CoroutineStart.LAZY) {
        println("Hello World")
    }

    println("Start coroutine")
    job.start()

    println("Complete")
}

The above code will output the following.

Start coroutine
Complete
Hello World
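
A lazy coroutine can also be started implicitly. As a small sketch, the code below creates a lazy async() coroutine and relies on await() to start it; the helper name lazyOrder and the event strings are illustrative only.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records when the lazy block actually runs.
fun lazyOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val deferred = async(start = CoroutineStart.LAZY) {
        events.add("computing")
        21 * 2
    }

    events.add("before await")
    val value = deferred.await()   // await() starts the lazy coroutine
    events.add("got $value")
    events
}

fun main() {
    println(lazyOrder())
    // prints [before await, computing, got 42]
}
```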

Coroutine Context

Each coroutine has a context whose type is CoroutineContext, which contains some elements.

public interface CoroutineContext {
    /**
     * Returns the element with the given [key] from this context or `null`.
     */
    public operator fun <E : Element> get(key: Key<E>): E?

    /**
     * Accumulates entries of this context starting with [initial] value and applying [operation]
     * from left to right to current accumulator value and each element of this context.
     */
    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    /**
     * Returns a context containing elements from this context and elements from  other [context].
     * The elements from this context with the same key as in the other one are dropped.
     */
    public operator fun plus(context: CoroutineContext): CoroutineContext

    /**
     * Returns a context containing elements from this context, but without an element with
     * the specified [key].
     */
    public fun minusKey(key: Key<*>): CoroutineContext

    /**
     * Key for the elements of [CoroutineContext]. [E] is a type of element with this key.
     */
    public interface Key<E : Element>
}

Of these, operator fun plus is particularly important: it merges two contexts. In the code below, launch(this.coroutineContext + Dispatchers.Default) is equivalent to launch(Dispatchers.Default).

fun main() = runBlocking {
    launch(this.coroutineContext + Dispatchers.Default) {
        println("child job: ${Thread.currentThread().name}")
    }
    println("parent job: ${Thread.currentThread().name}")
}
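
To see how plus, get, and minusKey behave on their own, here is a minimal sketch that builds a context by hand, without starting any coroutine. The function name mergedContext and the names "first" and "second" are arbitrary.

```kotlin
import kotlinx.coroutines.*
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

// Hypothetical helper: the element on the right-hand side of plus replaces
// an element with the same key on the left-hand side.
fun mergedContext(): CoroutineContext {
    val base = Dispatchers.Default + CoroutineName("first")
    return base + CoroutineName("second")
}

fun main() {
    val context = mergedContext()
    println(context[CoroutineName]?.name)                              // prints second
    println(context[ContinuationInterceptor] === Dispatchers.Default)  // prints true
    println(context.minusKey(CoroutineName)[CoroutineName])            // prints null
}
```

A dispatcher is stored under the ContinuationInterceptor key, which is why that key is used to look it up.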

A coroutine context is in turn held by a scope, whose type is CoroutineScope.

public interface CoroutineScope {
    /**
     * The context of this scope.
     * Context is encapsulated by the scope and used for implementation of coroutine builders that are extensions on the scope.
     * Accessing this property in general code is not recommended for any purposes except accessing the [Job] instance for advanced usages.
     *
     * By convention, should contain an instance of a [job][Job] to enforce structured concurrency.
     */
    public val coroutineContext: CoroutineContext
}

Each coroutine builder (such as launch()) is an extension function on CoroutineScope. A builder inherits the context from the current scope and overrides the inherited elements with those of the context passed in (the two are merged with operator fun plus), as follows. The first parameter of launch() defaults to EmptyCoroutineContext. In addition, inside a builder's block, this is the coroutine scope of the current coroutine.

fun main() = runBlocking {
    launch(CoroutineName("child")) {
        println("Hello World")
    }
}

This new context is the parent context of the child coroutine and is used to create it. The parent context plus the job of the child coroutine (the child job) then becomes the context of the child coroutine (the child context). This is explained in detail in Coroutine Context and Scope.

Jobs

A coroutine is represented by a Job object, and Job is also an element in CoroutineContext. Job is responsible for the coroutine’s lifecycle, cancellation, and parent-child relationships.

The following code shows how to access the job in the scope. In the code, Job is the key of the job element.

fun main() = runBlocking {
    println("Current job is ${coroutineContext[Job]}")
}

The code above will output the following.

Current job is "coroutine#1":BlockingCoroutine{Active}@13b6d03
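
The parent-child relationship can be observed through the children property of Job. The sketch below is only an illustration: the helper name countChildren is made up, and yield() is used so that the parent gets a chance to run and launch its children before they are counted.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: returns the number of child jobs attached to a parent.
fun countChildren(): Int = runBlocking {
    val parent = launch {
        launch { delay(1_000) }
        launch { delay(1_000) }
        delay(1_000)
    }

    yield()                                // let the parent start its children
    val count = parent.children.count()
    parent.cancelAndJoin()                 // cancelling the parent cancels both children
    count
}

fun main() {
    println(countChildren())               // prints 2
}
```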

Dispatchers

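A dispatcher is also an important element in the context. It determines which thread or threads execute its associated coroutine. The Dispatchers object predefines the following dispatchers.

  • Dispatchers.Default: The default dispatcher, used when no other dispatcher is in the context. It is backed by a shared thread pool and is intended for CPU-bound work.
  • Dispatchers.IO: Designed to offload blocking IO tasks to a shared pool of threads.
  • Dispatchers.Main: Executes coroutines on the main (UI) thread. It is only available when a platform artifact such as kotlinx-coroutines-android, kotlinx-coroutines-javafx, or kotlinx-coroutines-swing is on the classpath.
  • Dispatchers.Unconfined: Starts the coroutine on the current thread, but only until the first suspension point. When it is later resumed, the coroutine continues on whichever thread called resume.

Both the Default and IO dispatchers are backed by thread pools, so they can execute multiple coroutines at the same time.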

If no dispatcher is specified, most coroutine builders inherit the current dispatcher. As mentioned earlier, coroutine builders inherit the current context, and the context includes the dispatcher. The following code prints the thread on which each coroutine runs.

fun main() = runBlocking {
    launch {
        println("parent job: ${Thread.currentThread().name}")

        launch {
            println("child job: ${Thread.currentThread().name}")
        }
    }
    println("runBlocking: ${Thread.currentThread().name}")
}

The above code will output the following.

runBlocking: main @coroutine#1
parent job: main @coroutine#2
child job: main @coroutine#3

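runBlocking() runs its coroutine on the current thread, which here is the main thread. Note that this is not Dispatchers.Main: runBlocking() uses its own event loop on the thread that called it. After that, launch() inherits the current context, so the same dispatcher is inherited as well. The @coroutine#N suffix in the thread names appears when the coroutines debug mode is enabled (for example with the -Dkotlinx.coroutines.debug JVM option).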

In the following code, we specify the dispatcher.

fun main() = runBlocking {
    launch(Dispatchers.Default) {
        println("parent job: ${Thread.currentThread().name}")

        launch {
            println("child job: ${Thread.currentThread().name}")
        }
    }
    println("runBlocking: ${Thread.currentThread().name}")
}

The output below shows that the first launch() runs on the specified Dispatchers.Default, and the second launch() inherits the dispatcher of the first launch(). The worker numbers may differ between runs.

runBlocking: main @coroutine#1
parent job: DefaultDispatcher-worker-1 @coroutine#2
child job: DefaultDispatcher-worker-2 @coroutine#3

Coroutine Builders

launch()

fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

launch() has been introduced at the beginning of this article. launch() creates a coroutine, then returns immediately and continues execution. Moreover, this new coroutine will execute the block asynchronously.

fun main() = runBlocking {
    val job = launch {
        println("Hello Coroutine!")
    }

    job.join()
    println("Complete")
}

The code above will output the following.

Hello Coroutine!
Complete

async()

fun <T> CoroutineScope.async(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> T
): Deferred<T>

Sometimes you want a coroutine to return a value when it finishes. Like launch(), async() creates a new coroutine, but the coroutine created by async() returns a value, as shown in the following code.

fun main() = runBlocking {
    val job = async {
        println("Hello Coroutine!")
        "Complete"
    }

    val result = job.await()
    println(result)
}

The code above will output the following.

Hello Coroutine!
Complete

async() returns a Deferred<T> object, and Deferred<T> extends Job. In other words, a Deferred<T> is a job with a deferred result. When a coroutine created by async() finishes, its result is the value of the last expression in the block, which in this example is "Complete".

Similar to Job.join(), we can call Deferred<T>.await() to wait for the completion of a coroutine. The difference is that it will return the return value in the block.
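
A common use of async() is to run several computations concurrently and combine their results. The following is a minimal sketch of that pattern; slowValue is a hypothetical stand-in for real work such as a network call.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Hypothetical slow operation, simulated with delay().
suspend fun slowValue(value: Int): Int {
    delay(200)
    return value
}

fun concurrentSum(): Int = runBlocking {
    val first = async { slowValue(1) }    // both coroutines start right away
    val second = async { slowValue(2) }
    first.await() + second.await()        // wait for both results
}

fun main() {
    val elapsed = measureTimeMillis {
        println(concurrentSum())          // prints 3
    }
    println("Took about $elapsed ms")
}
```

Because the two delays overlap, the elapsed time should be close to 200 ms rather than 400 ms, although the exact figure depends on the machine.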

Scoping Functions

withContext()

suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T

withContext() creates a new scope and suspends the current coroutine while its block runs. If the context passed in contains a dispatcher different from the current one, the block is executed on that dispatcher, typically on a different thread. When the block completes, execution returns to the suspended caller, and the block's value is returned. The main purpose of withContext() is therefore to let the current coroutine temporarily switch context. Because it suspends the caller, it is a suspending function.

Let’s look at the following example.

fun main() = runBlocking {
    val result = withContext(Dispatchers.Default) {
        println("job: ${Thread.currentThread().name}")
        "Complete"
    }
    println("runBlocking: ${Thread.currentThread().name}")
    println(result)
}

The code above will output the following.

job: DefaultDispatcher-worker-1 @coroutine#1
runBlocking: main @coroutine#1
Complete
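
In practice, withContext() is often wrapped inside a suspending function so that callers do not need to know which dispatcher the work requires. The sketch below assumes a blocking call, simulated here with Thread.sleep(), that should run on Dispatchers.IO; the function name loadGreeting is hypothetical.

```kotlin
import kotlinx.coroutines.*

// Hypothetical function: moves a blocking call off the caller's thread.
suspend fun loadGreeting(): String = withContext(Dispatchers.IO) {
    Thread.sleep(100)    // stands in for a blocking read from disk or network
    "hello"
}

fun main() = runBlocking {
    // The caller is suspended, not blocked, while the IO thread sleeps.
    println(loadGreeting())   // prints hello
}
```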

coroutineScope()

suspend fun <R> coroutineScope(
    block: suspend CoroutineScope.() -> R
): R

coroutineScope() creates a new scope and suspends the current coroutine while its block runs. When the block has completed, execution returns to the suspended caller. It looks very similar to withContext(): it also returns a value, but there is no way to pass a new context to coroutineScope(). Because it suspends the caller, it is a suspending function.

In many cases, coroutineScope() and withContext() seem interchangeable, and that is not wrong. However, they serve different purposes. As mentioned earlier, withContext() is designed to temporarily switch to a different context to execute a block. coroutineScope(), on the other hand, is designed to execute a block that starts several concurrent child coroutines. Only after all the child coroutines in the block have finished does it return to the caller.

An example of coroutineScope() is as follows.

fun main() = runBlocking {
    coroutineScope {
        launch {
            println("job 1: ${Thread.currentThread().name}")
        }

        launch {
            println("job 2: ${Thread.currentThread().name}")
        }
    }
    println("main job: ${Thread.currentThread().name}")
}

The output is as follows.

job 1: main @coroutine#2
job 2: main @coroutine#3
main job: main @coroutine#1
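
coroutineScope() is also what lets an ordinary suspending function start child coroutines without receiving a scope as a parameter. Here is a small sketch of that usage; loadPair and the values it returns are invented for the example.

```kotlin
import kotlinx.coroutines.*

// Hypothetical function: runs two child coroutines concurrently and
// returns only after both have finished.
suspend fun loadPair(): Pair<String, Int> = coroutineScope {
    val name = async { delay(100); "kotlin" }
    val length = async { delay(100); 6 }
    name.await() to length.await()
}

fun main() = runBlocking {
    println(loadPair())   // prints (kotlin, 6)
}
```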

suspendCancellableCoroutine() & suspendCoroutine()

suspend inline fun <T> suspendCancellableCoroutine(
    crossinline block: (CancellableContinuation<T>) -> Unit
): T

suspend inline fun <T> suspendCoroutine(
    crossinline block: (Continuation<T>) -> Unit
): T

Both suspendCancellableCoroutine() and suspendCoroutine() are used to convert callback-style code into suspending functions. The difference is that suspendCancellableCoroutine() can be cancelled, but suspendCoroutine() cannot.

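The code below shows how to convert callback-style code into a suspending function. GetUserCallback, getUserApi, and url stand for an existing callback-based API and are not defined here.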

suspend fun fun1(): String = suspendCancellableCoroutine { continuation ->
    val callback = object : GetUserCallback {
        override fun onCompleted(value: String) {
            continuation.resume(value)
        }

        override fun onError(cause: Throwable) {
            continuation.resumeWithException(cause)
        }
    }
    getUserApi.register(callback)
    continuation.invokeOnCancellation { getUserApi.unregister(callback) }
    getUserApi.requestGet(url)
}

suspend fun fun2(): String = suspendCoroutine { continuation ->
    val callback = object : GetUserCallback {
        override fun onCompleted(value: String) {
            continuation.resume(value)
        }

        override fun onError(cause: Throwable) {
            continuation.resumeWithException(cause)
        }
    }
    getUserApi.register(callback)
    getUserApi.requestGet(url)
}

fun main() = runBlocking {
    val result1 = fun1()
    val result2 = fun2()
    println("$result1 & $result2")
}

We can see that after the block passed to suspendCancellableCoroutine() or suspendCoroutine() has run, the coroutine is suspended. It is not resumed until the API responds and the callback calls Continuation.resume() or Continuation.resumeWithException().
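
Since the snippet above depends on an API that is not shown, here is a self-contained sketch of the same technique. Everything about the callback API (ResultCallback and fetchGreeting) is hypothetical and defined only for this illustration; it simulates asynchronous work with a background thread.

```kotlin
import kotlinx.coroutines.*
import kotlin.concurrent.thread
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException

// Hypothetical callback-style API, defined here only for illustration.
interface ResultCallback {
    fun onCompleted(value: String)
    fun onError(cause: Throwable)
}

// Does its work on a background thread and reports through the callback.
fun fetchGreeting(name: String, callback: ResultCallback): Thread = thread {
    try {
        Thread.sleep(100)
        if (name.isBlank()) callback.onError(IllegalArgumentException("blank name"))
        else callback.onCompleted("Hello, $name")
    } catch (e: InterruptedException) {
        // The request was abandoned; nothing to report.
    }
}

// Suspending wrapper around the callback API.
suspend fun fetchGreetingSuspending(name: String): String =
    suspendCancellableCoroutine { continuation ->
        val worker = fetchGreeting(name, object : ResultCallback {
            override fun onCompleted(value: String) = continuation.resume(value)
            override fun onError(cause: Throwable) = continuation.resumeWithException(cause)
        })
        // If the coroutine is cancelled while waiting, stop the background work.
        continuation.invokeOnCancellation { worker.interrupt() }
    }

fun main() = runBlocking {
    println(fetchGreetingSuspending("Kotlin"))   // prints Hello, Kotlin
}
```

Note that resume and resumeWithException are extension functions that have to be imported from kotlin.coroutines.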

Cancellation

Sometimes you have a long-running coroutine in the background. When you no longer need it, you can call Job.cancel() to cancel it. In addition, we mentioned earlier that jobs have parent-child relationships. When you cancel a job, all of its child jobs are cancelled as well.

The following example shows that when a Job is cancelled, all of its children are cancelled too, no matter whether they were created by launch(), async(), coroutineScope(), or withContext().

fun main() = runBlocking {
    val job = launch {
        coroutineScope {
            launch {
                repeat(10) { i ->
                    println("launch child: $i")
                    delay(500)
                }
            }

            async {
                repeat(10) { i ->
                    println("async child: $i")
                    delay(500)
                }
            }

            repeat(10) { i ->
                println("coroutineScope: $i")
                delay(500)
            }
        }
    }

    delay(1200)
    job.cancel()
    println("Complete")
}

The output is as follows.

coroutineScope: 0
launch child: 0
async child: 0
coroutineScope: 1
launch child: 1
async child: 1
coroutineScope: 2
launch child: 2
async child: 2
Complete
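
A cancelled coroutine often needs to release resources. Cancellation surfaces as a CancellationException thrown from the suspension point, so an ordinary try/finally block is enough. The following is a minimal sketch; the helper name cancelWithCleanup and the event strings are illustrative.

```kotlin
import kotlinx.coroutines.*

// Hypothetical helper: records what happens when a working job is cancelled.
fun cancelWithCleanup(): List<String> = runBlocking {
    val events = mutableListOf<String>()
    val job = launch {
        try {
            repeat(10) { i ->
                events.add("working $i")
                delay(500)            // cancellation is delivered here
            }
        } finally {
            events.add("cleanup")     // runs even though the job was cancelled
        }
    }

    delay(700)
    job.cancelAndJoin()               // cancel, then wait for the finally block
    events.add("done")
    events
}

fun main() {
    println(cancelWithCleanup())
    // prints [working 0, working 1, cleanup, done]
}
```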

Exceptions

Exception Propagation

When a coroutine throws an exception, its child coroutines are cancelled, and the exception is propagated to its parent coroutines, layer by layer, up to the root coroutine, as shown in the following code.

fun main() = runBlocking { // root coroutine
    val job1 = async {
        val job2 = launch {
            throw Exception("From job2")
        }

        job2.join()
        println("job1 is done")
    }

    job1.await()
    println("runBlocking is done")
}

The output is as follows.

Exception in thread "main" java.lang.Exception: From job2

We can see that every coroutine from job2 up to the root coroutine (runBlocking) has been cancelled.

Next, let’s catch the exception in job1 and see what happens.

fun main() = runBlocking { // root coroutine
    val job1 = async {
        val job2 = launch {
            throw Exception("From job2")
        }

        try {
            job2.join()
        } catch (e: Exception) {
            println("Caught an exception: $e")
        }
        println("job1 is done")
    }

    job1.await()
    println("runBlocking is done")
}

The output is as follows.

Caught an exception: java.lang.Exception: From job2
job1 is done
Exception in thread "main" java.lang.Exception: From job2

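The exception still propagates through every parent up to the root coroutine. Catching it around job2.join() in job1 does not stop it, because a failed child cancels its parent directly rather than through the call stack. The code after the catch block in job1 still runs, because println() is not a suspension point, but job1 itself has already been cancelled and would stop at its next suspending call.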

Exposing Exceptions to Callers

In the next example, we add a root coroutine to see what changes will happen.

fun main() = runBlocking { // root coroutine
    val rootJob = GlobalScope.launch { // root coroutine
        val job1 = async {
            val job2 = launch {
                throw Exception("From job2")
            }

            job2.join()
            println("job1 is done")
        }

        job1.await()
        println("rootJob is done")
    }

    rootJob.join()
    println("runBlocking is done")
}

The output is as follows.

runBlocking is done
Exception in thread "DefaultDispatcher-worker-1 @coroutine#2" java.lang.Exception: From job2

You can see that the exception cancels every coroutine from job2 up to the root coroutine rootJob. Because rootJob is launched in GlobalScope, it is not a child of runBlocking(), so runBlocking() can still continue after rootJob has been cancelled.

In the code below, we use async() to create the root coroutine instead, to see what is different.

fun main() = runBlocking {
    val rootJob = GlobalScope.async {
        val job1 = async {
            val job2 = launch {
                throw Exception("From job2")
            }

            job2.join()
            println("job1 is done")
        }

        job1.await()
        println("rootJob is done")
    }

    rootJob.await()
    println("runBlocking is done")
}

Its output is below.

Exception in thread "main" java.lang.Exception: From job2

We can see that root coroutines created by launch() and async() behave differently. When a root coroutine created by launch() fails, the exception is treated as uncaught and is reported by the thread's uncaught exception handler, while the caller (here runBlocking()) continues. However, when a root coroutine created by async() fails, the exception is stored in the Deferred and rethrown by await(), so the caller can handle it, as follows.

fun main() = runBlocking {
    val rootJob = GlobalScope.async {
        val job1 = async {
            val job2 = launch {
                throw Exception("From job2")
            }

            job2.join()
            println("job1 is done")
        }

        job1.await()
        println("rootJob is done")
    }

    try {
        rootJob.await()
    } catch (e: Exception) {
        println("Caught an exception: $e")
    }
    println("runBlocking is done")
}

The output is as follows.

Caught an exception: java.lang.Exception: From job2
runBlocking is done

Handling Uncaught Exceptions

We just discussed that a root coroutine created by async() exposes its exception to the caller. A root coroutine created by launch() does not, and an exception that nobody catches is called an uncaught exception. Kotlin’s CoroutineExceptionHandler lets us customize how these uncaught exceptions are handled, for example how they are printed to the console. The handler must be passed in when the root coroutine is created; it has no effect when installed on a child coroutine or on a coroutine created by async().

fun main() = runBlocking {
    val handler = CoroutineExceptionHandler { _, exception ->
        println("CoroutineExceptionHandler got $exception")
    }
    val job1 = GlobalScope.launch(handler) {
        val job2 = async {
            throw Exception("from job2")
        }

        job2.await()
        println("job1 is done")
    }

    job1.join()
    println("runBlocking is done")
}

The output is as follows.

CoroutineExceptionHandler got java.lang.Exception: from job2
runBlocking is done

It is worth noting that you cannot recover from the exception in CoroutineExceptionHandler: by the time the handler is called, the coroutine has already completed with that exception.

Timeout

suspend fun <T> withTimeout(
    timeMillis: Long,
    block: suspend CoroutineScope.() -> T
): T

suspend fun <T> withTimeoutOrNull(
    timeMillis: Long,
    block: suspend CoroutineScope.() -> T
): T?

withTimeout() runs a block with a timeout and returns the block's value. If the timeout elapses first, it throws a TimeoutCancellationException.

fun main() = runBlocking {
    try {
        val result = withTimeout(1000) {
            repeat(10) { i ->
                println("withTimeout $i")
                delay(500)
            }
            "Complete"
        }
        println(result)
    } catch (e: TimeoutCancellationException) {
        println(e)
    }
}

The output of the above code is as follows.

withTimeout 0
withTimeout 1
kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1000 ms

withTimeoutOrNull() works like withTimeout(), except that on timeout it returns null instead of throwing a TimeoutCancellationException. An example follows.

fun main() = runBlocking {
    val result = withTimeoutOrNull(1000) {
        repeat(10) { i ->
            println("withTimeout $i")
            delay(500)
        }
        "Complete"
    }

    println("Result is $result")
}

The output of the above code is as follows.

withTimeout 0
withTimeout 1
Result is null
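
Because withTimeoutOrNull() returns null on timeout, it combines naturally with the Elvis operator to supply a fallback value. Here is a short sketch of that idea; fetchOrDefault, its parameters, and the strings "fresh" and "cached" are hypothetical.

```kotlin
import kotlinx.coroutines.*

// Hypothetical function: returns "fresh" if the simulated work finishes
// within the timeout, and falls back to "cached" otherwise.
suspend fun fetchOrDefault(timeoutMillis: Long, workMillis: Long): String =
    withTimeoutOrNull(timeoutMillis) {
        delay(workMillis)    // stands in for a slow operation
        "fresh"
    } ?: "cached"

fun main() = runBlocking {
    println(fetchOrDefault(timeoutMillis = 1_000, workMillis = 10))   // prints fresh
    println(fetchOrDefault(timeoutMillis = 100, workMillis = 1_000))  // prints cached
}
```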

Conclusion

Kotlin coroutines are lighter and more efficient than threads. When a coroutine is suspended, its thread is not blocked. In addition, suspending functions allow us to write asynchronous code sequentially, like synchronous code, which makes the code easier to read. Coroutines are not easy to get started with, but once you are familiar with them, you will not want to go back to using threads directly.
