Coordinating async operations - TL;DR

This is a shorter version of my previous post on async operations. It focuses more on code and has less discussion than the full version.

Full version

The full version of this blog post contains more discussion and more extensive examples. It also covers existing alternatives like PromiseKit, AwaitKit, RxSwift and ReactiveCocoa in more detail, including why I don’t use them and some things I think you should consider before you do. You can read it here.

Disclaimer

Before we start, I just want to emphasize that I don’t claim that this is the ultimate way to work with async operations. The libraries above are pretty great in their own ways. However, if you must keep your dependencies to a minimum and want to have a lightweight way to coordinate async operations, the approach I present may be worth considering. Also, it’s a pretty good modeling challenge. So with this mindset, let’s dive in!

Coordinating simple operations

If we just want to coordinate a set of abstract operations, we can use a simple approach that involves operations and coordinators.

Let’s start by defining an abstract operation:

protocol Operation {
    
    typealias Completion = (Error?) -> ()
    
    func perform(completion: @escaping Completion)
}

To coordinate how operations are executed, let’s define an OperationCoordinator:

protocol OperationCoordinator {
    
    typealias Completion = ([Error?]) -> ()
    
    func perform(_ operations: [Operation], completion: @escaping Completion)
}

We can now implement this protocol in various ways. Let’s start with a concurrent coordinator.

Concurrent operations

Creating a concurrent operation coordinator is really easy:

class ConcurrentOperationCoordinator: OperationCoordinator {
    
    func perform(_ operations: [Operation], completion: @escaping Completion) {
        guard operations.count > 0 else { return completion([]) }
        var errors = [Error?]()
        operations.forEach {
            $0.perform { error in
                errors.append(error)
                let isComplete = errors.count == operations.count
                guard isComplete else { return }
                completion(errors)
            }
        }
    }
}

Using this coordinator is very easy:

class MyOperation: Operation {
    var error: Error?
    func perform(completion: @escaping Completion) {
        completion(error)
    }
}

let operations = [MyOperation(), MyOperation()]
let coordinator = ConcurrentOperationCoordinator()
coordinator.perform(operations) { errors in
    print("All done")
}

We could also use this coordinator as an internal tool in other classes, where the operations are hidden from the external interface.
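One caveat with the concurrent coordinator above: if the operations call their completions on different threads, appending to the shared errors array is a data race, and the errors end up in completion order rather than operation order. Here is a minimal sketch of one way to handle both, assuming that a small dependency on GCD is acceptable. The class name OrderedConcurrentCoordinator and the queue label are hypothetical and not part of this post’s original code:

```swift
import Dispatch

protocol Operation {
    typealias Completion = (Error?) -> ()
    func perform(completion: @escaping Completion)
}

protocol OperationCoordinator {
    typealias Completion = ([Error?]) -> ()
    func perform(_ operations: [Operation], completion: @escaping Completion)
}

// Hypothetical variant, not from the original post.
final class OrderedConcurrentCoordinator: OperationCoordinator {

    // Serial queue that guards the shared state below (the label is arbitrary).
    private let stateQueue = DispatchQueue(label: "ordered-coordinator.state")

    func perform(_ operations: [Operation], completion: @escaping Completion) {
        guard !operations.isEmpty else { return completion([]) }
        // One slot per operation, so errors[i] always belongs to operations[i].
        var errors = [Error?](repeating: nil, count: operations.count)
        var remaining = operations.count
        let stateQueue = self.stateQueue
        for (index, operation) in operations.enumerated() {
            operation.perform { error in
                // Only one completion at a time may touch the shared state.
                let finished = stateQueue.sync { () -> [Error?]? in
                    errors[index] = error
                    remaining -= 1
                    return remaining == 0 ? errors : nil
                }
                // Call out after leaving the queue, once the last operation is done.
                if let finished = finished { completion(finished) }
            }
        }
    }
}
```

Since it implements the same OperationCoordinator protocol, it can be called just like the coordinator above. It still assumes that every operation calls its completion exactly once.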

Serial operations

If concurrency is not an option, we could use a serial coordinator instead:

class SerialOperationCoordinator: OperationCoordinator {
    
    func perform(_ operations: [Operation], completion: @escaping Completion) {
        performOperation(at: 0, in: operations, errors: [], completion: completion)
    }
    
    private func performOperation(at index: Int, in operations: [Operation], errors: [Error?], completion: @escaping Completion) {
        guard operations.count > index else { return completion(errors) }
        let operation = operations[index]
        operation.perform { (error) in
            let errors = errors + [error]
            self.performOperation(at: index + 1, in: operations, errors: errors, completion: completion)
        }
    }
}

Since this class implements the same protocol as the previous coordinator, you can just switch out the implementation in the example above:

let coordinator = SerialOperationCoordinator()
coordinator.perform(operations) { errors in
    print("All done")
}
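The completion receives one entry per operation, where nil means that the operation succeeded. With the serial coordinator, the entries are also in operation order. Here is a small sketch of how the result could be inspected. StepOperation and StepError are hypothetical names that I made up for this example:

```swift
protocol Operation {
    typealias Completion = (Error?) -> ()
    func perform(completion: @escaping Completion)
}

protocol OperationCoordinator {
    typealias Completion = ([Error?]) -> ()
    func perform(_ operations: [Operation], completion: @escaping Completion)
}

// Same serial coordinator as above, repeated to keep the example self-contained.
class SerialOperationCoordinator: OperationCoordinator {

    func perform(_ operations: [Operation], completion: @escaping Completion) {
        performOperation(at: 0, in: operations, errors: [], completion: completion)
    }

    private func performOperation(at index: Int, in operations: [Operation], errors: [Error?], completion: @escaping Completion) {
        guard operations.count > index else { return completion(errors) }
        operations[index].perform { error in
            self.performOperation(at: index + 1, in: operations, errors: errors + [error], completion: completion)
        }
    }
}

// Hypothetical error and operation types, for illustration only.
struct StepError: Error { let step: Int }

struct StepOperation: Operation {
    let step: Int
    let shouldFail: Bool
    func perform(completion: @escaping Completion) {
        completion(shouldFail ? StepError(step: step) : nil)
    }
}

let steps: [Operation] = [
    StepOperation(step: 1, shouldFail: false),
    StepOperation(step: 2, shouldFail: true),
    StepOperation(step: 3, shouldFail: false)
]

var failedSteps = [Int]()
SerialOperationCoordinator().perform(steps) { errors in
    // errors[i] belongs to steps[i], so the index tells us which step failed.
    failedSteps = errors.enumerated().compactMap { index, error in
        error == nil ? nil : index + 1
    }
    print("Failed steps: \(failedSteps)")
}
```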

We now have two basic ways of coordinating parameterless operations. Let’s take this approach further and build some more powerful tools.

Operating on a collection

As a complement to anonymous operations, let’s create a protocol that describes how to perform an operation on a typed collection:

protocol CollectionOperation: AnyObject {
    
    associatedtype CollectionType
    typealias T = CollectionType
    typealias Completion = ([Error?]) -> ()
    
    func perform(on collection: [T], completion: @escaping Completion)
}

When you implement this protocol, just implement perform(on:completion:) and specify the CollectionType with a typealias.
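To make this concrete, here is a minimal sketch of a class that implements the protocol directly. NameValidator and EmptyNameError are hypothetical names that only exist for this example:

```swift
protocol CollectionOperation: AnyObject {
    associatedtype CollectionType
    typealias T = CollectionType
    typealias Completion = ([Error?]) -> ()
    func perform(on collection: [T], completion: @escaping Completion)
}

// Hypothetical error type, for illustration only.
struct EmptyNameError: Error {}

// Hypothetical operation that validates a collection of names.
final class NameValidator: CollectionOperation {

    // The typealias tells the protocol which item type we operate on.
    typealias CollectionType = String

    func perform(on collection: [String], completion: @escaping Completion) {
        // One entry per name; nil means that the name is valid.
        let errors = collection.map { name -> Error? in
            name.isEmpty ? EmptyNameError() : nil
        }
        completion(errors)
    }
}

var validationResult = [Error?]()
NameValidator().perform(on: ["Ada", "", "Grace"]) { errors in
    validationResult = errors
}
```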

Now let’s create more specialized versions of this protocol that describe how to operate on single items and batches of items:

protocol ItemOperation: CollectionOperation {
    
    typealias ItemCompletion = (Error?) -> ()
    
    func perform(onItem item: T, completion: @escaping ItemCompletion)
}

protocol BatchOperation: CollectionOperation {
    
    typealias BatchCompletion = (Error?) -> ()
    
    var batchSize: Int { get }
    
    func perform(onBatch batch: [T], completion: @escaping BatchCompletion)
}

When you implement these protocols, it’s important that you call the item and batch completions when the operation completes for every item/batch. As you will see, the implementations below depend on these completions being called. If one of them is never called, the overall completion will never be called either.

We now have three protocols that describe how to operate on collections, but no implementations and no real benefits yet. Let’s do something about this by creating even more specialized versions of these protocols.

Concurrent item operation

The simplest specialization we can make of the protocols above is to create a protocol that concurrently performs an operation on all items in a collection:

protocol ConcurrentItemOperation: ItemOperation {}

extension ConcurrentItemOperation {
    
    func perform(on collection: [T], completion: @escaping Completion) {
        guard collection.count > 0 else { return completion([]) }
        var errors = [Error?]()
        collection.forEach {
            perform(onItem: $0) { error in
                errors.append(error)
                let isComplete = errors.count == collection.count
                guard isComplete else { return }
                completion(errors)
            }
        }
    }
}

As you can see, this protocol implements perform(on:completion:) as a protocol extension. This means that if you implement this protocol instead of ItemOperation, you just have to implement perform(onItem:completion:). This protocol will take care of concurrently performing the operation on all items.

Concurrent batch operation

Using the same approach as above, it’s very simple to create a similar protocol that operates on batches instead of single items:

protocol ConcurrentBatchOperation: BatchOperation {}

extension ConcurrentBatchOperation {
    
    func perform(on collection: [T], completion: @escaping Completion) {
        guard collection.count > 0 else { return completion([]) }
        var errors = [Error?]()
        let batches = collection.batched(withBatchSize: batchSize)
        batches.forEach {
            perform(onBatch: $0) { error in
                errors.append(error)
                let isComplete = errors.count == batches.count
                guard isComplete else { return }
                completion(errors)
            }
        }
    }
}

This protocol also implements perform(on:completion:) as a protocol extension. This means that if you implement this protocol instead of BatchOperation, you just have to implement perform(onBatch:completion:). This protocol will take care of concurrently performing the operation on all batches.
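The batched(withBatchSize:) function that is used above is not defined in this post. Here is a minimal sketch of such an extension, assuming that it splits the array into consecutive batches where the last one may be smaller. How it treats a non-positive batch size is my own assumption:

```swift
extension Array {

    // Assumed helper: splits the array into consecutive batches of at most
    // `batchSize` elements. The last batch may contain fewer elements.
    func batched(withBatchSize batchSize: Int) -> [[Element]] {
        // Assumption: a non-positive batch size yields a single batch.
        guard batchSize > 0 else { return isEmpty ? [] : [self] }
        return stride(from: 0, to: count, by: batchSize).map { start in
            // Swift.min avoids a clash with the array's own min() method.
            Array(self[start..<Swift.min(start + batchSize, count)])
        }
    }
}

let batches = [1, 2, 3, 4, 5].batched(withBatchSize: 2)
print(batches)
```

With a batch size of 2, the five items above end up in three batches, where the last batch only contains the last item.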

Serial item operation

If your operation is asynchronous and the order of execution is important, you can’t use concurrent operations, since a simple network delay could mess up the completion order. If you can’t solve this on a system level, you could execute your operations serially instead of concurrently.

Let’s create a serial ItemOperation for these cases:

protocol SerialItemOperation: ItemOperation {}

extension SerialItemOperation {
    
    func perform(on collection: [T], completion: @escaping Completion) {
        perform(at: 0, in: collection, errors: [], completion: completion)
    }
    
    private func perform(at index: Int, in collection: [T], errors: [Error?], completion: @escaping Completion) {
        guard collection.count > index else { return completion(errors) }
        let object = collection[index]
        perform(onItem: object) { [weak self] error in
            let errors = errors + [error]
            self?.perform(at: index + 1, in: collection, errors: errors, completion: completion)
        }
    }
}

This protocol also implements perform(on:completion:) as a protocol extension. This means that if you implement this protocol instead of ItemOperation, you just have to implement perform(onItem:completion:). This protocol will take care of serially performing the operation on all items, and will wait for each item operation to complete before it proceeds with the next.

Serial batch operation

Finally, using the same approach as above, we can easily create another protocol that operates on batches of items instead of single items:

protocol SerialBatchOperation: BatchOperation {}

extension SerialBatchOperation {
    
    func perform(on collection: [T], completion: @escaping Completion) {
        let batches = collection.batched(withBatchSize: batchSize)
        perform(at: 0, in: batches, errors: [], completion: completion)
    }
    
    private func perform(at index: Int, in batches: [[T]], errors: [Error?], completion: @escaping Completion) {
        guard batches.count > index else { return completion(errors) }
        let batch = batches[index]
        perform(onBatch: batch) { [weak self] error in
            let errors = errors + [error]
            self?.perform(at: index + 1, in: batches, errors: errors, completion: completion)
        }
    }
}

This protocol also implements perform(on:completion:) as a protocol extension. This means that if you implement this protocol instead of BatchOperation, you just have to implement perform(onBatch:completion:). This protocol will take care of serially performing the operation on all batches, and will wait for each batch operation to complete before it proceeds with the next.
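To see the serial batching in action without any networking, here is a small self-contained sketch. NumberUploader is a hypothetical class that just records the batches it receives, and the batched(withBatchSize:) helper is an assumed implementation that may differ from the real one:

```swift
protocol CollectionOperation: AnyObject {
    associatedtype CollectionType
    typealias T = CollectionType
    typealias Completion = ([Error?]) -> ()
    func perform(on collection: [T], completion: @escaping Completion)
}

protocol BatchOperation: CollectionOperation {
    typealias BatchCompletion = (Error?) -> ()
    var batchSize: Int { get }
    func perform(onBatch batch: [T], completion: @escaping BatchCompletion)
}

protocol SerialBatchOperation: BatchOperation {}

extension SerialBatchOperation {

    func perform(on collection: [T], completion: @escaping Completion) {
        let batches = collection.batched(withBatchSize: batchSize)
        perform(at: 0, in: batches, errors: [], completion: completion)
    }

    private func perform(at index: Int, in batches: [[T]], errors: [Error?], completion: @escaping Completion) {
        guard batches.count > index else { return completion(errors) }
        perform(onBatch: batches[index]) { [weak self] error in
            self?.perform(at: index + 1, in: batches, errors: errors + [error], completion: completion)
        }
    }
}

// Assumed helper: consecutive batches, where the last one may be smaller.
extension Array {
    func batched(withBatchSize batchSize: Int) -> [[Element]] {
        guard batchSize > 0 else { return isEmpty ? [] : [self] }
        return stride(from: 0, to: count, by: batchSize).map { start in
            Array(self[start..<Swift.min(start + batchSize, count)])
        }
    }
}

// Hypothetical operation that records each batch instead of uploading it.
final class NumberUploader: SerialBatchOperation {

    typealias CollectionType = Int

    let batchSize = 2
    private(set) var uploadedBatches = [[Int]]()

    func perform(onBatch batch: [Int], completion: @escaping BatchCompletion) {
        uploadedBatches.append(batch)
        completion(nil)
    }
}

let uploader = NumberUploader()
var resultCount = 0
uploader.perform(on: [1, 2, 3, 4, 5]) { errors in
    // One entry per batch, not per item.
    resultCount = errors.count
    print("Uploaded \(uploader.uploadedBatches) with \(errors.count) results")
}
```

Note that the errors array contains one entry per batch. Also note that the protocol extension captures self weakly, so you have to keep the uploader alive until the completion is called.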

Examples

We now have four completely different collection operations that are externally and internally interchangeable. Let’s put everything together in a short example.

Let’s build an imaginary image syncer that syncs images that were taken while the user was offline:

class ImageSyncer: ConcurrentItemOperation {
    
    typealias CollectionType = UIImage
    
    func perform(onItem item: UIImage, completion: @escaping ItemCompletion) {
        syncImage(item) { error in
            completion(error)
        }
    }

    func syncImage(_ image: UIImage, completion: @escaping ItemCompletion) {
        // Implement this in some way and call completion when done :)
    }
}

You can use this image syncer like this:

let syncer = ImageSyncer()
let images = [offlineImage1, offlineImage2, ...]
syncer.perform(on: images) { errors in
    print("All done!")
}

This will sync all images concurrently and print “All done!” when it’s done. If you’d like it to be serial instead, just change which protocol it implements:

class ImageSyncer: SerialItemOperation {
    
    // The rest can be left unchanged :)
}

Since it’s still a collection operation, you can still call it like this:

let syncer = ImageSyncer()
let images = [offlineImage1, offlineImage2, ...]
syncer.perform(on: images) { errors in
    print("All done!")
}

If your system supports syncing images in batches, the syncer could implement ConcurrentBatchOperation instead:

class ImageSyncer: ConcurrentBatchOperation {
    
    typealias CollectionType = UIImage
    
    func perform(onBatch batch: [UIImage], completion: @escaping BatchCompletion) {
        syncImages(batch) { error in
            completion(error)
        }
    }

    func syncImages(_ images: [UIImage], completion: @escaping BatchCompletion) {
        // Implement this in some way :)
    }
}

The syncer is still a collection operation, so you can still call it like before.

Finally, if you want to perform this operation serially instead of concurrently, just replace ConcurrentBatchOperation with SerialBatchOperation:

class ImageSyncer: SerialBatchOperation {
    
    // The rest can be left unchanged :)
}

The syncer is still a collection operation, so you can still call it like before.

That’s it! We have implemented an image syncer using the new operation protocols, and also changed how it operates with minimal changes!

Final improvements

With these new collection operations in place, we can simplify the coordinators that we implemented earlier:

class ConcurrentOperationCoordinator: OperationCoordinator, ConcurrentItemOperation {
    
    typealias CollectionType = Operation
    
    func perform(_ operations: [Operation], completion: @escaping Completion) {
        perform(on: operations, completion: completion)
    }
    
    func perform(onItem item: Operation, completion: @escaping ItemCompletion) {
        item.perform(completion: completion)
    }
}


class SerialOperationCoordinator: OperationCoordinator, SerialItemOperation {
    
    typealias CollectionType = Operation
    
    func perform(_ operations: [Operation], completion: @escaping Completion) {
        perform(on: operations, completion: completion)
    }
    
    func perform(onItem item: Operation, completion: @escaping ItemCompletion) {
        item.perform(completion: completion)
    }
}

We have now come full circle and used the things we’ve created in various ways. I hope you find it useful.

Conclusion

I hope you liked this post. If you decide to use the pattern, I would love to see some implementations.

The implementation in this post is just vanilla Swift with some abstractions and auto-implementations. It could probably be improved to use GCD and extended in a bunch of ways, but I hope that you enjoyed the discussions we could have by not walking down that path.

I have pushed the source code to my personal iExtra library (open source, but I mainly maintain it for myself). If you want to try it out, you can find it here.