Search code examples
swiftserver-sent-eventsopenai-apivapor

How to stream data in Vapor Swift?


I'm building an Open AI chat stream backend using Vapor Swift. It connects to Open AI API using MacPaw's OpenAI wrapper. But I'm unsure how to stream the result to the client using SSE rather than as a single response.

My current code looks like this:

    func postChatStreamHandler(_ req: Request) throws -> EventLoopFuture<Response> {
        let openAI = OpenAI(configuration: configuration)
        let promise = req.eventLoop.makePromise(of: Data.self)
        let query = ChatQuery(model: .gpt4, messages: messages)
        openAI.chatsStream(query: query) { partialResult in
            switch partialResult {
            case .success(let result):
                if let detla = result.choices.first?.delta,
                   let data = try? JSONEncoder().encode(result) {
                    promise.succeed(data)
                }
            case .failure(let error):
                ...
            }
        } completion: { error in
            ...
        }
        return promise.futureResult.map { data in
            let response = Response()
            response.body = .init(buffer: ByteBuffer(data: data))
            return response
        }
    }

Solution

  • struct GenericAIController: RouteCollection {
        private let service = AIService()
        
        func boot(routes: RoutesBuilder) throws {
            let routes = routes.grouped("api", "ai")
            routes.get("completion", "stream", use: generatePoemStream(req:))
        }
        
        func generatePoemStream(req: Request) async throws -> AsyncThrowingStream<String, Error> {
            let query = ChatQuery(
                model: .gpt4_1106_preview,
                messages: [
                    .init(role: .user, content: "Write a poem")
                ]
            )
            
            let originalStream = service.chatsStream(query: query)
            
            return AsyncThrowingStream<String, Error> { continuation in
                Task {
                    for try await element in originalStream {
                        if let stringElement = element.choices.first?.delta.content {
                            continuation.yield(stringElement)
                        }
                    }
    
                    continuation.finish(throwing: nil)
                }
            }
        }
    
    }
    
    extension AsyncThrowingStream: AsyncResponseEncodable where Element: Encodable {
        public func encodeResponse(for request: Request) async throws -> Response {
            let response = Response(status: .ok)
            let body = Response.Body(stream: { writer in
                Task {
                    do {
                        for try await element in self {
                            let data = try JSONEncoder().encode(element)
                            _ = writer.write(.buffer(.init(data: data)))
                        }
                        
                        _ = writer.write(.end)
                    } catch {
                        // Handle errors as needed
                    }
                }
            })
    
            response.body = body
            return response
        }
    }