I have a pipeline in ReactiveSwift for uploads. I want to make sure that even if one of the uploads fails, the rest are not interrupted.
After all of them complete, with success or failure, the performUploads method should either return success or, if any upload has failed, return an error so that the next step, the downloads, won't start. Even if there are errors, every upload should get a chance to run; none of them should be stopped.
Is there a way to figure out whether there were any errors after the uploads complete? See the methods here:
let pendingUploadItemsArray: [Item] = ...
func performUploads() -> SignalProducer<(), MyError> {
    return upload(pendingUploadItemsArray)
        .then(doAnything())
}
private func upload(_ items: [Item]) -> SignalProducer<Signal<(), MyError>.Event, Never> {
    let producers = items
        .filter { item in
            return item.readyForUpload
        }
        .map { self.upload($0).materialize() }
    return SignalProducer.merge(producers)
}
private func upload(_ item: Item) -> SignalProducer<(), MyError> {
    return internalUploader.upload(item)
        .on(failed: failedToUpload(item),
            value: successfullyUploaded(item))
        .ignoreValues()
}
where the internalUploader upload method is:
func upload(_ item: Item) -> SignalProducer<Any, MyError>
And then in another class you would call this uploader:
let sync = self.uploader.performUploads()
    .then(startDownloads())
The startDownloads should only run if all the uploads have completed successfully.
Thanks for any insight.
This might be something that should be done in a completely different manner.
I don't know exactly what successfullyUploaded and failedToUpload are doing in your code, but presumably you're keeping track of successes and failures to provide some kind of live progress UI. This is how I would structure it:
struct UploadResult {
    let item: Item
    let error: Error? // nil if the upload succeeded
    var succeeded: Bool { error == nil }
    var failed: Bool { !succeeded }
}
...
static func upload(_ items: [Item]) -> SignalProducer<[UploadResult], Never> {
    SignalProducer(items)
        .filter(\.readyForUpload)
        .flatMap(.merge) { item in
            Self.internalUploader.upload(item)
                // The uploaded value is ignored; only success or failure is recorded
                .map { _ in UploadResult(item: item, error: nil) }
                // Turn a failure into a value so it cannot terminate the merged producer
                .flatMapError { error in
                    SignalProducer(value: UploadResult(item: item, error: error))
                }
        }
        // Emit the running array of results after each upload finishes
        .scan(into: [UploadResult]()) { (results: inout [UploadResult], nextResult) in
            results.append(nextResult)
        }
}
I defined an UploadResult struct that represents an item that has succeeded or failed to upload.
In the upload function, instead of creating an array of producers and then merging them, I convert the array of items into a signal producer of items with SignalProducer(items) and then use flatMap(.merge) to merge the uploads into a single signal producer.
Instead of using materialize, I use map to convert successful uploads into an UploadResult and flatMapError to convert failed uploads into an UploadResult.
I use scan to accumulate the results as each upload completes. Each time an upload finishes (either successfully or with an error), scan will send an updated array of upload results that can be used to update the UI.
Then you could use it like this:
Uploader.upload(someItems)
    .on(value: { resultsSoFar in
        // Update UI here
    })
    .take(last: 1)
    .attempt { results in
        if !results.allSatisfy(\.succeeded) {
            // At least one of the uploads failed, so send an error
            throw MyError()
        }
    }
    .then(startDownloads())
I use the on(value:) operator to update the UI based on the current results. Each time an upload succeeds or fails, this closure will be called with the updated results.
I use take(last: 1) to filter out all intermediate results; it'll only send along the final results when all uploads have completed.
I use attempt to check if any of the uploads have failed and throw an error if so. This ensures the downloads will only be started if all uploads succeeded.
Hopefully this handles your use case, but let me know in a comment if I missed something about the broader context!
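To check that a failing upload really does not interrupt the others, here is a minimal self-contained sketch of the same pipeline. The fields of Item, the fakeUpload function, and the sample items are hypothetical stand-ins that are not part of the question, and the sketch assumes a ReactiveSwift version in which attempt accepts a throwing closure on producers whose error type is Never.

```swift
import ReactiveSwift

// Hypothetical stand-ins for the question's types.
struct Item {
    let name: String
    let shouldFail: Bool
}
struct MyError: Error {}

struct UploadResult {
    let item: Item
    let error: Error? // nil if the upload succeeded
    var succeeded: Bool { error == nil }
}

// Hypothetical replacement for internalUploader.upload(_:); it finishes synchronously.
func fakeUpload(_ item: Item) -> SignalProducer<Any, MyError> {
    if item.shouldFail {
        return SignalProducer(error: MyError())
    }
    return SignalProducer(value: item.name)
}

func upload(_ items: [Item]) -> SignalProducer<[UploadResult], Never> {
    SignalProducer<Item, Never>(items)
        .flatMap(.merge) { item -> SignalProducer<UploadResult, Never> in
            fakeUpload(item)
                .map { _ in UploadResult(item: item, error: nil) }
                .flatMapError { error in
                    // A failure becomes an ordinary value, so the merged producer keeps running
                    SignalProducer<UploadResult, Never>(
                        value: UploadResult(item: item, error: error)
                    )
                }
        }
        .scan(into: [UploadResult]()) { results, next in
            results.append(next)
        }
}

let items = [
    Item(name: "a", shouldFail: false),
    Item(name: "b", shouldFail: true),
    Item(name: "c", shouldFail: false),
]

// last() blocks until the producer terminates and returns its final event as a Result.
let finalResults = (try? upload(items).last()?.get()) ?? []

let outcome = upload(items)
    .take(last: 1)
    .attempt { results in
        if !results.allSatisfy(\.succeeded) {
            throw MyError()
        }
    }
    .last()

// True only if the pipeline ended with a value instead of an error.
var uploadsSucceeded = false
if case .some(.success) = outcome { uploadsSucceeded = true }

print(finalResults.count, uploadsSucceeded)
```

The explicit SignalProducer<Item, Never> and SignalProducer<UploadResult, Never> annotations are there only to help type inference in a standalone file; with more surrounding context they may be unnecessary.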
If you only care about dealing with results one at a time rather than as a running array, you can get rid of the scan and then replace take(last: 1) with collect:
static func upload(_ items: [Item]) -> SignalProducer<UploadResult, Never> {
    SignalProducer(items)
        .filter(\.readyForUpload)
        .flatMap(.merge) { item in
            Self.internalUploader.upload(item)
                // The uploaded value is ignored; only success or failure is recorded
                .map { _ in UploadResult(item: item, error: nil) }
                .flatMapError { error in
                    SignalProducer(value: UploadResult(item: item, error: error))
                }
        }
}
...
Uploader.upload(someItems)
    .on(value: { latestResult in
        // Do something with the latest result
    })
    .collect()
    .attempt { results in
        if !results.allSatisfy(\.succeeded) {
            // At least one of the uploads failed, so send an error
            throw MyError()
        }
    }
    .then(startDownloads())
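If the caller needs to know which uploads failed rather than just that something failed, the check inside attempt could be factored into a small throwing function. This is only a sketch in plain Swift: UploadsFailed, requireAllSucceeded, SampleError, and the name field on Item are hypothetical names introduced for illustration, and only the shape of UploadResult follows the answer above.

```swift
// Hypothetical stand-ins; only the shape of UploadResult follows the answer.
struct Item {
    let name: String
}
struct UploadResult {
    let item: Item
    let error: Error? // nil if the upload succeeded
    var succeeded: Bool { error == nil }
}
struct SampleError: Error {}

// Hypothetical error type that records which items failed to upload.
struct UploadsFailed: Error {
    let failedItems: [Item]
}

// Throws if any result carries an error; otherwise returns normally.
func requireAllSucceeded(_ results: [UploadResult]) throws {
    let failed = results.filter { !$0.succeeded }.map(\.item)
    if !failed.isEmpty {
        throw UploadsFailed(failedItems: failed)
    }
}

let sampleResults = [
    UploadResult(item: Item(name: "a"), error: nil),
    UploadResult(item: Item(name: "b"), error: SampleError()),
    UploadResult(item: Item(name: "c"), error: nil),
]

// Collect the names of the failed items, if any.
var failedNames: [String] = []
do {
    try requireAllSucceeded(sampleResults)
} catch let error as UploadsFailed {
    failedNames = error.failedItems.map(\.name)
} catch {
    // requireAllSucceeded throws no other error type in this sketch.
}
print(failedNames)
```

In the pipeline it would be called from the attempt closure, for example .attempt { try requireAllSucceeded($0) }, so whatever observes the failure can inspect failedItems.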