swift http stream alamofire urlrequest

URL request stream returns incomplete data


I am making a simple ChatGPT app in Swift that requests data from the API in a stream. The API returns the correct data but when I receive it, words and characters are missing. I tried debugging but I'm not sure why the URL request is not returning the full data.

I will include two examples below. The first uses a basic HTTP request (URLSession) to get the response; this is the one with the bug. I would like to keep this approach because it lets me cancel the task, and I'm not sure how to cancel the task when using the second approach. The second approach uses the Alamofire library to request the data.

The second approach works and all data is returned complete. Realistically, I'd like to use Alamofire as my main approach (instead of the first) since it is more robust, but I'm not sure how to cancel a stream midway with it. Why is the data not returned in full in the first approach?

First approach (bug)

func sendMessageStream(Question_To_Be_Asked: String) async throws -> AsyncThrowingStream<String, Error> {
    var urlRequest = self.urlRequest
    urlRequest.httpBody = try jsonBody(text: Question_To_Be_Asked)
    
    let (result, response) = try await urlSession.bytes(for: urlRequest)
    try Task.checkCancellation()
    
    guard let httpResponse = response as? HTTPURLResponse else {
        throw "Invalid response"
    }
    
    guard 200...299 ~= httpResponse.statusCode else {
        var errorText = ""
        for try await line in result.lines {
            try Task.checkCancellation()
            errorText += line
        }
        
        if let data = errorText.data(using: .utf8), let errorResponse = try? jsonDecoder.decode(ErrorRootResponse.self, from: data).error {
            errorText = "\n\(errorResponse.message)"
        }
        
        throw "Bad Response: \(httpResponse.statusCode), \(errorText)"
    }
    
    var responseText = ""
    return AsyncThrowingStream { [weak self] in
        guard let self else { return nil }
        for try await line in result.lines {
            //print(line) <- incomplete data
            try Task.checkCancellation()
            if line.hasPrefix("data: "), let data = line.dropFirst(6).data(using: .utf8), let response = try? self.jsonDecoder.decode(StreamCompletionResponse.self, from: data), let text = response.choices.first?.delta.content {
                
                responseText += text
                return text
            }
        }
        return nil
    }
}

Second approach (working)

func sendStreamMessage(messages: [Message]) -> DataStreamRequest{
    let openAIMessages = messages.map({OpenAIChatMessage(role: $0.role, content: $0.content)})
    let body = OpenAIChatBody(model: "gpt-4", messages: openAIMessages, stream: true)
    let headers: HTTPHeaders = [
        "Authorization": "Bearer \(Constants.openAIApiKey)"
    ]
    
    return AF.streamRequest(endpointUrl, method: .post, parameters: body, encoder: .json, headers: headers)
}

func sendMessage(question: String)  {
    let messages = [Message(id: UUID().uuidString, role: .user, content: question, createAt: Date())]
    currentInput = ""
    
    sendStreamMessage(messages: messages).responseStreamString { [weak self] stream in
        guard let self = self else { return }
        switch stream.event {
        case .stream(let response):
            switch response {
            case .success(let string):
                let streamResponse = self.parseStreamData(string)
                
                streamResponse.forEach { newMessageResponse in
                    guard let messageContent = newMessageResponse.choices.first?.delta.content else {
                        return
                    }
                    //here messageContent is final complete string from stream
                }
            case .failure(_):
                print("Something failed")
            }
            print(response)
        case .complete(_):
            print("COMPLETE")
        }
    }
}

func parseStreamData(_ data: String) ->[ChatStreamCompletionResponse] {
    let responseStrings = data.split(separator: "data:").map({$0.trimmingCharacters(in: .whitespacesAndNewlines)}).filter({!$0.isEmpty})
    let jsonDecoder = JSONDecoder()
    
    return responseStrings.compactMap { jsonString in
        guard let jsonData = jsonString.data(using: .utf8), let streamResponse = try? jsonDecoder.decode(ChatStreamCompletionResponse.self, from: jsonData) else {
            return nil
        }
        return streamResponse
    }
}

struct ChatStreamCompletionResponse: Decodable {
    let id: String
    let choices: [ChatStreamChoice]
}

struct ChatStreamChoice: Decodable {
    let delta: ChatStreamContent
}

struct ChatStreamContent: Decodable {
    let content: String
}

struct Message: Decodable, Hashable {
    let id: String
    let role: SenderRole
    let content: String
    let createAt: Date
    
    func hash(into hasher: inout Hasher) {
        hasher.combine(id)
    }
}

struct OpenAIChatBody: Encodable {
    let model: String
    let messages: [OpenAIChatMessage]
    let stream: Bool
}

struct OpenAIChatMessage: Codable {
    let role: SenderRole
    let content: String
}

enum SenderRole: String, Codable {
    case system
    case user
    case assistant
}

Solution

  • The second approach uses the Alamofire library to request the data. This approach works and all data is returned complete. Realistically I'd like to use Alamofire as my main approach (instead of the first) since it is more robust, but I'm not sure how to cancel a stream midway with it.

    I agree that the Alamofire Usage Guide section on "Canceling and Resuming a Download" is about canceling a download, not a stream.

    However, the basic cancel() method is available on all Alamofire request types, including streaming requests, so you can use it to cancel an ongoing stream.
    sendStreamMessage already returns the DataStreamRequest object; also store that object in a property for later access, and add a method that cancels the stored DataStreamRequest.

    class ChatService {
        private var currentStreamRequest: DataStreamRequest?
    
        func sendStreamMessage(messages: [Message]) -> DataStreamRequest {
            // Existing code to set up and start the stream request
    
            let streamRequest = AF.streamRequest(endpointUrl, method: .post, parameters: body, encoder: .json, headers: headers)
            currentStreamRequest = streamRequest
            return streamRequest
        }
    
        func cancelStream() {
            currentStreamRequest?.cancel()
            currentStreamRequest = nil
        }
    
        // other methods
    }
    

    sendStreamMessage now stores the DataStreamRequest in currentStreamRequest. The cancelStream method then cancels this request.
    You would use it like this:

    let chatService = ChatService()
    // Start streaming
    let streamRequest = chatService.sendStreamMessage(messages: messages)
    
    // Cancel the stream when needed
    chatService.cancelStream()
    

    Note that this approach does not involve resume data, which is relevant mainly to file downloads, where partial data can be saved and the download resumed later. For a data stream from an API, resuming is typically not feasible unless the API itself supports such a mechanism.
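The answer above does not explain the missing data in the first approach. One possible cause, stated here as an assumption rather than something the source confirms: the closure passed to AsyncThrowingStream is called once per element, so `for try await line in result.lines` starts a new iteration of `result.lines` on every call, and bytes already buffered by the previous iterator may be dropped. The sketch below iterates the line sequence exactly once inside a single Task and still supports cancellation. The names `StreamChunk`, `deltaText(fromLine:)`, and `textDeltas(from:)` are hypothetical, the `"[DONE]"` check assumes the API ends the stream with that marker, and the code is written for Swift 5 language mode (strict concurrency checking may require extra Sendable constraints).

```swift
import Foundation

// Hypothetical minimal model of one streamed chunk; only the fields used here.
struct StreamChunk: Decodable {
    struct Choice: Decodable {
        struct Delta: Decodable {
            let content: String?   // optional, since a chunk's delta may omit content
        }
        let delta: Delta
    }
    let choices: [Choice]
}

/// Extracts the text delta from one "data: ..." line, or nil if the line carries none.
func deltaText(fromLine line: String, decoder: JSONDecoder = JSONDecoder()) -> String? {
    let prefix = "data: "
    guard line.hasPrefix(prefix) else { return nil }
    let payload = String(line.dropFirst(prefix.count))
    // Assumption: the API signals the end of the stream with a "[DONE]" payload.
    guard payload != "[DONE]", let data = payload.data(using: .utf8) else { return nil }
    return (try? decoder.decode(StreamChunk.self, from: data))?.choices.first?.delta.content
}

/// Wraps any async sequence of lines in a stream of text deltas.
/// The sequence is iterated once, by a single producer Task.
func textDeltas<S: AsyncSequence>(from lines: S) -> AsyncThrowingStream<String, Error>
where S.Element == String {
    AsyncThrowingStream { continuation in
        let task = Task {
            do {
                for try await line in lines {
                    if let text = deltaText(fromLine: line) {
                        continuation.yield(text)
                    }
                }
                continuation.finish()
            } catch {
                continuation.finish(throwing: error)
            }
        }
        // When the consumer stops (cancellation or the stream being dropped),
        // cancel the producer task as well.
        continuation.onTermination = { _ in task.cancel() }
    }
}
```

In the first approach, `result.lines` could presumably be passed as `lines`, replacing the unfolding closure. The caller would then stop the stream by cancelling the Task that iterates the returned stream, which triggers `onTermination` and cancels the producer.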