Here is my transformer:
import io.grpc.ManagedChannel;
import io.grpc.ManagedChannelBuilder;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {

    private static final Logger logger = LoggerFactory.getLogger(DataEnricher.class);

    private ManagedChannel channel;
    private InfoClient infoclient;
    private LRUCacheCollector<String, InfoResponse> cache;

    public DataEnricher() {}

    @Override
    public void init(ProcessorContext context) {
        // Builds a new gRPC channel and client every time init() is called.
        channel = ManagedChannelBuilder.forAddress("localhost", 50051)
                .usePlaintext()
                .build();
        infoclient = new InfoClient(channel);
    }

    @Override
    public KeyValue<byte[], EnrichedData> transform(byte[] key, EnrichedData request) {
        InfoResponse infoResponse = null;
        String someInfo = request.getSomeInfo();
        try {
            infoResponse = infoclient.getMoreInfo(someInfo);
        } catch (Exception e) {
            logger.warn("An exception has occurred during retrieval.", e);
        }
        EnrichedData enrichedData = EnrichedDataBuilder.addExtraInfo(request, infoResponse);
        return new KeyValue<>(key, enrichedData);
    }

    @Override
    public KeyValue<byte[], EnrichedData> punctuate(long timestamp) {
        return null;
    }

    @Override
    public void close() {
        channel.shutdown();
    }
}
In Kafka Streams, each stream thread initializes its own copy of the stream topology, and that topology is instantiated per ProcessorContext, i.e. per task, i.e. per partition. So wouldn't init() get called for each partition, overwriting/leaking the channel each time, and, since we have multiple threads, even racing the creation of the channel/client? Is there a way to prevent that?
This is called in the run() method:
public KafkaStreams createStreams() {
    final Properties streamsConfiguration = new Properties();
    // other configuration is set up here
    streamsConfiguration.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
            WallclockTimestampExtractor.class.getName());
    streamsConfiguration.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 3);

    StreamsBuilder streamsBuilder = new StreamsBuilder();
    RequestJsonSerde requestSerde = new RequestJsonSerde();
    DataEnricher dataEnricher = new DataEnricher();

    // Get the stream of requests
    final KStream<byte[], EnrichedData> requestsStream = streamsBuilder
            .stream(requestsTopic, Consumed.with(Serdes.ByteArray(), requestSerde));
    final KStream<byte[], EnrichedData> enrichedRequestsStream = requestsStream
            .filter((key, request) -> Objects.nonNull(request))
            .transform(() -> dataEnricher);

    enrichedRequestsStream.to(enrichedRequestsTopic, Produced.with(Serdes.ByteArray(), requestSerde));
    return new KafkaStreams(streamsBuilder.build(), new StreamsConfig(streamsConfiguration));
}
Not related to ManagedChannel, but you have to supply a new instance of DataEnricher per ProcessorContext in the TransformerSupplier:
KStream.transform(DataEnricher::new);
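As for the "is there a way to prevent that?" part: one option is to create the ManagedChannel once, outside the topology, and pass it into each transformer through the supplier. Every task still gets its own Transformer instance (as required above), but they all share a single channel, and init() only builds the cheap per-task client stub. This is just a sketch of that idea, reusing the InfoClient/EnrichedData types from the question; the constructor parameter is my addition, not part of the original class:

import io.grpc.ManagedChannel;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DataEnricher implements Transformer<byte[], EnrichedData, KeyValue<byte[], EnrichedData>> {

    // Shared across all tasks/threads; owned and closed by whoever built it,
    // e.g. when the KafkaStreams instance itself shuts down.
    private final ManagedChannel channel;
    private InfoClient infoclient;

    public DataEnricher(ManagedChannel channel) {
        this.channel = channel;
    }

    @Override
    public void init(ProcessorContext context) {
        // Only the lightweight client stub is created per task; no new channel.
        infoclient = new InfoClient(channel);
    }

    @Override
    public KeyValue<byte[], EnrichedData> transform(byte[] key, EnrichedData request) {
        InfoResponse infoResponse = null;
        try {
            infoResponse = infoclient.getMoreInfo(request.getSomeInfo());
        } catch (Exception e) {
            // log as before
        }
        return new KeyValue<>(key, EnrichedDataBuilder.addExtraInfo(request, infoResponse));
    }

    @Override
    public KeyValue<byte[], EnrichedData> punctuate(long timestamp) {
        return null; // unused; kept only for the older Transformer API shown above
    }

    @Override
    public void close() {
        // Deliberately do NOT shut the shared channel down here:
        // other tasks may still be using it.
    }
}

And in createStreams(), build the channel once and hand the supplier a fresh transformer per task:

ManagedChannel sharedChannel = ManagedChannelBuilder
        .forAddress("localhost", 50051)
        .usePlaintext()
        .build();
// ...
.transform(() -> new DataEnricher(sharedChannel));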
I once ran into some Kafka Streams exceptions related to this; I'll try to recreate them.
And IMO, if you don't use punctuate() to send additional records downstream and the output key is the same as the input record's key, you should use transformValues() instead, because transform() can result in re-partitioning when a key-based operation such as an aggregation or a join is applied afterwards.
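For completeness, a minimal sketch of the transformValues() variant, assuming the same types as in the question (the class name is mine): a ValueTransformer returns only the new value, so the key is never touched and Streams does not flag the stream for re-partitioning. Depending on your Kafka Streams version you may also need the deprecated punctuate() override here.

import org.apache.kafka.streams.kstream.ValueTransformer;
import org.apache.kafka.streams.processor.ProcessorContext;

public class DataEnrichmentValueTransformer implements ValueTransformer<EnrichedData, EnrichedData> {

    private InfoClient infoclient;

    @Override
    public void init(ProcessorContext context) {
        // create/obtain the client as in the question
    }

    @Override
    public EnrichedData transform(EnrichedData request) {
        InfoResponse infoResponse = null;
        try {
            infoResponse = infoclient.getMoreInfo(request.getSomeInfo());
        } catch (Exception e) {
            // log as before
        }
        return EnrichedDataBuilder.addExtraInfo(request, infoResponse);
    }

    @Override
    public void close() {}
}

// Usage: keys pass through unchanged, so no re-partitioning is triggered.
requestsStream.transformValues(DataEnrichmentValueTransformer::new);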