I'm working on a little problem: concurrently fetch a bunch of chunks of ordered data and output them in the correct order (specifically, fetch a load of .ts files and pipe them through ffmpeg in the correct order, but that's incidental). Wanting to separate the re-ordering (deliver the fragments in sequence) from the piping (let ffmpeg do its thing), I've written this as two objects:
struct Aggregator<ChunkType> {
    private var currentIdx = 0
    private var chunks: [Int:ChunkType] = [:]
    private let serializer: Serializer
    init(serializer: Serializer) where Serializer.ChunkType == ChunkType {
        // self.serializer = serializer
    }
    private let queue = DispatchQueue(label: "Serial")
    mutating func accept(chunk:ChunkType, forIndex idx: Int) {
        // One accessor at a time
        queue.sync {
            chunks[idx] = chunk
            // Forward zero or more chunks
            while chunks[currentIdx] != nil {
                //serializer.append(chunk: chunks.removeValue(forKey: currentIdx))
                currentIdx += 1
            }
        }
    }
}
And:
protocol Serializer {
    associatedtype ChunkType
    func append(chunk: ChunkType)
}
With two implementations:
// For capturing the output of an Aggregator<Int> to check it's working as intended
class IntArraySerializer: Serializer {
    typealias ChunkType = Int
    var allInts = [Int]()
    func append(chunk: ChunkType) {
        allInts.append(chunk)
    }
}
// For piping output through ffmpeg
class FFMPEGSerializer: Serializer {
    typealias ChunkType = Data
    private let ffmpegBin = "/usr/local/bin/ffmpeg"
    private let ffmpeg: Process
    private let stdin: Pipe
    init(args: [String]) throws {
        ffmpeg = Process()
        ffmpeg.launchPath = ffmpegBin
        ffmpeg.arguments = args
        stdin = Pipe()
        ffmpeg.standardInput = stdin
    }
    func append(chunk: Data) {
        stdin.fileHandleForWriting.write(chunk)
    }
}
The end goal being that the usage would look something like:
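// `try` and the `args:` label match the throwing initialiser above;
// "-bsf:a" and its value must be separate arguments
let ffmpeg = try FFMPEGSerializer(args: ["-i", "-", "-c", "copy", "-bsf:a", "aac_adtstoasc", "output.mp4"])
// `var` because `accept` is a mutating method
var aggregator = Aggregator(serializer: ffmpeg)
for (idx, url) in someListOfUrls.enumerated() {
    concurrentQueue.async {
        fetch(url) { data in
            aggregator.accept(chunk: data, forIndex: idx)
        }
    }
}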
However, my understanding of associated types, generics and possibly type erasure (if that is indeed the solution here) is preventing me from finalising Aggregator, as can be seen from the various commented-out lines. I've done some reading on type erasure (e.g. this Big Nerd Ranch article), but I'm not grasping how, or whether, I should apply it in my case.
To boil it down to one statement: I want the ChunkType of the Serializer to inform the generic type of the Aggregator.
As the error message states, you cannot use the protocol Serializer directly as the type of a property or parameter, because it has an associated type.
I'm not sure what you mean by type erasure, so I may be misunderstanding you, but as far as I can tell from your code, you can easily fix your Aggregator using generics:
// Make `S` a generic parameter constrained to `Serializer`
struct Aggregator<S: Serializer> {
    // `ChunkType` must always be the same type as `S.ChunkType`
    typealias ChunkType = S.ChunkType
    private var currentIdx = 0
    private var chunks: [Int: ChunkType] = [:]
    // You can use `S` as the type of a property...
    private let serializer: S
    // ...or as the type of a parameter.
    init(serializer: S) {
        self.serializer = serializer
    }
    private let queue = DispatchQueue(label: "Serial")
    mutating func accept(chunk: ChunkType, forIndex idx: Int) {
        // One accessor at a time
        queue.sync {
            chunks[idx] = chunk
            // Forward zero or more chunks
            while chunks[currentIdx] != nil {
                serializer.append(chunk: chunks.removeValue(forKey: currentIdx)!)
                currentIdx += 1
            }
        }
    }
}
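To check that the generic version re-orders as intended, here is a minimal, self-contained sketch that feeds chunks out of order into an Aggregator backed by your IntArraySerializer. The names `sink` and `aggregator` and the sample values are my own, and I have swapped the `!= nil` test plus force unwrap for a `while let`, which should behave the same. The calls are made sequentially from one thread, so this only exercises the ordering logic, not concurrent access.

```swift
import Foundation

protocol Serializer {
    associatedtype ChunkType
    func append(chunk: ChunkType)
}

// Captures everything the aggregator forwards, in arrival order
final class IntArraySerializer: Serializer {
    typealias ChunkType = Int
    var allInts = [Int]()
    func append(chunk: Int) {
        allInts.append(chunk)
    }
}

struct Aggregator<S: Serializer> {
    typealias ChunkType = S.ChunkType
    private var currentIdx = 0
    private var chunks: [Int: ChunkType] = [:]
    private let serializer: S
    private let queue = DispatchQueue(label: "Serial")

    init(serializer: S) {
        self.serializer = serializer
    }

    mutating func accept(chunk: ChunkType, forIndex idx: Int) {
        queue.sync {
            chunks[idx] = chunk
            // Forward every chunk that is now contiguous with what was already sent
            while let next = chunks.removeValue(forKey: currentIdx) {
                serializer.append(chunk: next)
                currentIdx += 1
            }
        }
    }
}

// `S` is inferred as IntArraySerializer, so ChunkType is Int
let sink = IntArraySerializer()
var aggregator = Aggregator(serializer: sink)

aggregator.accept(chunk: 30, forIndex: 2) // buffered: index 0 has not arrived yet
aggregator.accept(chunk: 10, forIndex: 0) // forwarded immediately
aggregator.accept(chunk: 20, forIndex: 1) // forwards 20, then the buffered 30

print(sink.allInts) // [10, 20, 30]
```

Note that you never spell out the chunk type at the call site: passing the serializer is enough for the compiler to fix `S`, and with it `ChunkType`, which is the "Serializer informs the Aggregator" relationship you asked for.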