Tags: java, apache-kafka, akka

Akka streams - why is the context not dropped?


I'm working on reading data from Kafka and have encountered this code that returns the Kafka connection details, but I don't understand how the context is shared. Here is the class that sets up the Kafka connection:

import akka.actor.typed.ActorSystem;
import akka.kafka.ConsumerMessage;
import akka.kafka.ConsumerSettings;
import akka.kafka.Subscriptions;
import akka.kafka.javadsl.Consumer;
import akka.stream.javadsl.SourceWithContext;
import lombok.Builder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.Deserializer;

import java.time.Duration;

@Getter
@Slf4j
public final class KafkaSource<K, V> {

    private final SourceWithContext<ConsumerRecord<K, V>, ConsumerMessage.Committable, ?> commitableSource;

    @Builder
    private KafkaSource(final Deserializer<K> keyd, final Deserializer<V> valueDeserializer, final ActorSystem actorSystem) {

        final String kafkaBootstrapServers = "localhost:9092";

        final ConsumerSettings<K, V> kafkaConsumerSettings =
                ConsumerSettings.create(actorSystem, keyd, valueDeserializer)
                        .withBootstrapServers(kafkaBootstrapServers)
                        .withGroupId("testGroup12")
                        .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
                        .withStopTimeout(Duration.ofSeconds(5));

        final String topics = "request-topic";

        // sourceWithOffsetContext pairs each ConsumerRecord with its CommittableOffset as context;
        // the identity mapContext lets the compiler widen the context type to Committable.
        this.commitableSource = Consumer.sourceWithOffsetContext(kafkaConsumerSettings,
                Subscriptions.topics(topics)).mapContext(ctx -> ctx);
    }
}

Here is the Akka Stream I've written to process the data from Kafka:

import akka.actor.typed.ActorSystem;
import akka.actor.typed.javadsl.Behaviors;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;

public class ReadFromKafka {

    private static final ObjectMapper objectMapper = new ObjectMapper();

    public static void main(String[] args) {

        final ActorSystem actorSystem = ActorSystem.create(Behaviors.empty(), "as");

        var ksource = KafkaSource.<String, String>builder()
                .actorSystem(actorSystem)
                .keyd(new StringDeserializer()).valueDeserializer(new StringDeserializer())
                .build();

        ksource.getCommitableSource()
                .map(ConsumerRecord::value)
                .map(x -> {
                            var mappedObject = objectMapper.readValue(x, RequestDto.class);
                            System.out.println("mappedObject is :" + mappedObject);
                            return mappedObject;
                        }
                )
                .log("error")
                .asSource()
                .map(pair -> pair.second().commitInternal())
                .run(actorSystem);
    }
}

The class being mapped to, RequestDto:

import com.fasterxml.jackson.annotation.JsonFormat;
import lombok.*;
import lombok.extern.jackson.Jacksonized;

import java.util.Date;

@Jacksonized
@AllArgsConstructor
@Getter
@Builder
@ToString
public class RequestDto {

    @JsonFormat(pattern = "yyyy-MM-dd HH:mm:sss")
    private final Date datePurchased;

    private String someOtherField;

}

Although ReadFromKafka works as expected, why is the ConsumerMessage.Committable context from SourceWithContext<ConsumerRecord<K, V>, ConsumerMessage.Committable, ?> not dropped when executing:

.map(ConsumerRecord::value)
.map(x -> {
            var mappedObject = objectMapper.readValue(x, RequestDto.class);
            System.out.println("mappedObject is :" + mappedObject);
            return mappedObject;
        }
) 

.asSource() gives access to the context as the second element of the Pair, which then lets me commit the offset using:

.map(pair -> pair.second().commitInternal())

I'm confused as to how this works. Is something implicit happening in the background that allows the context to be propagated throughout the stream?


Solution

  • A SourceWithContext<A, B, M> defines the stream operations it supports so that they operate only on the A part of each element.

    So if f is a function taking an A and returning a C, .map(f) results in a SourceWithContext<C, B, M>.

    Under the hood, it's a Source<Pair<A, B>, M>. map could be defined as something like (as always apologies for atrocious Java):

    private Source<Pair<A, B>, M> underlying;
    
    public <C> SourceWithContext<C, B, M> map(Function<A, C> f) {
        Source<Pair<C, B>, M> src =
            underlying.map(pair -> {
                A a = pair.first();
                C c = f.apply(a);
                // the context travels along untouched, paired with the mapped value
                return Pair.create(c, pair.second());
            });
        return SourceWithContext.fromPairs(src);
    }
    

    Note that f never gets to see the second part of the Pair. So long as every operation does the right thing with respect to the context, it just works.
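
    Concretely, that is all that is happening in the stream from the question: each stage rewrites only the element half while the ConsumerMessage.Committable context rides along unchanged, until .asSource() surfaces it again as the second half of a Pair. Here is the same pipeline re-annotated with the element/context type at each step (a sketch, abridged to the relevant stages):

    // SourceWithContext<ConsumerRecord<String, String>, Committable, ?>
    ksource.getCommitableSource()
            // SourceWithContext<String, Committable, ?>
            .map(ConsumerRecord::value)
            // SourceWithContext<RequestDto, Committable, ?>
            .map(x -> objectMapper.readValue(x, RequestDto.class))
            // Source<Pair<RequestDto, Committable>, ?>
            .asSource()
            // the context reappears as pair.second(), ready to commit
            .map(pair -> pair.second().commitInternal())
            .run(actorSystem);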

    There are operations where there's no unambiguous "right thing" to do. An example of this is an operation which could reorder elements.
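
    If you do hit one of those operations, the escape hatch is the same pair of conversions used above: drop down with .asSource(), work on the underlying Source of Pairs explicitly (keeping each value next to its own context), and re-wrap the result with SourceWithContext.fromPairs. A minimal sketch, assuming a SourceWithContext<String, ConsumerMessage.Committable, NotUsed> named withContext:

    // Drop to the underlying pair stream to use operators SourceWithContext does not expose.
    Source<Pair<String, ConsumerMessage.Committable>, NotUsed> pairs =
            withContext.asSource();

    // ... pair-level processing goes here; it is your responsibility to keep
    // each value together with the context it arrived with ...

    // Once the stream is back to one-value-per-context pairs, re-attach the context.
    SourceWithContext<String, ConsumerMessage.Committable, NotUsed> reattached =
            SourceWithContext.fromPairs(pairs);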