
NSTask/Process + NSPipe + NSFileHandle in Modern Swift Concurrency


Context

Suppose we have a Mac app that uses Process (formerly NSTask) to run some other executable which writes data to both STDOUT and STDERR. This process might take a long time to do its work. Prior to Swift Concurrency, we would do this:

import Foundation

// Assume the object with this function is bound to the MainActor:
func doSomeWork()
{
    DispatchQueue.global(qos: .default).async
    {
        let task = Process()
        // Configure task.executableURL and task.arguments here.

        let errorPipe = Pipe()
        let errorFileHandle = errorPipe.fileHandleForReading
        task.standardError = errorPipe

        let outputPipe = Pipe()
        let outputFileHandle = outputPipe.fileHandleForReading
        task.standardOutput = outputPipe

        // Need to read the fileHandles: once ~64 KB sits unread in a pipe, the child process blocks on its next write until we read it.
        var outputData = Data(capacity: 1000)
        var errorData = Data(capacity: 1000)

        // Semaphore to make sure our readability handlers exit before we continue. NSTask does not guarantee they'll finish before -waitUntilExit fires.
        let sema = DispatchSemaphore(value: 0)

        outputFileHandle.readabilityHandler = { handle in
            // availableData is a property, not a method.
            let newData = handle.availableData
            if newData.count == 0 {
                // end-of-data signal is an empty data object.
                outputFileHandle.readabilityHandler = nil
                sema.signal()
            } else {
                outputData.append(newData)
            }
        }

        errorFileHandle.readabilityHandler = { handle in
            let newData = handle.availableData
            if newData.count == 0 {
                // end-of-data signal is an empty data object.
                errorFileHandle.readabilityHandler = nil
                sema.signal()
            } else {
                errorData.append(newData)
            }
        }

        task.terminationHandler = { task in
            sema.signal()
        }

        defer {
            errorFileHandle.closeFile()
            outputFileHandle.closeFile()
        }

        do {
            try task.run()
        } catch {
            // handle error, then bail out: if the task never launched, the semaphore is never signaled.
            return
        }

        sema.wait()
        sema.wait()
        sema.wait()

        // Use outputData and errorData to do whatever...
    }
}

Race Conditions

Note that the readabilityHandlers make no guarantee about which thread they execute on. And Process makes no guarantee that each readabilityHandler is done writing data by the time task.terminationHandler is called. Process also makes no guarantee that the readabilityHandlers are finished by the time waitUntilExit() returns. Do not suggest that alternative; I've been there and I have the t-shirt.

Therefore, I use a semaphore to ensure that all three pieces of the puzzle are 100% finished before I continue.

outputData and errorData are shared mutable state across threads, but because of the semaphore, it's guaranteed that nothing will read from either object until all the writes from the two readabilityHandlers have finished.

Adopting Swift Concurrency

I understand that I can use an Actor to wrap outputData and errorData, both to satisfy the compiler's error "Mutation of captured var 'outputData' in concurrently-executing code" and to formalize my currently "manual" guard against race conditions (at the expense of a lot more context-switching overhead).

QUESTION:

In Swift concurrency, threads are always supposed to make forward progress, which means no semaphores. So even if I move the Process and the readabilityHandlers into an Actor, how do I properly "await" all three pieces being done?


Solution

    As a general rule, where we might have reached for a semaphore, we would now frequently await a task or continuation. If wrapping some traditional asynchronous API that does not yet support Swift concurrency, we would often do this with withCheckedContinuation or withCheckedThrowingContinuation. But as you correctly note, we would scrupulously avoid semaphores in Swift concurrency.
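For example, the terminationHandler that the question uses to signal a semaphore can instead resume a checked continuation. A minimal sketch, assuming a hypothetical helper named runAndWaitForExit(_:) (it is not a Foundation API); it suspends the calling task, without blocking a thread, until the process exits, and returns the exit status:

```swift
import Foundation

/// Hypothetical helper (not a Foundation API): launches `process` and suspends the
/// calling task, without blocking a thread, until its terminationHandler fires.
func runAndWaitForExit(_ process: Process) async throws -> Int32 {
    try await withCheckedThrowingContinuation { continuation in
        // Foundation calls terminationHandler once, on a background thread, after exit.
        process.terminationHandler = { finished in
            continuation.resume(returning: finished.terminationStatus)
        }
        do {
            try process.run()
        } catch {
            // A process that never launched never terminates, so resume here instead,
            // and clear the handler so nothing else can touch the continuation.
            process.terminationHandler = nil
            continuation.resume(throwing: error)
        }
    }
}
```

A checked continuation must be resumed exactly once: resuming it twice traps, and never resuming it is reported at runtime as a leak, which is why the launch-failure path resumes it explicitly.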

    But, in this case, when dealing with a series of asynchronous events, I would probably make an AsyncSequence for the availableData:

    extension Pipe {
        struct AsyncAvailableData: AsyncSequence {
            typealias Element = Data
    
            let pipe: Pipe
    
            func makeAsyncIterator() -> AsyncStream<Element>.Iterator {
                AsyncStream { continuation in
                    pipe.fileHandleForReading.readabilityHandler = { @Sendable handle in
                        let data = handle.availableData
    
                        guard !data.isEmpty else {
                            continuation.finish()
                            return
                        }
    
                        continuation.yield(data)
                    }
    
                    continuation.onTermination = { _ in
                        pipe.fileHandleForReading.readabilityHandler = nil
                    }
                }.makeAsyncIterator()
            }
        }
    
        var availableData: AsyncAvailableData { AsyncAvailableData(pipe: self) }
    }
    

    I would then use async let (SE-0317) to run these concurrently:

    func doSomeWork() async throws {
        let process = Process()
        process.executableURL = command
    
        let inputPipe  = Pipe()
        let outputPipe = Pipe()
        let errorPipe  = Pipe()
    
        process.standardError  = errorPipe
        process.standardOutput = outputPipe
        process.standardInput  = inputPipe
    
        // optionally, in a command-line tool, you might want to exit with whatever non-zero termination status the child process returned (in an app, don't: exit() terminates the calling process itself)
    
        process.terminationHandler = { process in
            if process.terminationStatus != 0 {
                exit(process.terminationStatus)
            }
        }
    
        async let outputTask = outputPipe.availableData.joined()
        async let errorTask  = errorPipe.availableData.joined()
    
        try process.run()
    
        let outputData = await outputTask
        let errorData = await errorTask
    
        …
    }
    

    Where:

    extension Pipe.AsyncAvailableData {
        func joined() async -> Data {
            var data = Data()
            for await availableData in self {
                data.append(availableData)
            }
            return data
        }
    }
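The doSomeWork() above awaits EOF on both pipes, but not the terminationHandler, which is the third piece the question asks about: EOF can arrive before the process has actually exited, and Process raises an exception if terminationStatus is read while the process is still running. A minimal sketch combining async let with a checked continuation, assuming the hypothetical names ProcessResult, readToEOF(_:) and runCollectingOutput(_:arguments:); to stay self-contained, the reader is written as a function over an AsyncStream rather than reusing Pipe.AsyncAvailableData:

```swift
import Foundation

/// Hypothetical result type for this sketch.
struct ProcessResult: Sendable {
    let status: Int32
    let output: Data
    let errorOutput: Data
}

/// Collects everything readable from `handle` until EOF via its readabilityHandler,
/// so no thread is blocked while waiting.
func readToEOF(_ handle: FileHandle) async -> Data {
    let chunks = AsyncStream<Data> { continuation in
        handle.readabilityHandler = { readable in
            let data = readable.availableData
            if data.isEmpty {
                continuation.finish()      // an empty Data object signals EOF
            } else {
                continuation.yield(data)
            }
        }
        // Runs after finish() or if the consuming task is cancelled; stops further callbacks.
        continuation.onTermination = { _ in
            handle.readabilityHandler = nil
        }
    }
    var result = Data()
    for await chunk in chunks {
        result.append(chunk)
    }
    return result
}

/// Runs an executable and awaits all three events: EOF on stdout, EOF on stderr,
/// and the terminationHandler.
func runCollectingOutput(_ executableURL: URL, arguments: [String] = []) async throws -> ProcessResult {
    let process = Process()
    process.executableURL = executableURL
    process.arguments = arguments

    let outputPipe = Pipe()
    let errorPipe = Pipe()
    process.standardOutput = outputPipe
    process.standardError = errorPipe
    let outputHandle = outputPipe.fileHandleForReading
    let errorHandle = errorPipe.fileHandleForReading

    // Start both readers as child tasks; anything the child writes before a
    // handler is installed simply waits in the pipe buffer.
    async let outputData = readToEOF(outputHandle)
    async let errorData = readToEOF(errorHandle)

    // Suspend (instead of calling sema.wait()) until the process has exited.
    let status: Int32 = try await withCheckedThrowingContinuation { continuation in
        process.terminationHandler = { finished in
            continuation.resume(returning: finished.terminationStatus)
        }
        do {
            try process.run()
        } catch {
            process.terminationHandler = nil
            continuation.resume(throwing: error)
        }
    }

    // If run() threw, we never get here: the async lets are cancelled and awaited
    // on scope exit, and cancellation ends both AsyncStreams.
    let output = await outputData
    let errorOutput = await errorData
    return ProcessResult(status: status, output: output, errorOutput: errorOutput)
}
```

Because the function returns only after both EOFs and the termination have been observed, the order in which those three events arrive no longer matters.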
    

    Note that I created an asynchronous sequence of the Data chunks supplied to the readabilityHandler. Often, I would instead create an AsyncSequence of the individual bytes, as shown in https://stackoverflow.com/a/76941591/1271826. That way, if you wanted to process the output line by line, you would get that behavior for free. But if you are just going to load the whole stream into memory at once, there is no need for the overhead of converting the sequence of Data to a sequence of bytes and then back to one large Data.
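If you do want line-by-line processing while staying with Data chunks, one option (a swapped-in technique, not the byte-sequence approach from the linked answer) is a small buffer that carries partial lines across chunk boundaries. A minimal sketch; LineSplitter is a hypothetical name, and it splits only on "\n", so a trailing "\r" from CRLF output is kept in the line:

```swift
import Foundation

/// Hypothetical helper: accumulates arbitrary Data chunks (as delivered by a
/// readabilityHandler) and returns only complete, newline-terminated lines.
struct LineSplitter {
    private var buffer = Data()

    /// Appends `chunk` and returns every complete line now available, without the "\n".
    /// Splitting on the 0x0A byte is safe for UTF-8, because that byte never occurs
    /// inside a multi-byte character; a character split across chunks stays buffered.
    mutating func append(_ chunk: Data) -> [String] {
        buffer.append(chunk)
        var lines: [String] = []
        while let newline = buffer.firstIndex(of: UInt8(ascii: "\n")) {
            lines.append(String(decoding: buffer[buffer.startIndex..<newline], as: UTF8.self))
            buffer.removeSubrange(buffer.startIndex..<buffer.index(after: newline))
        }
        return lines
    }

    /// Call after EOF: returns any trailing text that had no terminating newline.
    mutating func finish() -> String? {
        guard !buffer.isEmpty else { return nil }
        defer { buffer.removeAll() }
        return String(decoding: buffer, as: UTF8.self)
    }
}
```

Each chunk from for await chunk in pipe.availableData would be passed to append(_:) and its lines handled immediately, with finish() called once the loop ends; only the current partial line is retained between chunks.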


    While I hope I answered your question above, for the sake of future readers, I must note that there are serious downsides to loading all of the standardOutput and standardError into their respective Data objects. Specifically, the full output of each must be held in memory at the same time.

    We would generally want to process the data as it comes in, rather than ever trying to hold the whole thing in memory at once. In trivial use cases, loading everything into RAM works fine, but as the output grows, it becomes less and less practical. Caveat emptor.