Combine is Apple's framework for reactive programming in Swift. Before Combine came out, we generally used RxSwift. With Combine, we no longer need to introduce a third-party library, and Combine is generally reported to perform better than RxSwift.
Publisher & Subscriber
In Swift Combine, there are two very important concepts: publishers and subscribers. A publisher sends data, and a subscriber subscribes to a publisher to receive that data. Together they form a stream.
    protocol Publisher<Output, Failure>
    protocol Subscriber<Input, Failure> : CustomCombineIdentifierConvertible
In the following example, we create a TimerPublisher. It sends the current time every second. Then, we call sink(receiveValue:) to receive the data sent by TimerPublisher. sink(receiveValue:) will create a subscriber to subscribe to the current publisher.
    import UIKit
    import Combine

    class ViewController: UIViewController {
        @IBOutlet weak var timeLabel: UILabel!

        private var cancellable: AnyCancellable?

        override func viewDidLoad() {
            super.viewDidLoad()
            cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common)
                .autoconnect()
                .sink { [weak self] date in
                    // Instance members must be reached through self inside a closure;
                    // capturing it weakly avoids a retain cycle with the stored cancellable.
                    self?.timeLabel.text = date.description
                }
        }
    }
If we only want to assign each received value to a property, we can use assign(to:on:), as shown in the following example.
    import UIKit
    import Combine

    class ViewController: UIViewController {
        @IBOutlet weak var timeLabel: UILabel!

        private var cancellable: AnyCancellable?

        override func viewDidLoad() {
            super.viewDidLoad()
            cancellable = Timer.publish(every: 1, on: RunLoop.main, in: .common)
                .autoconnect()
                .map { $0.description }
                .assign(to: \.text, on: timeLabel)
        }
    }
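assign(to:on:) is not tied to UIKit. The following is a minimal sketch that assumes a hypothetical Profile class (not part of the original example) and assigns the value emitted by Just to one of its properties.

```swift
import Combine

// Hypothetical model class, introduced only for this sketch.
final class Profile {
    var name = ""
}

let profile = Profile()

// assign(to:on:) writes each received value to the key path on the object.
let cancellable = Just("Alice")
    .assign(to: \.name, on: profile)

print(profile.name) // Alice
```

Note that assign(to:on:) keeps a strong reference to the target object for as long as the subscription is alive, so it is worth checking for retain cycles when the target also stores the cancellable.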
@Published
@Published turns a property into a publisher. As soon as you assign a value to the property, it publishes that value. To access the property's publisher, add the $ prefix in front of the property name.
    import UIKit
    import Combine

    class ViewController: UIViewController {
        @IBOutlet weak var timeLabel: UILabel!

        @IBAction func clickButton(_ sender: Any) {
            text = Date().description
        }

        private var cancellable: AnyCancellable?

        @Published private var text: String? = ""

        override func viewDidLoad() {
            super.viewDidLoad()
            cancellable = $text.assign(to: \.text, on: timeLabel)
        }
    }
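The same behavior can be observed without a view controller. This is a small sketch that assumes a hypothetical Counter class; it shows that a subscriber to $count first receives the current value and then every value assigned afterwards.

```swift
import Combine

// Hypothetical model type, used only for illustration.
final class Counter {
    @Published var count = 0
}

let counter = Counter()
var received: [Int] = []

// $count is the publisher behind the count property.
let cancellable = counter.$count
    .sink { received.append($0) }

counter.count = 1
counter.count = 2

print(received) // [0, 1, 2]
```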
Publishers
Future
The Future publisher eventually produces a single value and then finishes. It is used to wrap asynchronous code in a publisher.
    func login() -> Future<String, Never> {
        Future { promise in
            DispatchQueue.global().async {
                // do login ...
                // `result` stands for the outcome of the elided login call above.
                let token = result.token
                promise(Result.success(token))
            }
        }
    }

    let cancellable = login()
        .sink { token in
            print(token)
        }
Just
The Just publisher produces a single value and then finishes. It is often used to start a chain of publishers.
    let cancellable = Just(1)
        .sink(receiveCompletion: { completion in
            print(completion)
        }, receiveValue: { value in
            print(value)
        })

    // Output:
    // 1
    // finished
Record
The Record publisher is similar to Just, but it can produce a series of values.
    let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
        .sink(receiveValue: { value in
            print(value)
        })

    // Output:
    // 1
    // 2
    // 3
    // 4
Deferred
The Deferred publisher takes a closure that creates a publisher, but it does not run the closure until a subscriber subscribes.
    func login() -> Deferred<Future<String, Never>> {
        Deferred {
            Future { promise in
                DispatchQueue.global().async {
                    // do login ...
                    // `result` stands for the outcome of the elided login call above.
                    let token = result.token
                    promise(Result.success(token))
                }
            }
        }
    }
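Why wrap a Future in Deferred at all? A Future runs its closure as soon as it is created, whether or not anyone subscribes. The sketch below illustrates the difference by recording the order of events in a log array; the names eagerFuture, deferredFuture, and log are chosen for this illustration only.

```swift
import Combine

var log: [String] = []

// A Future runs its closure immediately, at creation time.
let eagerFuture = Future<Int, Never> { promise in
    log.append("future started")
    promise(.success(1))
}

// Wrapped in Deferred, the same kind of work waits for a subscriber.
let deferredFuture = Deferred {
    Future<Int, Never> { promise in
        log.append("deferred future started")
        promise(.success(2))
    }
}

log.append("before subscribing")

let cancellable = deferredFuture.sink { value in
    log.append("received \(value)")
}

print(log)
// ["future started", "before subscribing", "deferred future started", "received 2"]
```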
Subjects
A Subject is a Publisher that also provides send(_:) so that callers can push data into the stream.
PassthroughSubject
PassthroughSubject broadcasts data to all subscribers. A subscriber only receives the data sent after it subscribes.
    let subject = PassthroughSubject<String, Error>()
    subject.send("1")
    subject.send("2")

    let cancellable = subject
        .sink(receiveCompletion: { completion in
            print(completion)
        }, receiveValue: { value in
            print(value)
        })

    subject.send("3")
    subject.send("4")
    subject.send(completion: .finished)

    // Output:
    // 3
    // 4
    // finished
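To make the broadcasting behavior concrete, here is a small sketch with two subscribers that join at different times. The variable names are illustrative; the results are collected in arrays so that they can be inspected afterwards.

```swift
import Combine

let subject = PassthroughSubject<String, Never>()
var first: [String] = []
var second: [String] = []

// The first subscriber joins before any value is sent.
let firstCancellable = subject.sink { first.append($0) }
subject.send("a")

// The second subscriber joins later and misses "a".
let secondCancellable = subject.sink { second.append($0) }
subject.send("b")

print(first)  // ["a", "b"]
print(second) // ["b"]
```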
CurrentValueSubject
Like PassthroughSubject, CurrentValueSubject broadcasts data to all subscribers. However, CurrentValueSubject also stores the most recent value, so an initial value must be passed in when it is created. When a subscriber subscribes, it first receives the stored value and then all data sent afterwards.
    let subject = CurrentValueSubject<String, Error>("0")
    subject.send("1")
    subject.send("2")

    let cancellable = subject
        .sink(receiveCompletion: { _ in
            print("complete")
        }, receiveValue: { value in
            print(value)
        })

    subject.send("3")
    subject.send("4")
    subject.send(completion: .finished)

    // Output:
    // 2
    // 3
    // 4
    // complete
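The stored value is also available directly through the value property. As a brief sketch (using Int and Never for simplicity), reading value returns the latest value, and assigning to it publishes just like send(_:).

```swift
import Combine

let subject = CurrentValueSubject<Int, Never>(0)
var received: [Int] = []

let cancellable = subject.sink { received.append($0) }

subject.send(1)      // publish through send(_:)
subject.value = 2    // assigning to value also publishes

print(subject.value) // 2
print(received)      // [0, 1, 2]
```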
Specifying Scheduler
If we do not specify a scheduler, data is received on the same thread that sent it. That is to say, if you send data from a background thread, the closure passed to sink() is called on that background thread.
    // Main thread
    let subject = PassthroughSubject<Int, Never>()
    let cancellable = subject.sink(receiveValue: { value in
        print("\(value) sent from \(Thread.current)")
    })

    subject.send(1)
    DispatchQueue.global().async {
        subject.send(2)
    }

    // Output:
    // 1 sent from <_NSMainThread: 0x6000011043c0>{number = 1, name = main}
    // 2 sent from <NSThread: 0x600001134000>{number = 4, name = (null)}
receive(on:)
When you use receive(on:) to specify a scheduler, all operators after it receive data on that scheduler.
    // Main thread
    let subject = PassthroughSubject<Int, Never>()
    let cancellable = subject
        .map {
            print("map1: \($0) sent from \(Thread.current)")
            return $0
        }
        .receive(on: DispatchQueue.global())
        .map {
            print("map2: \($0) sent from \(Thread.current)")
            return $0
        }
        .sink(receiveValue: { value in
            print("sink: \(value) sent from \(Thread.current)")
        })

    subject.send(1)

    // Output:
    // map1: 1 sent from <_NSMainThread: 0x600003d343c0>{number = 1, name = main}
    // map2: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}
    // sink: 1 sent from <NSThread: 0x600003d14240>{number = 5, name = (null)}
subscribe(on:)
subscribe(on:options:) specifies the scheduler on which the upstream publisher is subscribed to, and on which demand requests and cancellation are performed. A publisher that emits as part of being subscribed to, such as Just, therefore delivers its values on that scheduler, and the operators that follow run there until a receive(on:) switches to another scheduler. It does not move values that a subject sends later: those are still delivered on the thread that calls send(). For that reason, the following example uses Just rather than a subject.
    // Main thread
    let cancellable = Just(1)
        .subscribe(on: DispatchQueue.global())
        .map {
            print("map1: \($0) sent from \(Thread.current)")
            return $0
        }
        .receive(on: RunLoop.main)
        .map {
            print("map2: \($0) sent from \(Thread.current)")
            return $0
        }
        .sink(receiveValue: { value in
            print("sink: \(value) sent from \(Thread.current)")
        })

    // Output:
    // map1: 1 sent from <NSThread: 0x600000a19100>{number = 4, name = (null)}
    // map2: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}
    // sink: 1 sent from <_NSMainThread: 0x600000a18180>{number = 1, name = main}
Operators
This section introduces some commonly used operators. Publisher has many other operators; please refer to the official documentation for the rest.
map() & tryMap()
    func map<T>(_ transform: @escaping (Self.Output) -> T) -> Publishers.Map<Self, T>
    func tryMap<T>(_ transform: @escaping (Self.Output) throws -> T) -> Publishers.TryMap<Self, T>
map() and tryMap() use transform to convert each value from the upstream publisher and send the new value downstream. With tryMap(), transform may also throw an error, which ends the stream with a failure.
    let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
        .map { "Hello \($0)" }
        .sink(receiveValue: { value in
            print(value)
        })

    // Output:
    // Hello 1
    // Hello 2
    // Hello 3
    // Hello 4
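The example above only covers map(). As a hedged sketch of tryMap(), the following code assumes a hypothetical NegativeValueError type and throws it for negative input. The values emitted before the error are delivered, and the stream then ends with a failure, so 16 is never processed.

```swift
import Combine

// Hypothetical error type, introduced only for this sketch.
struct NegativeValueError: Error {}

var values: [Double] = []
var failed = false

let cancellable = Record<Double, Never>(output: [4, 9, -1, 16], completion: .finished)
    .tryMap { value -> Double in
        // Throwing here terminates the stream with .failure.
        guard value >= 0 else { throw NegativeValueError() }
        return value.squareRoot()
    }
    .sink(receiveCompletion: { completion in
        if case .failure = completion { failed = true }
    }, receiveValue: { value in
        values.append(value)
    })

print(values) // [2.0, 3.0]
print(failed) // true
```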
filter() & tryFilter()
    func filter(_ isIncluded: @escaping (Self.Output) -> Bool) -> Publishers.Filter<Self>
    func tryFilter(_ isIncluded: @escaping (Self.Output) throws -> Bool) -> Publishers.TryFilter<Self>
filter() and tryFilter() filter the values from upstream. A value is sent downstream only when isIncluded returns true for it.
    let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
        .filter { $0 > 2 }
        .sink(receiveValue: { value in
            print(value)
        })

    // Output:
    // 3
    // 4
compactMap() & tryCompactMap()
    func compactMap<T>(_ transform: @escaping (Self.Output) -> T?) -> Publishers.CompactMap<Self, T>
    func tryCompactMap<T>(_ transform: @escaping (Self.Output) throws -> T?) -> Publishers.TryCompactMap<Self, T>
    // See https://developer.apple.com/documentation/combine/publisher/trycompactmap(_:)
compactMap() and tryCompactMap() pass each value from upstream to transform, which returns an optional value. The result is sent downstream only when it is not nil.
    let cancellable = Record(output: [1, 2, nil, 4], completion: .finished)
        .compactMap { $0 }
        .sink(receiveValue: { value in
            print(value)
        })

    // Output:
    // 1
    // 2
    // 4
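A common use of compactMap() is to drop values that fail a conversion. This minimal sketch, with made-up input strings, converts strings to Int and silently skips the one that cannot be parsed.

```swift
import Combine

var numbers: [Int] = []

let cancellable = Record<String, Never>(output: ["1", "two", "3"], completion: .finished)
    // Int("two") is nil, so that element is dropped.
    .compactMap { Int($0) }
    .sink { numbers.append($0) }

print(numbers) // [1, 3]
```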
reduce() & tryReduce()
    func reduce<T>(
        _ initialResult: T,
        _ nextPartialResult: @escaping (T, Self.Output) -> T
    ) -> Publishers.Reduce<Self, T>
    func tryReduce<T>(
        _ initialResult: T,
        _ nextPartialResult: @escaping (T, Self.Output) throws -> T
    ) -> Publishers.TryReduce<Self, T>
reduce() and tryReduce() take two parameters: initialResult and the closure nextPartialResult. The first argument of nextPartialResult is the accumulator, which starts out as initialResult. Each time a value comes in from upstream, nextPartialResult is called with the accumulator and that value, and the value it returns becomes the accumulator for the next call. When upstream finishes, the final accumulator is sent downstream.
The following example shows how to use reduce() to calculate the sum from 1 to 4.
    let cancellable = Record(output: [1, 2, 3, 4], completion: .finished)
        .reduce(0) { accumulator, value in
            print("acc=\(accumulator), value=\(value)")
            return accumulator + value
        }
        .sink(receiveValue: { value in
            print(value)
        })

    // Output:
    // acc=0, value=1
    // acc=1, value=2
    // acc=3, value=3
    // acc=6, value=4
    // 10
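One detail that is easy to miss is that reduce() emits nothing until the upstream publisher finishes. The sketch below uses a PassthroughSubject so that the moment of completion is explicit; the variable names are illustrative.

```swift
import Combine

let subject = PassthroughSubject<Int, Never>()
var results: [Int] = []

let cancellable = subject
    .reduce(0, +)
    .sink { results.append($0) }

subject.send(1)
subject.send(2)

// Nothing has been emitted yet, because upstream has not finished.
let resultsBeforeFinish = results

subject.send(completion: .finished)

print(resultsBeforeFinish) // []
print(results)             // [3]
```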
merge()
    func merge(with other: Self) -> Publishers.MergeMany<Self>
merge(with:) merges the values of another publisher of the same type into a single stream. Values are sent downstream in the order in which they were emitted.
    let subject1 = PassthroughSubject<Int, Never>()
    let subject2 = PassthroughSubject<Int, Never>()
    let cancellable = subject1
        .merge(with: subject2)
        .sink(receiveValue: { value in
            print(value)
        })

    subject1.send(1)
    subject2.send(11)
    subject1.send(2)
    subject1.send(3)
    subject2.send(12)
    subject1.send(4)

    // Output:
    // 1
    // 11
    // 2
    // 3
    // 12
    // 4
zip()
    func zip<P>(_ other: P) -> Publishers.Zip<Self, P> where P : Publisher, Self.Failure == P.Failure
zip() pairs each value from this publisher with the corresponding value from another publisher and sends each pair downstream as a tuple.
    let _ = Record(output: [1, 2, 3, 4], completion: .finished)
        .zip(Record(output: [5, 6, 7, 8], completion: .finished))
        .sink(receiveValue: { value in
            print(value)
        })

    // Output:
    // (1, 5)
    // (2, 6)
    // (3, 7)
    // (4, 8)
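When the two publishers emit at different times, zip() holds values back until a partner arrives. The following sketch uses two subjects with illustrative names to show that pairs are formed by position, not by arrival time.

```swift
import Combine

let numbers = PassthroughSubject<Int, Never>()
let letters = PassthroughSubject<String, Never>()
var pairs: [String] = []

let cancellable = numbers
    .zip(letters)
    .sink { pair in pairs.append("\(pair.0)\(pair.1)") }

numbers.send(1)
numbers.send(2)    // held until letters catches up
letters.send("a")  // emits (1, "a")
letters.send("b")  // emits (2, "b")
numbers.send(3)    // waits for a third letter, which never arrives

print(pairs) // ["1a", "2b"]
```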
flatMap()
    func flatMap<T, P>(
        maxPublishers: Subscribers.Demand = .unlimited,
        _ transform: @escaping (Self.Output) -> P
    ) -> Publishers.FlatMap<P, Self> where T == P.Output, P : Publisher, Self.Failure == P.Failure
Like map(), flatMap() passes each value from the upstream publisher to transform, but transform returns a new publisher, and the values that publisher emits are sent downstream.
    let _ = Just([1, 2, 3, 4])
        .flatMap { array in
            Just(array.map { $0 * 10 })
        }
        .sink(receiveValue: { value in
            print(value)
        })

    // Output:
    // [10, 20, 30, 40]
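The difference from map() is clearer when the inner publisher emits more than one value. In this sketch, each upstream value is turned into a Record that emits two values, and flatMap() flattens them into a single stream. The inner publishers here emit synchronously, so the order shown applies to this case only.

```swift
import Combine

var received: [Int] = []

let cancellable = Record<Int, Never>(output: [1, 2, 3], completion: .finished)
    .flatMap { value in
        // Each upstream value becomes a publisher of two values.
        Record<Int, Never>(output: [value, value * 10], completion: .finished)
    }
    .sink { received.append($0) }

print(received) // [1, 10, 2, 20, 3, 30]
```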
Conclusion
Swift Combine allows us to handle asynchronous code more gracefully. Now that Swift async/await is available, we may prefer it over Combine for asynchronous code. Even so, Combine is still useful for implementing the Observer pattern, which is heavily used in SwiftUI.
Reference
- Combine, Apple Developer.
- Using Combine.
- RunLoop.main vs DispatchQueue.main: The differences explained, SwiftLee.