Search code examples
javaswiftprotocol-buffersgcdasyncsocket

Corrupt Protocol Buffers Messages


I am using Protocol Buffers for Swift (latest from CocoaPods) and Google's official Java Protocol buffer client (version 2.6.0) to to pass messages between a Java server (ServerSocket) and a Swift iOS app (GCDAsyncSocket).

Most messages (several hundred per second; I am streaming audio as float arrays, among other things) flow just fine. Occasionally, however, a message from client to server won't parse. The Java code throws a

com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero)

On both ends I am sending a 4-byte Big-Endian integer representing the number of bytes to follow, then the raw protobuf message. On both ends I am receiving the number of bytes to follow, blocking until I get that many bytes, and then attempting to parse.

There are no errors observed in the Java->Swift direction, only Swift->Java.

The vast majority of messages are fine. The problem appears to increase in frequency with the number of messages being processed.

In Java each client has a thread talking to it and a thread listening to it. The listener thread pulls messages off the wire and puts them into LinkedBlockingQueues per client. The talking thread pulls message off the LinkedBlockingQueue for that client, serializes them, and send them to that client's output stream.

// Take a messageBuilder, serialize and transmit it
func transmit(messageBuilder: Message_.Builder) {
    do {
        messageBuilder.src = self.networkID;
        let data = try messageBuilder.build().data()
        var dataLength = CFSwapInt32HostToBig(UInt32(data.length))

        self.socket.writeData(NSData(bytes: &dataLength, length: 4), withTimeout: 1, tag: 0)
        self.socket.writeData(data, withTimeout: 1, tag: 0)
    } catch let error as NSError {
        NSLog("Failed to transmit.")
        NSLog(error.localizedDescription)
    }
}

Java receiving side:

        public void run() {
        while (true) {
            try {
                byte[] lengthField = new byte[4];
                try {
                    ghost.in.readFully(lengthField, 0, 4);
                } catch (EOFException e) {
                    e.printStackTrace();
                    ghost.shutdown();
                    return;
                }
                Integer bytesToRead = ByteBuffer.wrap(lengthField).order(ByteOrder.BIG_ENDIAN).getInt();
                byte[] wireMessage = new byte[bytesToRead];
                in.readFully(wireMessage, 0, bytesToRead);

                HauntMessaging.Message message = HauntMessaging.Message.parseFrom(wireMessage);

                // do something with the message


            } catch (IOException e) {
                e.printStackTrace();
                ghost.shutdown();
                return;
            }
        }
    }

Any ideas?


Solution

  • Got it!

    The two calls consecutive calls to socket.writeData were not necessarily atomic, but called from several threads. They were getting interleaved, so that first it wrote a length and then it wrote a different length (and/or somebody else's message).

    Surrounding those two calls in a dispatch_async block for a DISPATCH_QUEUE_SERIAL fixed the problem.