I am using spring-kafka to implement a topology that converts lower-case to upper-case, like this:
@Bean
public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
    KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
    // The supplier creates a new processor object each time Kafka Streams calls it
    sourceStream.process(() -> new CapitalCaseProcessor());
    ...
}
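The processor is not a Spring singleton bean and is declared as follows:
public class CapitalCaseProcessor implements Processor<String, String> {
    // Set in init() and reused by every process() call on this instance
    private ProcessorContext context;
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }
    @Override
    public void process(String key, String value) {
        // Print the headers of the record currently being processed
        context.headers().forEach(System.out::println);
    }
    @Override
    public void close() {
        // Nothing to release
    }
}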
The processor above is stateful: it holds a reference to the ProcessorContext.
Now, what would happen if we converted the stateful CapitalCaseProcessor
into a Spring singleton bean?
@Component
public class CapitalCaseProcessor implements Processor<String, String> {
    // Is the ProcessorContext going to have a thread-safety issue now?
    private ProcessorContext context;
    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }
    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }
    @Override
    public void close() {
        // Nothing to release
    }
}
and then inject it into the main topology as a Spring bean:
@Configuration
public class UppercaseTopologyProcessor {
    @Autowired CapitalCaseProcessor capitalCaseProcessor;
    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        // The same singleton Spring bean is now returned on every call to the supplier
        sourceStream.process(() -> capitalCaseProcessor);
        ...
    }
}
Is this going to cause a thread-safety issue with CapitalCaseProcessor, now that it holds the ProcessorContext
as state?
Or is it better to declare it as a prototype bean, like this?
@Configuration
public class UppercaseTopologyProcessor {
    // Spring overrides this method at runtime to look the bean up in the container
    @Lookup
    public CapitalCaseProcessor getCapitalCaseProcessor() {return null;}
    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        // A prototype-scoped processor is now looked up on every call to the supplier
        sourceStream.process(() -> getCapitalCaseProcessor());
        ...
    }
}
Update: I essentially would like to know two things:
I just ran a test, and the processor context is NOT thread-safe. What makes the stream thread-safe is that you use a ProcessorSupplier
(as in your first example) to create a new processor instance each time Kafka Streams calls it, which happens once per stream task rather than once per record.
You must certainly not replace this with a Spring singleton.
Here is my test, using the MessagingTransformer
provided by Spring for Apache Kafka:
@SpringBootApplication
@EnableKafkaStreams
public class So66200448Application {
    private static final Logger log = LoggerFactory.getLogger(So66200448Application.class);
    public static void main(String[] args) {
        SpringApplication.run(So66200448Application.class, args);
    }
    @Bean
    KStream<String, String> stream(StreamsBuilder sb) {
        KStream<String, String> stream = sb.stream("so66200448");
        stream.transform(() -> new MessagingTransformer(msg -> {
            log.info(msg.toString());
            log.info(new String(msg.getHeaders().get("foo", byte[].class)));
            return msg;
        }, new MessagingMessageConverter()) {
            @Override
            public KeyValue transform(Object key, Object value) {
                try {
                    Thread.sleep(5000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return super.transform(key, value);
            }
        })
        .to("so66200448out");
        return stream;
    }
    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so66200448").partitions(2).replicas(1).build();
    }
    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66200448out").partitions(2).replicas(1).build();
    }
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            Headers headers = new RecordHeaders();
            headers.add(new RecordHeader("foo", "bar".getBytes()));
            ProducerRecord<String, String> record = new ProducerRecord<>("so66200448", 0, null, "foo", headers);
            template.send(record);
            headers.remove("foo");
            headers.add(new RecordHeader("foo", "baz".getBytes()));
            record = new ProducerRecord<>("so66200448", 1, null, "bar", headers);
            template.send(record);
        };
    }
    @KafkaListener(id = "so66200448out", topics = "so66200448out")
    public void listen(String in) {
        System.out.println(in);
    }
}
spring.kafka.streams.application-id=so66200448
spring.kafka.streams.properties.num.stream.threads=2
spring.kafka.consumer.auto-offset-reset=earliest
2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-1] com.example.demo.So66200448Application : bar
2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-2] com.example.demo.So66200448Application : baz
Changing the supplier to return the same instance each time definitely breaks it:
@Bean
KStream<String, String> stream(StreamsBuilder sb) {
    KStream<String, String> stream = sb.stream("so66200448");
    MessagingTransformer transformer = new MessagingTransformer(msg -> {
        log.info(msg.toString());
        log.info(new String(msg.getHeaders().get("foo", byte[].class)));
        return msg;
    }, new MessagingMessageConverter()) {
        @Override
        public KeyValue transform(Object key, Object value) {
            try {
                Thread.sleep(5000);
            }
            catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return super.transform(key, value);
        }
    };
    stream.transform(() -> transformer)
        .to("so66200448out");
    return stream;
}
2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-1] com.example.demo.So66200448Application : baz
2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-2] com.example.demo.So66200448Application : baz
So Kafka Streams relies on the supplier returning a new instance each time for thread safety.
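The difference between the two log excerpts can be imitated without a broker. What follows is only a simplified, single-threaded sketch: `FakeContext`, `FakeProcessor` and `run` are hypothetical stand-ins, not Kafka Streams classes. It assumes that each stream task asks the supplier for a processor and initializes it with that task's own context, which is what the test output above suggests.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Supplier;

public class SupplierDemo {

    // Hypothetical stand-in for a per-task ProcessorContext
    static final class FakeContext {
        final String header;
        FakeContext(String header) {
            this.header = header;
        }
    }

    // Hypothetical stand-in for a processor that stores its context in a field
    static final class FakeProcessor {
        private FakeContext context;
        void init(FakeContext context) {
            this.context = context;
        }
        String process() {
            return context.header;
        }
    }

    // Imitates two tasks: each one gets a processor from the supplier
    // and initializes it with its own context before processing.
    static List<String> run(Supplier<FakeProcessor> supplier) {
        FakeProcessor task0 = supplier.get();
        task0.init(new FakeContext("bar"));
        FakeProcessor task1 = supplier.get();
        task1.init(new FakeContext("baz"));
        return Arrays.asList(task0.process(), task1.process());
    }

    public static void main(String[] args) {
        // New instance per call: each task keeps its own context
        System.out.println(run(FakeProcessor::new)); // prints [bar, baz]

        // Shared instance: the second init() overwrites the first context
        FakeProcessor shared = new FakeProcessor();
        System.out.println(run(() -> shared)); // prints [baz, baz]
    }
}
```

In the shared case the second `init` call replaces the context stored by the first, so both "tasks" read the same headers. A Spring singleton returned from the supplier behaves like `shared` here.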