
Akka TCP stream conflates (concatenates) source messages


I have an interesting problem with Akka TCP streams.

See the code:

package snp.server;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.stream.javadsl.Flow;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;
import akka.stream.javadsl.Tcp;
import akka.stream.javadsl.Tcp.OutgoingConnection;
import akka.util.ByteString;
import java.time.Duration;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

public class TCPClient_SinkSource1 {

    public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {

        final ActorSystem system = ActorSystem.create("StreamTcpDocTest");

        // Outgoing connection: its output is the bytes received from the server
        final Flow<ByteString, ByteString, CompletionStage<OutgoingConnection>> connection =
                Tcp.get(system).outgoingConnection("192.168.62.130", 59090).map(f -> {
                    System.out.println("out1:" + f.utf8String());
                    return f;
                });

        final Sink<ByteString, CompletionStage<Done>> sink = Sink.foreach(f -> {
            System.out.println("from server:" + f.utf8String());
        });

        Source<ByteString, NotUsed> source = Source.range(1, 5).map(f -> ByteString.fromString(f.toString()))
                .throttle(1, Duration.ofMillis(30));


        // Client side: the sink consumes server replies, the source produces what is sent
        Flow<ByteString, ByteString, NotUsed> clientFlow = Flow.fromSinkAndSource(sink, source).map(f -> {
            System.out.println("out:" + f.utf8String());
            return f;
        });

        CompletionStage<OutgoingConnection> connectionCS = connection
                .join(clientFlow)
                .run(system);

        connectionCS.whenComplete((d, e) -> {
            // d is null when the connection attempt failed, so check e first
            if (e != null) {
                System.out.println("connection failed: " + e);
            } else {
                System.out.println("client conn: localAddress:" + d.localAddress()
                        + " remoteAddress:" + d.remoteAddress());
            }
        });

    }
}

The result is somehow concatenated: the server reply is, for example, "123" followed by "45".

When I increase the interval to throttle(1, Duration.ofMillis(3000)), the server replies one by one, as I would expect.

Can somebody explain how to avoid the concatenated reply?


Solution

  • You should not think of individual ByteStrings as frames in your network protocol. Use actual framing instead, and collect entire frames on the consuming side: depending on both the sending system and the network in between, a single ByteString may arrive as several ByteStrings on the receiving side, or several may arrive as one.

    Akka Streams provides several framing operators out of the box (for example Framing.delimiter and Framing.lengthField): https://doc.akka.io/docs/akka/current/stream/stream-io.html#using-framing-in-your-protocol

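    For this particular case I think you are seeing the result of an optimization in the Akka Streams TCP implementation, which tries to avoid multiple system calls (they are expensive) by batching as many bytes as possible within a small time frame before handing them off to the OS. That would also explain why the effect disappears at 3000 ms: each element is then written long after the previous one, so there is nothing to batch it with.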
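The framing advice can be illustrated without a network. The sketch below is plain Java and deliberately does not use Akka; the class LineFraming and its Decoder are hypothetical names for this illustration only. It assumes both peers agree on newline-terminated messages: the sender appends "\n" to every message, and the receiver buffers bytes across reads and emits only complete lines, however the bytes were grouped. The two-read split ("1\n2\n3" then "\n4\n5\n") is an assumed stand-in for the batching described above.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical illustration, not an Akka API: newline-delimited framing
 * done by hand so it runs without a network or Akka on the classpath.
 */
public class LineFraming {

    /** Sender side: terminate every message with '\n' (messages must not contain '\n'). */
    public static byte[] encode(List<String> messages) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        for (String message : messages) {
            byte[] bytes = message.getBytes(StandardCharsets.UTF_8);
            out.write(bytes, 0, bytes.length);
            out.write('\n');
        }
        return out.toByteArray();
    }

    /** Receiver side: buffers bytes across reads and emits only complete frames. */
    public static class Decoder {
        private final ByteArrayOutputStream pending = new ByteArrayOutputStream();
        private final int maxFrameLength;

        public Decoder(int maxFrameLength) {
            this.maxFrameLength = maxFrameLength;
        }

        /** Feeds one read (however the network grouped it) and returns the frames it completes. */
        public List<String> feed(byte[] chunk) {
            List<String> frames = new ArrayList<>();
            for (byte b : chunk) {
                if (b == '\n') {
                    // Delimiter found: everything buffered so far is one complete frame
                    frames.add(new String(pending.toByteArray(), StandardCharsets.UTF_8));
                    pending.reset();
                } else {
                    // Guard against unbounded buffering if a peer never sends a delimiter
                    if (pending.size() >= maxFrameLength) {
                        throw new IllegalStateException("frame longer than " + maxFrameLength + " bytes");
                    }
                    pending.write(b);
                }
            }
            return frames;
        }
    }

    public static void main(String[] args) {
        byte[] wire = encode(Arrays.asList("1", "2", "3", "4", "5")); // "1\n2\n3\n4\n5\n"

        // Assumed split: the bytes arrive as two reads, "1\n2\n3" and "\n4\n5\n",
        // similar to the "123" / "45" grouping described in the question.
        List<byte[]> reads = Arrays.asList(
                Arrays.copyOfRange(wire, 0, 5),
                Arrays.copyOfRange(wire, 5, wire.length));

        Decoder decoder = new Decoder(256);
        List<String> received = new ArrayList<>();
        for (byte[] read : reads) {
            received.addAll(decoder.feed(read));
        }
        System.out.println(received); // prints [1, 2, 3, 4, 5]
    }
}
```

In the question's client, the receiving half would correspond to Framing.delimiter(ByteString.fromString("\n"), 256, FramingTruncation.DISALLOW).to(sink) in place of the bare sink, and the sending half to mapping each element with ByteString.fromString(f + "\n"); this only helps if the server frames its replies the same way. The maxFrameLength guard mirrors the maximum frame length that Akka's framing operators also require.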