Tags: swift, async-await, actor, swift-concurrency

How to support an async callback with a timeout in a Swift actor


I have a mix of async/await code (actors) and "legacy" code (Process pipes and callback blocks). My actor's function returns a value immediately while also storing a deferred callback, which is called once the work is completed or once a timeout has occurred. The actor should not block while waiting for the work to complete; its purpose is only to support id generation and internal updates, which need to be thread-safe. Ideally I would also like the callback to be ergonomic as an "async" variant. I am currently getting a deadlock as well as "ugly" nesting.

typealias Block = (String?) async -> Void // do I need the async keyword here necessarily?

actor Processor {
  var id = 0 // running request id, incremented in work(text:completion:)
  var callbacks = [Int:Block]()
  let process : Process // external process which communicates over stdin/stdout
  init(processPath: String) async throws {
    process = Process()
    process.launchPath = processPath
    process.standardInput = Pipe()
    process.standardOutput = Pipe()
    (process.standardOutput as! Pipe).fileHandleForReading.readabilityHandler = {[weak self] handler in
       // does not support async/await directly, wrapping in a task, perhaps better mechanism?
       Task {[weak self] in
          await self?.handleOutput(data: handler.availableData)
       }
    }
  }

  struct Result: Codable {
     let id: Int
     let output: String
  }
  // must be async because it awaits the stored callback
  func handleOutput(data: Data) async {
     if let decoded = try? JSONDecoder().decode(Result.self, from: data),
            let callback = pop(id: decoded.id) {
            await callback(decoded.output) // call the block, this time with real value vs when there was a timeout
        }
  }
  func work(text: String, completion: @escaping Block) async -> WorkArgs {
     id += 1 // increment the running id
     let currentId = id // stable copy, so the timeout task does not read the actor's mutable state
     // store the callback
     callbacks[currentId] = completion
     // timeout if the result doesn't come back in a reasonable amount of time. 
     // I will perform a check after 1 second, and pop the callback if needed
     // problematic here..., how to wait while also not blocking this function
     Task { [weak self] in
       // invalidate if needed
       try await Task.sleep(nanoseconds: 1_000_000_000)
       // dead lock here:
       if let bl = await self?.pop(id: currentId) {
            print("invalidated the task \(currentId)")
            await bl(nil)
       }
     }
     // call out to the external process with this convention, it will run async and print a result with the same id
     let command = "\(currentId) \(text)\n"
     let d = Data(command.utf8)

     try! (process.standardInput as! Pipe).fileHandleForWriting.write(contentsOf: d)
     return WorkArgs(id: currentId, date: Date())
  }
  // pop this callback
  func pop(id: Int) -> Block? {
        return callbacks.removeValue(forKey: id)
  }
}

struct WorkArgs {
  let id: Int
  let date: Date
}

actor IdGen {
    private var id : Int64 = 0
    func next() -> Int64 {
        id += 1
        return id
    }
}

actor CallbackActor {
    var pendingCallbacks = [Int: (String) -> Void]()
    func push(_ key: Int, block: @escaping (String) -> Void) {
        pendingCallbacks[key] = block
    }
    // key and return types match the pendingCallbacks dictionary
    func pop(_ key: Int) -> ((String) -> Void)? {
        return pendingCallbacks.removeValue(forKey: key)
    }
}

Solution

  • Before I get to the timeout question, we should probably talk about how to wrap the Process within Swift concurrency.

    • One pattern would be to use AsyncSequence (i.e., an AsyncStream) for stdout:

      actor ProcessWithStream {
          private let process = Process()
          private let stdin = Pipe()
          private let stdout = Pipe()
          private let stderr = Pipe()
          private var buffer = Data()
      
          init(url: URL) {
              process.standardInput = stdin
              process.standardOutput = stdout
              process.standardError = stderr
              process.executableURL = url
          }
      
          func start() throws {
              try process.run()
          }
      
          func terminate() {
              process.terminate()
          }
      
          func send(_ string: String) {
              let data = Data("\(string)\n".utf8)
              stdin.fileHandleForWriting.write(data)
          }
      
          func stream() -> AsyncStream<Data> {
              AsyncStream(Data.self) { continuation in
                  stdout.fileHandleForReading.readabilityHandler = { handler in
                      continuation.yield(handler.availableData)
                  }
                  process.terminationHandler = { handler in
                      continuation.finish()
                  }
              }
          }
      }
      

      Then you can for await that stream:

      let process = ProcessWithStream(url: url)
      
      override func viewDidLoad() {
          super.viewDidLoad()
      
          Task {
              try await startStream()
              print("done")
          }
      }
      
      @IBAction func didTapSend(_ sender: Any) {
          let string = textField.stringValue
          Task {
              await process.send(string)
          }
          textField.stringValue = ""
      }
      
      func startStream() async throws {
          try await process.start()
          let stream = await process.stream()
      
          for await data in stream {
              if let string = String(data: data, encoding: .utf8) {
                  print(string, terminator: "")
              }
          }
      }
      

      This is a simple approach. And it looks fine (because I am printing responses without a terminator).

      But one needs to be careful because readabilityHandler will not always be called with the full Data of some particular output. It might be broken up or split across separate calls to the readabilityHandler.

    • Another pattern would be to use lines, which avoids the problem of readabilityHandler possibly being called multiple times for a given output:

      actor ProcessWithLines {
          private let process = Process()
          private let stdin = Pipe()
          private let stdout = Pipe()
          private let stderr = Pipe()
          private var buffer = Data()
          private(set) var lines: AsyncLineSequence<FileHandle.AsyncBytes>?
      
          init(url: URL) {
              process.standardInput = stdin
              process.standardOutput = stdout
              process.standardError = stderr
              process.executableURL = url
          }
      
          func start() throws {
              lines = stdout.fileHandleForReading.bytes.lines
              try process.run()
          }
      
          func terminate() {
              process.terminate()
          }
      
          func send(_ string: String) {
              let data = Data("\(string)\n".utf8)
              stdin.fileHandleForWriting.write(data)
          }
      }
      

      Then you can do:

      let process = ProcessWithLines(url: url)
      
      override func viewDidLoad() {
          super.viewDidLoad()
      
          Task {
              try await startStream()
              print("done")
          }
      }
      
      @IBAction func didTapSend(_ sender: Any) {
          let string = textField.stringValue
          Task {
              await process.send(string)
          }
          textField.stringValue = ""
      }
      
      func startStream() async throws {
          try await process.start()
          guard let lines = await process.lines else { return }
      
          for try await line in lines {
              print(line)
          }
      }
      

      This avoids the breaking of responses mid-line.
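
If you prefer to stay with the readabilityHandler approach, the chunks can be reassembled by buffering until a newline arrives. The following is only a minimal sketch of that idea, assuming the external process terminates every response with "\n" and emits UTF-8. The LineBuffer type and its append method are hypothetical names for illustration, not part of Foundation.

```swift
import Foundation

/// Hypothetical helper (not a Foundation type): accumulates raw chunks
/// and hands back only complete, newline-terminated lines.
struct LineBuffer {
    private var buffer = Data()

    /// Appends a chunk and returns every complete line received so far.
    /// A trailing partial line stays in the buffer until its newline arrives.
    mutating func append(_ chunk: Data) -> [String] {
        buffer.append(chunk)
        var lines: [String] = []
        // 0x0A is the newline byte; assumes the process ends each response with "\n".
        while let newlineIndex = buffer.firstIndex(of: 0x0A) {
            let lineData = buffer[buffer.startIndex..<newlineIndex]
            lines.append(String(decoding: lineData, as: UTF8.self))
            // Drop the consumed line together with its newline.
            buffer.removeSubrange(buffer.startIndex...newlineIndex)
        }
        return lines
    }
}
```

One way to use this would be to keep a LineBuffer next to the stream's continuation, call append with handler.availableData inside the readabilityHandler, and yield each returned line instead of the raw Data. Because the buffer is mutated from the handler, it would need to be confined to a single context (for example, the actor itself).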


    You asked:

    How to support ... timeout in swift actor

    The pattern is to wrap the request in a Task, and then start a separate task that will cancel that prior task after a Task.sleep interval.
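
A minimal sketch of that general pattern might look like the following. The withTimeout function is a hypothetical helper written for illustration (it is not a standard library API), and it assumes a non-negative timeout and an operation that cooperates with cancellation, e.g. by awaiting something that throws CancellationError.

```swift
import Foundation

/// Hypothetical helper: runs `operation` in its own task and cancels that
/// task if it has not finished within `seconds` (assumed non-negative).
func withTimeout<T: Sendable>(
    seconds: Double,
    operation: @escaping @Sendable () async throws -> T
) async throws -> T {
    // Wrap the request in a task so that it can be cancelled from outside.
    let requestTask = Task { try await operation() }

    // Separate task that cancels the request after the sleep interval.
    let timeoutTask = Task {
        try await Task.sleep(nanoseconds: UInt64(seconds * 1_000_000_000))
        requestTask.cancel()
    }

    // Stop the timer as soon as the request finishes or fails.
    defer { timeoutTask.cancel() }

    // Rethrows whatever the operation throws, including CancellationError.
    return try await requestTask.value
}
```

Cancellation in Swift concurrency is cooperative, so this only helps if the operation checks for cancellation. It also cancels only the Swift task: nothing here stops or notifies the external process.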

    But this is going to be surprisingly complicated in this case, because you have to coordinate that with the separate Process which will otherwise still proceed, unaware that the Task has been canceled. That can theoretically lead to problems (e.g., the process gets backlogged, etc.).

    I would advise integrating the timeout logic in the app invoked by the Process, rather than trying to have the caller handle that. It can be done (e.g. maybe write the process app to capture and handle SIGINT and then the caller can call interrupt on the Process). But it is going to be complicated and, most likely, brittle.