Swift Combine Tutorial

Photo by Chris Murray on Unsplash

Swift Combine is Apple’s framework for reactive programming. Before Combine came out, we generally used RxSwift. With Combine, we no longer need to introduce a third-party library, and compared to RxSwift, Combine generally has better performance.

Publisher & Subscriber

In Swift Combine, there are two very important concepts: publishers and subscribers. A publisher sends values over time, and a subscriber subscribes to a publisher to receive those values. Together, they form a stream.

protocol Publisher<Output, Failure>

protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible

In the following example, we create a TimerPublisher. It sends the current time every second. Then, we call sink(receiveValue:) to receive the data sent by TimerPublisher. sink(receiveValue:) will create a subscriber to subscribe to the current publisher.

import UIKit
import Combine

class ViewController: UIViewController {
    @IBOutlet weak var timeLabel: UILabel!
    private var cancellable: AnyCancellable?

    override func viewDidLoad() {
        super.viewDidLoad()

        cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common)
            .autoconnect()
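            .sink { [weak self] date in
                // Capture self weakly so the stored subscription does not retain the view controller.
                self?.timeLabel.text = date.description
            }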
    }
}
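
The example above stores the AnyCancellable returned by sink in a property. The reason is that an AnyCancellable cancels its subscription when it is released. The following is a minimal sketch of that behavior, assuming a PassthroughSubject as the publisher; the names received and cancellables are only for illustration.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var received: [Int] = []

// Keep the subscription alive by storing its AnyCancellable.
var cancellables = Set<AnyCancellable>()
subject
    .sink { received.append($0) }
    .store(in: &cancellables)

subject.send(1)            // delivered
cancellables.removeAll()   // releasing the AnyCancellable cancels the subscription
subject.send(2)            // not delivered

print(received)            // [1]
```

When a class has several subscriptions, a Set<AnyCancellable> with store(in:) is a common alternative to one property per subscription.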

To assign each received value directly to a property, use assign(to:on:), as shown in the following example.

import UIKit
import Combine

class ViewController: UIViewController {
    @IBOutlet weak var timeLabel: UILabel!
    private var cancellable: AnyCancellable?

    override func viewDidLoad() {
        super.viewDidLoad()

        cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common)
            .autoconnect()
            .map { $0.description }
            .assign(to: \.text, on: timeLabel)
    }
}

@Published

@Published turns a property into a publisher. Whenever you assign a value to the property, it publishes that value. To access the property’s publisher, prefix the property name with $.

import UIKit
import Combine

class ViewController: UIViewController {
    @IBOutlet weak var timeLabel: UILabel!
    
    @IBAction func clickButton(_ sender: Any) {
        text = Date().description
    }
    
    private var cancellable: AnyCancellable?
    @Published private var text: String? = ""

    override func viewDidLoad() {
        super.viewDidLoad()
        cancellable = $text.assign(to: \.text, on: timeLabel)
    }
}
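
@Published also works outside UIKit. The following sketch assumes a hypothetical Counter class and shows that a new subscriber first receives the current value and then every value assigned afterwards.

```swift
import Combine

// Hypothetical model class, used only for this sketch.
final class Counter {
    @Published var count = 0
}

let counter = Counter()
var seen: [Int] = []

// $count is the publisher behind the count property.
let cancellable = counter.$count.sink { seen.append($0) }

counter.count = 1
counter.count = 2

print(seen)   // [0, 1, 2]
```

Note that @Published publishes from the property’s willSet, so reading counter.count inside the sink closure would still return the old value.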

Publishers

Future

The Future publisher eventually produces a single value and then finishes, or it fails. It is used to wrap asynchronous code into a publisher.

func login() -> Future<String, Never> {
    Future { promise in
        DispatchQueue.global().async {
            // do login ...
            let token = "token" // placeholder standing in for the real login result
            promise(.success(token))
        }
    }
}

let cancellable = login()
    .sink { token in
        print(token)
    }
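
A Future can also fail. The following sketch wraps a callback-based function into a Future with an error type. legacyLogin, LoginError, and the token string are all hypothetical and stand in for a real login API.

```swift
import Combine

// Hypothetical error type and callback-based API, used only for this sketch.
enum LoginError: Error { case invalidPassword }

func legacyLogin(password: String, completion: (Result<String, LoginError>) -> Void) {
    completion(password == "secret" ? .success("token-123") : .failure(.invalidPassword))
}

// Wrap the callback API: forward its Result to the promise.
func login(password: String) -> Future<String, LoginError> {
    Future { promise in
        legacyLogin(password: password) { result in
            promise(result)
        }
    }
}

var outcome: [String] = []

func record(_ publisher: Future<String, LoginError>) -> AnyCancellable {
    publisher.sink(receiveCompletion: { completion in
        if case .failure(let error) = completion {
            outcome.append("failed: \(error)")
        }
    }, receiveValue: { token in
        outcome.append("token: \(token)")
    })
}

let first = record(login(password: "secret"))
let second = record(login(password: "wrong"))

print(outcome)   // ["token: token-123", "failed: invalidPassword"]
```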

Just

The Just publisher emits a single value and then finishes. It is often used to start a chain of publishers.

let cancellable = Just(1)
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { value in
        print(value)
    })

// Output:
// 1
// finished

Record

The Record publisher is similar to Just, but it can produce a series of values followed by a completion.

let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// 1
// 2
// 3
// 4

Deferred

The Deferred publisher takes a closure that creates a publisher, but it does not call the closure until a subscriber subscribes.

func login() -> Deferred<Future<String, Never>> {
    Deferred {
        Future { promise in
            DispatchQueue.global().async {
                // do login ...
                let token = "token" // placeholder standing in for the real login result
                promise(.success(token))
            }
        }
    }
}
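
The difference between a bare Future and a Deferred Future is when the closure runs. A Future runs its closure as soon as it is created, while Deferred waits for a subscriber. The following sketch records the order of events in a log array; all names are only for illustration.

```swift
import Combine

var log: [String] = []

// A Future runs its closure immediately, even with no subscriber.
let eager = Future<Int, Never> { promise in
    log.append("future closure ran")
    promise(.success(1))
}

// Deferred postpones creating the Future until someone subscribes.
let lazy = Deferred {
    Future<Int, Never> { promise in
        log.append("deferred closure ran")
        promise(.success(2))
    }
}

log.append("before subscribing")

let first = eager.sink { log.append("eager value \($0)") }
let second = lazy.sink { log.append("lazy value \($0)") }

log.forEach { print($0) }
// future closure ran
// before subscribing
// eager value 1
// deferred closure ran
// lazy value 2
```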

Subjects

A Subject is a Publisher that also provides send(_:), which lets callers send values into the stream.

PassthroughSubject

PassthroughSubject broadcasts values to all of its subscribers. It does not store values, so subscribers only receive the values sent after they subscribe.

let subject = PassthroughSubject<String, Error>()

subject.send("1")
subject.send("2")
        
let cancellable = subject
    .sink(receiveCompletion: { completion in
        print(completion)
    }, receiveValue: { value in
        print(value)
    })
        
subject.send("3")
subject.send("4")
subject.send(completion: .finished)

// Output:
// 3
// 4
// finished

CurrentValueSubject

Like PassthroughSubject, CurrentValueSubject broadcasts values to all of its subscribers. However, CurrentValueSubject also keeps a buffer that stores the most recent value, so an initial value must be passed in when it is created. When a subscriber subscribes, it immediately receives the value in the buffer, followed by any values sent later.

let subject = CurrentValueSubject<String, Error>("0")

subject.send("1")
subject.send("2")
        
let cancellable = subject
    .sink(receiveCompletion: { _ in
        print("complete")
    }, receiveValue: { value in
        print(value)
    })
        
subject.send("3")
subject.send("4")
subject.send(completion: .finished)

// Output:
// 2
// 3
// 4
// complete
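
The buffer is also exposed through the value property. The following sketch, with illustrative names, reads and writes it directly; assigning to value publishes the new value just like send(_:).

```swift
import Combine

let subject = CurrentValueSubject<Int, Never>(0)
var received: [Int] = []

subject.send(1)   // replaces the buffered 0 before anyone subscribes

// The subscriber immediately receives the buffered value, 1.
let cancellable = subject.sink { received.append($0) }

subject.value = 2   // assigning to value also publishes it
subject.send(3)

print(subject.value)   // 3
print(received)        // [1, 2, 3]
```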

Specifying Scheduler

If we do not specify a scheduler, a subscriber receives values on the same thread the publisher sends them from. That is, if you send a value from a background thread, the closure passed to sink() is called on that background thread.

// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject.sink(receiveValue: { value in
    print("\(value) sent from \(Thread.current)")
})
subject.send(1)
DispatchQueue.global().async {
    subject.send(2)
}

// Output:
// 1 sent from <_NSMainThread: 0x6000011043c0>{number = 1, name = main}
// 2 sent from <NSThread: 0x600001134000>{number = 4, name = (null)}

receive(on:)

When you use receive(on:) to specify a scheduler, all operators after it receive values on that scheduler.

// Main thread
let subject = PassthroughSubject<Int, Never>()
let cancellable = subject
    .map {
        print("map1: \($0) sent from \(Thread.current)")
        return $0
    }
    .receive(on: DispatchQueue.global())
    .map {
        print("map2: \($0) sent from \(Thread.current)")
        return $0
    }
    .sink(receiveValue: { value in
        print("sink: \(value) sent from \(Thread.current)")
    })
subject.send(1)

// Output:
// map1: 1 sent from <_NSMainThread: 0x600003d343c0>{number = 1, name = main}
// map2: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}
// sink: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}
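
Thread descriptions are hard to check in code, so the following sketch verifies the same behavior with a serial queue instead. The queue label, the semaphore, and the variable names are assumptions made for this sketch; dispatchPrecondition traps if the closure runs anywhere other than the given queue.

```swift
import Combine
import Foundation

// Hypothetical serial queue used only for this sketch.
let workQueue = DispatchQueue(label: "example.work")
let done = DispatchSemaphore(value: 0)
var received: [Int] = []

let subject = PassthroughSubject<Int, Never>()
let cancellable = subject
    .receive(on: workQueue)
    .sink { value in
        // Traps if this closure is not running on workQueue.
        dispatchPrecondition(condition: .onQueue(workQueue))
        received.append(value)
        done.signal()
    }

subject.send(1)   // sent from the current thread
done.wait()       // block until the sink has run on workQueue

print(received)   // [1]
```

Blocking with a semaphore is only for demonstration in a script; in an app, you would normally hop to DispatchQueue.main or RunLoop.main before updating the UI.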

subscribe(on:)

subscribe(on:options:) specifies the scheduler on which the upstream publisher performs its subscribe, request, and cancel operations. Publishers that emit in response to a request, such as Just, therefore deliver their values on that scheduler until a receive(on:) switches to another one. A subject, by contrast, still delivers values on the thread that calls send().

// Main thread
let cancellable = Just(1)
    .subscribe(on: DispatchQueue.global())
    .map {
        print("map1: \($0) sent from \(Thread.current)")
        return $0
    }
    .receive(on: RunLoop.main)
    .map {
        print("map2: \($0) sent from \(Thread.current)")
        return $0
    }
    .sink(receiveValue: { value in
        print("sink: \(value) sent from \(Thread.current)")
    })

// Output:
// map1: 1 sent from <NSThread: 0x600000a19100>{number = 4, name = (null)}
// map2: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}
// sink: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}

Operators

The following sections introduce some commonly used operators. Publisher has many other operators; please refer to the official documentation.

map() & tryMap()

func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T>

func tryMap<T>(_ transform: @escaping (Self.Output) throws -> T) -> Publishers.TryMap<Self, T>

map() and tryMap() use transform to transform each value from the upstream publisher, and return a new value to the downstream publisher.

let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
    .map {
        "Hello \($0)"
    }
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// Hello 1
// Hello 2
// Hello 3
// Hello 4
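
The difference with tryMap() is that its transform may throw. A thrown error ends the stream with a failure, and no later values are delivered. The following sketch assumes a hypothetical NegativeNumber error and uses the publisher property of an array as the upstream.

```swift
import Combine

// Hypothetical error type, used only for this sketch.
struct NegativeNumber: Error {}

var values: [Int] = []
var failed = false

let cancellable = [1, 2, -3, 4].publisher
    .tryMap { value -> Int in
        guard value >= 0 else { throw NegativeNumber() }
        return value * 2
    }
    .sink(receiveCompletion: { completion in
        if case .failure = completion { failed = true }
    }, receiveValue: { value in
        values.append(value)
    })

print(values)   // [2, 4]
print(failed)   // true
```

Because tryMap() can fail, its Failure type is Error, so sink(receiveValue:) alone is not available and the completion must be handled as well.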

filter() & tryFilter()

func filter(_ isIncluded: @escaping (Self.Output) -> Bool) -> Publishers.Filter<Self>

func tryFilter(_ isIncluded: @escaping (Self.Output) throws -> Bool) -> Publishers.TryFilter<Self>

filter() and tryFilter() filter the values from upstream. A value is sent downstream only when isIncluded returns true.

let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
    .filter {
        $0 > 2
    }
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// 3
// 4

compactMap() & tryCompactMap()

func compactMap<T>(_ transform: @escaping (Self.Output) -> T?) -> Publishers.CompactMap<Self, T>

func tryCompactMap<T>(_ transform: @escaping (Self.Output) throws -> T?) -> Publishers.TryCompactMap<Self, T>

compactMap() and tryCompactMap() receive each value from upstream and return an optional value. Only when the returned optional value is not nil will it be sent downstream.

let cancellable = Record(output: [1, 2, nil, 4], completion: .finished)
    .compactMap { 
        $0
    }
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// 1
// 2
// 4

reduce() & tryReduce()

func reduce<T>(
    _ initialResult: T,
    _ nextPartialResult: @escaping (T, Self.Output) -> T
) -> Publishers.Reduce<Self, T>

func tryReduce<T>(
    _ initialResult: T,
    _ nextPartialResult: @escaping (T, Self.Output) throws -> T
) -> Publishers.TryReduce<Self, T>

reduce() and tryReduce() take two parameters: initialResult and the closure nextPartialResult. The first time nextPartialResult is called, its first argument, the accumulator, is initialResult. For each subsequent upstream value, the accumulator is the value that nextPartialResult returned last time. When the upstream finishes, the final accumulated value is sent downstream.

The following example shows how to use reduce() to calculate the sum from 1 to 4.

let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
    .reduce(0) { accumulator, value in
        print("acc=\(accumulator), value=\(value)")
        return accumulator + value
    }
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// acc=0, value=1
// acc=1, value=2
// acc=3, value=3
// acc=6, value=4
// 10

merge()

func merge(with other: Self) -> Publishers.MergeMany<Self>

merge() merges this publisher with another publisher of the same type. Values are sent downstream in the order in which they arrive.

let subject1 = PassthroughSubject<Int, Never>()
let subject2 = PassthroughSubject<Int, Never>()
let cancellable = subject1
    .merge(with: subject2)
    .sink(receiveValue: { value in
        print(value)
    })
subject1.send(1)
subject2.send(11)
subject1.send(2)
subject1.send(3)
subject2.send(12)
subject1.send(4)

// Output:
// 1
// 11
// 2
// 3
// 12
// 4

zip()

func zip<P>(_ other: P) -> Publishers.Zip<Self, P> where P : Publisher, Self.Failure == P.Failure

zip() pairs each value from this publisher with the corresponding value from another publisher, and sends each pair downstream as a tuple.

let _ = Record(output: [1, 2, 3, 4], completion: .finished)
    .zip(Record(output: [5, 6, 7, 8], completion: .finished))
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// (1, 5)
// (2, 6)
// (3, 7)
// (4, 8)
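
zip() only emits once both publishers have produced a value for the next pair. The following sketch uses two subjects with illustrative names to make the waiting visible.

```swift
import Combine

let numbers = PassthroughSubject<Int, Never>()
let letters = PassthroughSubject<String, Never>()
var pairs: [String] = []

let cancellable = numbers
    .zip(letters)
    .sink { pair in
        pairs.append("\(pair.0)\(pair.1)")
    }

numbers.send(1)
numbers.send(2)
let pairsBeforeLetters = pairs   // still empty: no letter has arrived yet

letters.send("a")   // pairs with 1
letters.send("b")   // pairs with 2

print(pairsBeforeLetters)   // []
print(pairs)                // ["1a", "2b"]
```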

flatMap()

func flatMap<T, P>(
    maxPublishers: Subscribers.Demand = .unlimited,
    _ transform: @escaping (Self.Output) -> P
) -> Publishers.FlatMap<P, Self> where T == P.Output, P : Publisher, Self.Failure == P.Failure

Like map(), flatMap() transforms each value from the upstream publisher with transform. However, transform returns a new publisher, and the values that publisher emits are sent downstream.

let _ = Just([1, 2, 3, 4])
    .flatMap { array in
        Just(array.map { $0 * 10 })
    }
    .sink(receiveValue: { value in
        print(value)
    })

// Output:
// [10, 20, 30, 40]
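
The flattening is easier to see when each inner publisher emits more than one value. In the following sketch, with illustrative names, each upstream number becomes a publisher of two values, and all of them arrive in a single stream.

```swift
import Combine

var flattened: [Int] = []

let cancellable = [1, 2, 3].publisher
    .flatMap { number in
        // Each upstream value becomes a publisher that emits two values.
        [number, number * 10].publisher
    }
    .sink { flattened.append($0) }

print(flattened)   // [1, 10, 2, 20, 3, 30]
```

The order is predictable here only because every publisher emits synchronously. With asynchronous inner publishers, values from different inner publishers may interleave.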

Conclusion

Swift Combine allows us to handle asynchronous code more gracefully. Now that Swift has async/await, we may prefer async/await over Combine for asynchronous code. Nevertheless, Combine is still useful for implementing the Observer pattern, which is heavily used in SwiftUI.
