
Is ManagedChannel thread-safe in a Kafka Streams Transformer?


Here is my transformer:

import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {

    private static final Logger logger = LoggerFactory.getLogger(DataEnricher.class);

    private ManagedChannel channel;
    private InfoClient infoclient;
    private LRUCacheCollector<String, InfoResponse> cache;

    public DataEnricher() {}

    @Override
    public void init(ProcessorContext context) {
        // one gRPC channel and client per transformer instance
        channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
        infoclient = new InfoClient(channel);
    }

    @Override
    public KeyValue<byte[], EnrichedData> transform(byte[] key, EnrichedData request) {
        InfoResponse infoResponse = null;
        String someInfo = request.getSomeInfo();
        try {
            infoResponse = infoclient.getMoreInfo(someInfo);
        } catch (Exception e) {
            logger.warn("An exception has occurred during retrieval.", e);
        }
        EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
        return new KeyValue<>(key, enrichedData);
    }

    @Override
    public KeyValue<byte[], EnrichedData> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {
        channel.shutdown();
    }
}

In Kafka Streams each stream thread initializes its own copy of the stream topology, and then instantiates that topology per ProcessorContext, i.e. per task, i.e. per-partition. So wouldn't init() get called and overwrite/leak the channel for each partition, and since we have multiple threads, even race the creation of the channel/client? Is there a way to prevent that?

This is called from the run() method:

public KafkaStreams createStreams() {
    final Properties streamsConfiguration = new Properties();
    // other configuration is set up here
    streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        WallclockTimestampExtractor.class.getName());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

    StreamsBuilder streamsBuilder = new StreamsBuilder();

    RequestJsonSerde requestSerde = new RequestJsonSerde();
    DataEnricher dataEnricher = new DataEnricher();
    // Get the stream of requests
    final KStream<byte[], EnrichedData> requestsStream = streamsBuilder
        .stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
    final KStream<byte[], EnrichedData> enrichedRequestsStream = requestsStream
        .filter((key, request) -> Objects.nonNull(request))
        .transform(() -> dataEnricher);

    enrichedRequestsStream.to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray(), requestSerde));

    return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}

Solution

  • Not related to ManagedChannel, but you have to supply a new instance of DataEnricher per ProcessorContext, i.e. per task, by having the TransformerSupplier create one:

    KStream.transform(DataEnricher::new);

    With the current code, the single dataEnricher instance is shared by every task, so each init() call overwrites the channel and client created for the previous task. I ran into some Kafka Streams exceptions related to this once; I'll try to recreate them.

    And IMO, if you don't use punctuate() to send additional records downstream and the output key is the same as the input key, you should use transformValues() instead, because transform() marks the stream for repartitioning whenever a key-based operation such as an aggregation or a join is applied to it afterwards; a sketch of that variant follows below.
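
    Here is a minimal sketch of the transformValues() variant, written against the same pre-2.0 API the question uses. The DataEnricherValueTransformer class is hypothetical (a ValueTransformer port of the question's DataEnricher); because a ValueTransformer cannot change the key, Streams knows the result does not need repartitioning:

    public class DataEnricherValueTransformer implements ValueTransformer<EnrichedData, EnrichedData> {

        private ManagedChannel channel;
        private InfoClient infoclient;

        @Override
        public void init(ProcessorContext context) {
            channel = ManagedChannelBuilder.forAddress("localhost", 50051).usePlaintext().build();
            infoclient = new InfoClient(channel);
        }

        @Override
        public EnrichedData transform(EnrichedData request) {
            InfoResponse infoResponse = null;
            try {
                infoResponse = infoclient.getMoreInfo(request.getSomeInfo());
            } catch (Exception e) {
                // enrichment is best-effort: fall through and emit the record un-enriched
            }
            return EnrichedDataBuilder.addExtraInfo(request, infoResponse);
        }

        @Override
        public EnrichedData punctuate(long timestamp) {
            return null; // deprecated no-op; this method is gone in Kafka 2.0+
        }

        @Override
        public void close() {
            channel.shutdown();
        }
    }

    Wired into the topology, with a fresh instance per task:

    requestsStream
        .filter((key, request) -> Objects.nonNull(request))
        .transformValues(DataEnricherValueTransformer::new)
        .to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray(), requestSerde));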
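
    As for the title question: gRPC documents ManagedChannel as thread-safe, so a single channel can be shared by all tasks and stream threads, and only the per-task client state has to live in the transformer. A sketch under that assumption (the DataEnricher constructor taking a channel is my addition; with it, DataEnricher.close() must no longer shut the channel down, since other tasks may still be using it):

    // One channel for the whole application; gRPC channels are thread-safe
    // and are intended to be reused across threads and calls.
    ManagedChannel sharedChannel = ManagedChannelBuilder
        .forAddress("localhost", 50051)
        .usePlaintext()
        .build();

    KStream<byte[], EnrichedData> enrichedRequestsStream = requestsStream
        .filter((key, request) -> Objects.nonNull(request))
        // still one transformer instance per task, but all of them reuse the channel
        .transform(() -> new DataEnricher(sharedChannel));

    // after streams.close() has completed, shut the shared channel down exactly once
    sharedChannel.shutdown();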