Collaboration between Combine and async/await


In Xcode 13.2, Apple completed the back-deployment of async/await, lowering the minimum system requirement to iOS 13 (macOS Catalina). This has encouraged more and more developers to try async/await in their projects. Many of them notice that AsyncSequence behaves much like Combine. Add the lack of significant changes to the Combine framework over the past two years, and some have started to wonder whether Apple intends to replace Combine with AsyncSequence and AsyncStream.

Coincidentally, I recently ran into a scenario in my own work that may call for both Combine and async/await. In this article, I discuss the respective strengths of Combine and async/await, whether they can work together, and how.

Issues to be resolved

Recently, I ran into the following requirements:

  • Throughout the app’s lifecycle, a series of events occurs intermittently, with no fixed frequency or pattern.
  • Handling each event consumes significant system resources and requires calling an async/await version of a system API.
  • Event results must be delivered with low latency.
  • To limit resource consumption, events must not be processed concurrently.
  • GCD and OperationQueue are ruled out.

A little analysis of these requirements quickly points to a direction:

  • Combine excels at observing and receiving events, making it the ideal candidate for the first requirement.
  • Because the system API is async, the solution will inevitably use async/await.

The remaining issues are:

  • How to serialize event processing (the next event must not be processed until the current one has finished).
  • How to make Combine and async/await work together.

Comparison between Combine and AsyncSequence

Because of the similarities between Combine and AsyncSequence, some developers regard AsyncSequence as a replacement for Combine. For example:

  • Both allow handling of future values asynchronously
  • Both allow developers to use functions such as map and flatMap to manipulate values
  • Both end data streams when an error occurs
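The first two similarities are easiest to see in code. Below is the same doubling transformation written once against a Combine publisher and once against an AsyncSequence. It is a minimal sketch: the function names are made up for illustration, and an AsyncStream stands in for any asynchronous sequence.

```swift
import Combine

// Combine: values are pushed through the pipeline; `map` transforms each one.
func doubledWithCombine(_ input: [Int]) -> [Int] {
    var output: [Int] = []
    // A sequence publisher emits synchronously on subscription, so `output` is filled before returning.
    let cancellable = input.publisher
        .map { $0 * 2 }
        .sink { output.append($0) }
    cancellable.cancel()
    return output
}

// AsyncSequence: values are pulled by a `for await` loop; `map` transforms each one.
func doubledWithAsyncSequence(_ input: [Int]) async -> [Int] {
    let stream = AsyncStream<Int> { continuation in
        for value in input { continuation.yield(value) }
        continuation.finish()
    }
    var output: [Int] = []
    for await value in stream.map({ $0 * 2 }) {
        output.append(value)
    }
    return output
}
```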

However, there are still significant differences between them.

Observing and Receiving Events

Combine is a tool designed for reactive programming. As its name suggests, it is very good at transforming and merging different event streams to generate new ones. Combine focuses on responding to changes. When a property changes, a user clicks a button, or a notification is sent through NotificationCenter, developers can use the built-in tools provided by Combine to respond in real time.

Through Subjects provided by Combine (PassthroughSubject, CurrentValueSubject), developers can easily inject values into the data stream. When your code is written in an imperative style, Subjects are particularly valuable.
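Here is a minimal sketch of injecting values from imperative code. The button action and username are hypothetical. PassthroughSubject only forwards new values, while CurrentValueSubject stores its latest value and replays it to each new subscriber.

```swift
import Combine

// Imperative code (for example, a button action) injects values through subjects.
let buttonTapped = PassthroughSubject<Void, Never>()          // forwards new events only
let username = CurrentValueSubject<String, Never>("guest")    // stores and replays its latest value

var eventLog: [String] = []
let subscriptions: [AnyCancellable] = [
    buttonTapped.sink { _ in eventLog.append("tap") },
    username.sink { eventLog.append("user: \($0)") },          // receives "guest" immediately
]

buttonTapped.send()          // e.g. called from a hypothetical button action
username.send("alice")       // equivalent to `username.value = "alice"`
```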

In async/await, AsyncSequence lets us observe and receive data from network streams, files, notifications, and so on. Compared with Combine, however, it still lacks data binding and a Subject-like way to inject values.

In terms of observing and receiving events, Combine has a significant advantage.

Data Processing and Transformation

In terms of the sheer number of methods for processing and transforming data, AsyncSequence still lags well behind Combine. However, AsyncSequence also offers some very practical methods and properties that Combine does not yet provide, such as characters and lines.
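Foundation defines characters and lines on any AsyncSequence whose elements are bytes (UInt8), such as URL.AsyncBytes or FileHandle.AsyncBytes. The sketch below uses an in-memory AsyncStream of bytes in place of a file or network stream. The helper names are hypothetical, and lines requires iOS 15 / macOS 12.

```swift
import Foundation

// Stand-in for a file or network byte stream (e.g. `url.resourceBytes`).
func byteStream(_ text: String) -> AsyncStream<UInt8> {
    AsyncStream { continuation in
        for byte in text.utf8 { continuation.yield(byte) }
        continuation.finish()
    }
}

// `lines` splits the incoming bytes into lines without any manual parsing.
func readLines(_ text: String) async throws -> [String] {
    var result: [String] = []
    for try await line in byteStream(text).lines {
        result.append(line)
    }
    return result
}
```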

Because their focuses differ, the two are unlikely to converge in data processing and transformation, even as more built-in methods are added over time. More likely, each will keep expanding in its own area of strength.

Error Handling

In Combine, the error type (Failure) is explicit. Every step of a processing chain must agree not only on the type of the output values but also on the type of the errors. To achieve this, Combine provides many operators for handling errors and error types, such as mapError, setFailureType, catch, and retry.

Handling errors this way gives you compile-time guarantees. In a complex processing chain, however, it also significantly hurts readability and demands a solid command of these operators.

Async/await uses the familiar throw/try/catch pattern. It has almost no learning curve and produces code that matches how most people read.
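The stylistic difference is easiest to see side by side. In this sketch, ParseError, parse and the other helpers are hypothetical. The Combine version must narrow the failure type back to ParseError after tryMap. The async version simply throws and leaves handling to the caller's do/catch.

```swift
import Combine

enum ParseError: Error, Equatable {   // hypothetical error type
    case notANumber(String)
}

func parse(_ text: String) throws -> Int {
    guard let number = Int(text) else { throw ParseError.notANumber(text) }
    return number
}

// Combine: the Failure type is part of the publisher's type and must stay consistent.
func parsedWithCombine(_ texts: [String]) -> AnyPublisher<Int, ParseError> {
    texts.publisher
        .tryMap(parse)                                   // tryMap widens Failure to `any Error`...
        .mapError { error -> ParseError in               // ...so narrow it back explicitly
            error as? ParseError ?? .notANumber("?")
        }
        .eraseToAnyPublisher()
}

// async/await: an ordinary throwing function; the first error propagates to the caller.
func parsedWithAsync(_ texts: [String]) async throws -> [Int] {
    var numbers: [Int] = []
    for text in texts {
        numbers.append(try parse(text))
    }
    return numbers
}

// The caller handles errors with plain do/catch.
func describe(_ texts: [String]) async -> String {
    do {
        let numbers = try await parsedWithAsync(texts)
        return "parsed \(numbers)"
    } catch ParseError.notANumber(let text) {
        return "not a number: \(text)"
    } catch {
        return "unexpected error: \(error)"
    }
}
```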

Functionally, the two differ little in error handling. The difference lies mainly in style.

Management of Lifecycles

In Combine, the lifecycle of a pipeline, from subscription to cancellation, is expressed explicitly in code (for example, through the AnyCancellable returned by sink). With AsyncSequence, the lifecycle is less explicit: it is implied by the Task that iterates the sequence.
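In practice, the closest thing to calling cancel() on an AnyCancellable is cancelling that Task. Here is a minimal sketch using only the standard library; demoTaskBoundLifecycle is a hypothetical name.

```swift
// With AsyncSequence, the iteration lives as long as the Task that runs the loop.
func demoTaskBoundLifecycle() async -> [Int] {
    var continuation: AsyncStream<Int>.Continuation!
    let stream = AsyncStream<Int> { continuation = $0 }   // the build closure runs immediately

    let consumer = Task { () async -> [Int] in
        var received: [Int] = []
        for await value in stream {   // exits when the stream finishes or the Task is cancelled
            received.append(value)
        }
        return received
    }

    continuation.yield(1)
    continuation.yield(2)
    consumer.cancel()                 // plays the role of `cancellable.cancel()` in Combine
    return await consumer.value       // the loop has ended, so the Task can complete
}
```

Cancellation ends the loop cooperatively, so how many values arrive before it takes effect can vary from run to run.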

Scheduling and Organization

In Combine, developers can explicitly control where asynchronous events are handled by specifying a scheduler (for example, with receive(on:) or subscribe(on:)). They also have several ways to control demand on the pipeline, adjust processing frequency (for example, with debounce or throttle), and more.

AsyncSequence offers no built-in way to control where, how often, or how concurrently the elements of a data flow are processed.
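In the following sketch, a scheduler decides both where and how often values are handled. debounce(for:scheduler:) waits for 100 ms of silence, then delivers only the latest value on the given queue. The queue label and timings are arbitrary. In an app, receive(on:) could be added to hop to the main queue for UI work.

```swift
import Combine
import Foundation

let events = PassthroughSubject<Int, Never>()
let processingQueue = DispatchQueue(label: "processing")   // arbitrary label
var handled: [Int] = []                                    // only touched on processingQueue

let subscription = events
    // How often and where: emit the latest value after 100 ms without new events,
    // delivering it on processingQueue.
    .debounce(for: .milliseconds(100), scheduler: processingQueue)
    .sink { handled.append($0) }

// A burst of events: only the last one survives the debounce.
events.send(1)
events.send(2)
events.send(3)
```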

In the following sections, we address the requirements raised at the beginning of this article. Each solution combines Combine with async/await.

Solution 1

Combine offers two ways to limit how many values are processed concurrently: set the maxPublishers parameter of flatMap, or write a custom Subscriber. In this solution, we use flatMap to serialize event processing.

The officially recommended way to call asynchronous APIs from Combine is to wrap each call in a Future publisher and switch to it with flatMap.

In Solution 1, we combine flatMap, Deferred (to ensure that the Future is only executed after subscription), and Future to create a new Operator to meet our needs.

Swift
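import Combine

// The Future<T, Never> produced below can only be flatMapped onto a publisher that never fails,
// so the operator is constrained to Failure == Never.
public extension Publisher where Self.Failure == Never {
    /// Runs an async transform for each upstream value;
    /// `maxPublishers` limits how many transforms can run at the same time.
    func task<T>(maxPublishers: Subscribers.Demand = .unlimited,
                 _ transform: @escaping (Output) async -> T) -> Publishers.FlatMap<Deferred<Future<T, Never>>, Self> {
        flatMap(maxPublishers: maxPublishers) { value in
            // Deferred delays creating the Future (which starts its work immediately) until subscription
            Deferred {
                Future { promise in
                    // Bridge into async/await and fulfil the promise with the result
                    Task {
                        let output = await transform(value)
                        promise(.success(output))
                    }
                }
            }
        }
    }
}

public extension Publisher where Self.Failure == Never {
    /// Subscribes and discards every value (the work happens inside `task`)
    func emptySink() -> AnyCancellable {
        sink(receiveValue: { _ in })
    }
}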

For brevity, the complete version (which also supports errors and setFailureType) is available in the Gist. The code for this solution was inspired by John Sundell's article.
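The Gist's throwing version is not reproduced here. Below is a rough sketch of one way to write it; the operator name tryTask is hypothetical. The upstream failure is widened to Error so it matches a Future<T, Error>, and erasing to AnyPublisher keeps the signature readable.

```swift
import Combine

public extension Publisher {
    /// Hypothetical throwing counterpart of `task`: errors thrown by `transform` fail the pipeline.
    func tryTask<T>(maxPublishers: Subscribers.Demand = .unlimited,
                    _ transform: @escaping (Output) async throws -> T) -> AnyPublisher<T, Error> {
        self
            .mapError { $0 as Error }   // widen the upstream failure so it matches Future<T, Error>
            .flatMap(maxPublishers: maxPublishers) { value in
                Deferred {
                    Future<T, Error> { promise in
                        Task {
                            do {
                                promise(.success(try await transform(value)))
                            } catch {
                                promise(.failure(error))   // forward the thrown error downstream
                            }
                        }
                    }
                }
            }
            .eraseToAnyPublisher()
    }
}
```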

Usage:

Swift
var cancellables = Set<AnyCancellable>()

func asyncPrint(value: String) async {
    print("hello \(value)")
    try? await Task.sleep(nanoseconds: 1_000_000_000) // 1 second
}

["abc", "sdg", "353"].publisher
    .task(maxPublishers: .max(1)) { value in
        await asyncPrint(value: value)
    }
    .emptySink()
    .store(in: &cancellables)
// Output
// hello abc
// Wait for 1 second
// hello sdg
// Wait for 1 second
// hello 353

If the ["abc", "sdg", "353"].publisher in the code above is replaced with a PassthroughSubject or a NotificationCenter publisher, values may be lost. Because we limit how many values are processed in parallel, values can arrive faster than they are consumed, and a value sent while the subscriber has no outstanding demand is dropped. Adding a buffer after the publisher solves this.

Swift
let publisher = PassthroughSubject<String, Never>()
publisher
    .buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest) // Choose the buffer size and strategy to suit your use case
    .task(maxPublishers: .max(1)) { value in
        await asyncPrint(value:value)
    }
    .emptySink()
    .store(in: &cancellables)

publisher.send("fat")
publisher.send("bob")
publisher.send("man")
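To check that maxPublishers is what serializes the work, this sketch runs five 50 ms async jobs through the same flatMap + Deferred + Future combination and records the peak number of jobs running at once. ConcurrencyProbe and peakConcurrency are hypothetical helpers, and values requires iOS 15 / macOS 12.

```swift
import Combine

/// Tracks how many jobs are running at the same time.
actor ConcurrencyProbe {
    private var current = 0
    private(set) var peak = 0
    func enter() { current += 1; peak = max(peak, current) }
    func leave() { current -= 1 }
}

/// Runs five short async jobs through flatMap and reports the peak concurrency observed.
func peakConcurrency(maxPublishers: Subscribers.Demand) async -> Int {
    let probe = ConcurrencyProbe()
    let pipeline = (1...5).publisher
        .flatMap(maxPublishers: maxPublishers) { value in
            Deferred {
                Future<Int, Never> { promise in
                    Task {
                        await probe.enter()
                        try? await Task.sleep(nanoseconds: 50_000_000)   // simulated work
                        await probe.leave()
                        promise(.success(value))
                    }
                }
            }
        }
    // Iterating `values` lets us await the end of the pipeline.
    for await _ in pipeline.values {}
    return await probe.peak
}
```

With .max(1) the peak should be 1, while with .unlimited all five jobs typically start together.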

Solution 2

In Solution 2, we use a custom Subscriber to limit how many values are processed in parallel, and try calling async/await methods from within the Subscriber.

Create a custom Subscriber:

Swift
import Combine

extension Subscribers {
    public class OneByOneSink<Input, Failure: Error>: Subscriber, Cancellable {
        let receiveValue: (Input) -> Void
        let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

        var subscription: Subscription?

        public init(receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
                    receiveValue: @escaping (Input) -> Void) {
            self.receiveCompletion = receiveCompletion
            self.receiveValue = receiveValue
        }

        public func receive(subscription: Subscription) {
            self.subscription = subscription
            subscription.request(.max(1)) // Request the amount of data when subscribing
        }

        public func receive(_ input: Input) -> Subscribers.Demand {
            receiveValue(input)
            return .max(1) // Request the amount of data after data processing is completed
        }

        public func receive(completion: Subscribers.Completion<Failure>) {
            receiveCompletion(completion)
        }

        public func cancel() {
            subscription?.cancel()
            subscription = nil
        }
    }
}

In receive(subscription:), subscription.request(.max(1)) sets how many values the subscriber requests when it subscribes. In receive(_:), return .max(1) requests one more value each time receiveValue has run. The result is a subscriber that requests one value at a time and processes the values one by one.

However, if receiveValue calls async/await code through a Task, the subscriber cannot tell when that asynchronous work has finished. Without a callback mechanism, it requests the next value as soon as receiveValue returns, which does not meet our needs.

There are several ways to add a callback mechanism to the Subscriber, such as callback closures, Notification, or @Published. The following code uses Notification.

Swift
import Combine
import Foundation

public extension Subscribers {
    class OneByOneSink<Input, Failure: Error>: Subscriber, Cancellable {
        let receiveValue: (Input) -> Void
        let receiveCompletion: (Subscribers.Completion<Failure>) -> Void

        var subscription: Subscription?
        var cancellable: AnyCancellable?

        public init(notificationName: Notification.Name,
                    receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
                    receiveValue: @escaping (Input) -> Void) {
            self.receiveCompletion = receiveCompletion
            self.receiveValue = receiveValue
            // When the callback notification arrives, request the next value from the publisher
            cancellable = NotificationCenter.default.publisher(for: notificationName, object: nil)
                .sink(receiveValue: { [weak self] _ in self?.resume() })
        }

        public func receive(subscription: Subscription) {
            self.subscription = subscription
            subscription.request(.max(1))
        }

        public func receive(_ input: Input) -> Subscribers.Demand {
            receiveValue(input)
            return .none // Don't request new values after calling the function
        }

        public func receive(completion: Subscribers.Completion<Failure>) {
            receiveCompletion(completion)
        }

        public func cancel() {
            subscription?.cancel()
            subscription = nil
        }

        private func resume() {
            subscription?.request(.max(1))
        }
    }
}

public extension Publisher {
    func oneByOneSink(
        _ notificationName: Notification.Name,
        receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
        receiveValue: @escaping (Output) -> Void
    ) -> Cancellable {
        let sink = Subscribers.OneByOneSink<Output, Failure>(
            notificationName: notificationName,
            receiveCompletion: receiveCompletion,
            receiveValue: receiveValue
        )
        self.subscribe(sink)
        return sink
    }
}

public extension Publisher where Failure == Never {
    func oneByOneSink(
        _ notificationName: Notification.Name,
        receiveValue: @escaping (Output) -> Void
    ) -> Cancellable {
        let sink = Subscribers.OneByOneSink<Output, Failure>(
            notificationName: notificationName,
            receiveCompletion: { _ in },
            receiveValue: receiveValue
        )
        self.subscribe(sink)
        return sink
    }
}

Usage:

Swift
let resumeNotification = Notification.Name("resume")

publisher
    .buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
    .oneByOneSink(
        resumeNotification,
        receiveValue: { value in
            Task {
                await asyncPrint(value: value)
                NotificationCenter.default.post(name: resumeNotification, object: nil)
            }
        }
    )
    .store(in: &cancellables)

For the requirements of this article, Solution 1 is clearly more elegant than Solution 2, which needs a callback to complete the processing logic.

Solution 2, on the other hand, can pause the processing chain. This makes it suitable for scenarios where a condition must be met before processing continues.
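Notification is only one of the callback options mentioned above. Below is a minimal sketch of the plain-callback variant; the names CallbackOneByOneSink, Recorder and processOneByOne are hypothetical. receiveValue receives a done closure, and the next value is requested only when that closure is called. Unlike the Notification version, this sketch also holds back the completion event until the value in flight has been processed, because a finite publisher can finish before its last asynchronous job does.

```swift
import Combine
import Foundation

extension Subscribers {
    /// Hypothetical callback-based variant of OneByOneSink; `done` must be called once per value.
    public final class CallbackOneByOneSink<Input, Failure: Error>: Subscriber, Cancellable {
        private let receiveValue: (Input, @escaping () -> Void) -> Void
        private let receiveCompletion: (Subscribers.Completion<Failure>) -> Void
        private let lock = NSLock()
        private var subscription: Subscription?
        private var isProcessing = false
        private var pendingCompletion: Subscribers.Completion<Failure>?

        public init(receiveCompletion: @escaping (Subscribers.Completion<Failure>) -> Void,
                    receiveValue: @escaping (Input, @escaping () -> Void) -> Void) {
            self.receiveCompletion = receiveCompletion
            self.receiveValue = receiveValue
        }

        public func receive(subscription: Subscription) {
            lock.lock(); self.subscription = subscription; lock.unlock()
            subscription.request(.max(1))                 // start with a single value
        }

        public func receive(_ input: Input) -> Subscribers.Demand {
            lock.lock(); isProcessing = true; lock.unlock()
            // Strong capture on purpose: the upstream may release the sink once it has finished.
            receiveValue(input) { self.processingFinished() }
            return .none                                  // the next value is requested from `done`
        }

        public func receive(completion: Subscribers.Completion<Failure>) {
            lock.lock()
            let deliverNow = !isProcessing
            if !deliverNow { pendingCompletion = completion }   // wait for the value in flight
            lock.unlock()
            if deliverNow { receiveCompletion(completion) }
        }

        public func cancel() {
            lock.lock(); let current = subscription; subscription = nil; lock.unlock()
            current?.cancel()
        }

        private func processingFinished() {
            lock.lock()
            isProcessing = false
            let completion = pendingCompletion
            pendingCompletion = nil
            let current = subscription
            lock.unlock()
            if let completion = completion {
                receiveCompletion(completion)             // the upstream had already finished
            } else {
                current?.request(.max(1))                 // ready for the next value
            }
        }
    }
}

/// Records when each value starts and finishes processing.
actor Recorder {
    private(set) var events: [String] = []
    func record(_ event: String) { events.append(event) }
}

func processOneByOne(_ values: [String]) async -> [String] {
    let recorder = Recorder()
    return await withCheckedContinuation { continuation in
        let sink = Subscribers.CallbackOneByOneSink<String, Never>(
            receiveCompletion: { _ in
                Task { continuation.resume(returning: await recorder.events) }
            },
            receiveValue: { value, done in
                Task {
                    await recorder.record("start \(value)")
                    try? await Task.sleep(nanoseconds: 10_000_000)   // stand-in for real async work
                    await recorder.record("end \(value)")
                    done()
                }
            }
        )
        values.publisher.subscribe(sink)
    }
}
```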

Solution 3

As mentioned earlier, Apple provides AsyncSequence support for notifications. If events are sent only through NotificationCenter, the following code meets our requirements directly:

Swift
let n = Notification.Name("event")
Task {
    for await value in NotificationCenter.default.notifications(named: n, object: nil) {
        if let str = value.object as? String {
            await asyncPrint(value: str)
        }
    }
}

// The loop above only receives notifications posted after it has started iterating
NotificationCenter.default.post(name: n, object: "event1")
NotificationCenter.default.post(name: n, object: "event2")
NotificationCenter.default.post(name: n, object: "event3")

Could it be any simpler?

Unfortunately, Combine’s subjects and other publishers do not conform to AsyncSequence directly.

However, iOS 15 (macOS 12) added a very small but very important property to Publisher: values.

values is of type AsyncPublisher (AsyncThrowingPublisher for publishers that can fail), which conforms to AsyncSequence. Its purpose is to turn a publisher into an asynchronous sequence. The following code works with any kind of publisher:

Swift
let publisher = PassthroughSubject<String, Never>()
let p = publisher
    .buffer(size: 10, prefetch: .keepFull, whenFull: .dropOldest)
Task {
    for await value in p.values {
        await asyncPrint(value: value)
    }
}

A for await loop requests the next element only after the loop body has finished, so processing is serialized automatically and we no longer need to handle that ourselves.
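The same pattern works for publishers that can fail. In that case values is an AsyncThrowingPublisher, and the loop becomes for try await. The sketch below uses a hypothetical InvalidEvent error and treats an empty string as an invalid event.

```swift
import Combine

struct InvalidEvent: Error {}   // hypothetical error type

func consumeEvents(_ rawEvents: [String]) async -> (processed: [String], failed: Bool) {
    // tryMap makes Failure `any Error`, so `values` is an AsyncThrowingPublisher.
    let events = rawEvents.publisher
        .tryMap { event -> String in
            guard !event.isEmpty else { throw InvalidEvent() }
            return event
        }
    var processed: [String] = []
    do {
        for try await event in events.values {
            processed.append(event)   // still one element at a time
        }
        return (processed, false)
    } catch {
        return (processed, true)      // the publisher's failure is rethrown by the loop
    }
}
```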

In principle, converting a publisher into an AsyncSequence is not complicated. We create a type that conforms to AsyncSequence, forward the values received from the publisher into an AsyncStream, and use the AsyncStream's iterator as the sequence's iterator.

We can implement something like values ourselves. The following sequence behaves similarly to values:

Swift
import Combine

public struct CombineAsyncPublisher<P>: AsyncSequence, AsyncIteratorProtocol where P: Publisher, P.Failure == Never {
    public typealias Element = P.Output
    public typealias AsyncIterator = CombineAsyncPublisher<P>

    public func makeAsyncIterator() -> Self {
        return self
    }

    private let stream: AsyncStream<P.Output>
    private var iterator: AsyncStream<P.Output>.Iterator
    private var cancellable: AnyCancellable? // keeps the upstream subscription alive

    public init(_ upstream: P, bufferingPolicy limit: AsyncStream<Element>.Continuation.BufferingPolicy = .unbounded) {
        var subscription: AnyCancellable?
        stream = AsyncStream<P.Output>(P.Output.self, bufferingPolicy: limit) { continuation in
            // sink requests unlimited demand; AsyncStream buffers values until they are consumed
            subscription = upstream
                .sink(receiveCompletion: { _ in
                    continuation.finish() // end the for-await loop when the publisher completes
                }, receiveValue: { value in
                    continuation.yield(value)
                })
        }
        cancellable = subscription
        iterator = stream.makeAsyncIterator()
    }

    public mutating func next() async -> P.Output? {
        await iterator.next()
    }
}

public extension Publisher where Self.Failure == Never {
    var sequence: CombineAsyncPublisher<Self> {
        CombineAsyncPublisher(self)
    }
}

For the complete code, please refer to the Gist. This example's code is based on Marin Todorov's article.

The implementations of sequence and values differ slightly. If you are interested, you can use the following code to examine the differences:

Swift
Task {
    let p = publisher
        .print() // Logs each request: values asks for one value at a time, like Solution 2,
                 // while sequence requests unlimited demand and buffers in AsyncStream, so no buffer operator is needed.

    for await value in p.sequence { // replace with p.values to compare
        await asyncPrint(value: value)
    }
}

Conclusion

In the foreseeable future, Apple will likely provide more built-in ways to integrate Combine with async/await. Perhaps by the year after next, the first two solutions can be replaced directly by official APIs.
