I am using spring-kafka to implement a topology that converts lower-case to upper-case, like this:
@Bean
public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
    KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
    // A new processor object is created each time the supplier is called (once per stream task)
    sourceStream.process(() -> new CapitalCaseProcessor());
    return sourceStream;
}
The processor is not a Spring singleton bean and is declared as follows:
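public class CapitalCaseProcessor implements Processor<String, String> {
    private ProcessorContext context;
    public void init(ProcessorContext context) {
        this.context = context;
    }
    public void process(String key, String value) {
        context.forward(key, value.toUpperCase()); // assumed body: forward the upper-cased value
    }
    public void close() { }
}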
The processor above is stateful: it holds the ProcessorContext as state.
Now, what would happen if we converted the stateful CapitalCaseProcessor to a Spring singleton bean?
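@Component // singleton scope by default
public class CapitalCaseProcessor implements Processor<String, String> {
    // Is the ProcessorContext going to have a thread-safety issue now?
    private ProcessorContext context;
    public void init(ProcessorContext context) {
        this.context = context;
    }
    public void process(String key, String value) {
        context.forward(key, value.toUpperCase()); // assumed body: forward the upper-cased value
    }
    public void close() { }
}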
and inject it into the main topology as a Spring bean:
@Configuration
public class UppercaseTopologyProcessor {
    @Autowired CapitalCaseProcessor capitalCaseProcessor;
    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        // The same singleton Spring bean instance is now returned for every stream task
        sourceStream.process(() -> capitalCaseProcessor);
        return sourceStream;
    }
}
Will this cause a thread-safety issue with CapitalCaseProcessor, since it holds the ProcessorContext as state?
Or is it better to declare it as a prototype bean, like this?
@Configuration
public class UppercaseTopologyProcessor {
    @Lookup // Spring overrides this to return a new instance per call if CapitalCaseProcessor is @Scope("prototype")
    public CapitalCaseProcessor getCapitalCaseProcessor() { return null; }
    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        // A new prototype instance is looked up each time the supplier is called
        sourceStream.process(() -> getCapitalCaseProcessor());
        return sourceStream;
    }
}
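Update: essentially, I would like to know two things: whether the singleton bean causes a thread-safety issue, and whether a prototype bean is the better option.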
I just ran a test and the processor context is NOT thread-safe. What makes the stream thread-safe is that you use a ProcessorSupplier (in your first example) to create a new processor instance for each stream task (one per input partition here), not for each record.
You must certainly not replace this with a Spring singleton.
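To make the failure mode concrete, here is a minimal plain-Java simulation (not Kafka Streams code). It assumes, as Kafka Streams does, that each task calls the supplier once and then calls init() with its own context; `FakeContext`, `FakeProcessor` and `run` are hypothetical stand-ins for ProcessorContext, CapitalCaseProcessor and the task setup. With a shared instance, the second init() overwrites the cached context, so task 0 ends up reading task 1's record, which is the same effect as the "baz"/"baz" log in the test below.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

public class SharedProcessorDemo {

    // Hypothetical stand-in for a per-task ProcessorContext: it knows its task
    // and the header of the record that task is currently processing.
    static final class FakeContext {
        final int taskId;
        String currentHeader;
        FakeContext(int taskId) { this.taskId = taskId; }
    }

    // Hypothetical stand-in for CapitalCaseProcessor: caches the context in init().
    static final class FakeProcessor {
        private FakeContext context;
        void init(FakeContext context) { this.context = context; }
        // Reads task id and header through whatever context was cached last.
        String process(String value) {
            return "task" + context.taskId + ":" + context.currentHeader + ":" + value.toUpperCase();
        }
    }

    // Sets up two tasks (one supplier.get() plus init() each), then lets each
    // task process one record.
    static List<String> run(Supplier<FakeProcessor> supplier) {
        FakeContext ctx0 = new FakeContext(0);
        FakeContext ctx1 = new FakeContext(1);
        FakeProcessor p0 = supplier.get();
        p0.init(ctx0);
        FakeProcessor p1 = supplier.get();
        p1.init(ctx1); // with a shared instance this overwrites p0's context

        ctx0.currentHeader = "bar"; // task 0 is processing the "bar" record
        ctx1.currentHeader = "baz"; // task 1 is processing the "baz" record
        List<String> out = new ArrayList<>();
        out.add(p0.process("foo"));
        out.add(p1.process("bar"));
        return out;
    }

    public static void main(String[] args) {
        System.out.println(run(FakeProcessor::new)); // [task0:bar:FOO, task1:baz:BAR]
        FakeProcessor shared = new FakeProcessor();
        System.out.println(run(() -> shared));       // [task1:baz:FOO, task1:baz:BAR]
    }
}
```

The simulation is deliberately single-threaded so the outcome is deterministic; with real stream threads the overwrite still happens, only the timing varies.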
Here is my test, using the MessagingTransformer provided by Spring for Apache Kafka:
@SpringBootApplication
public class So66200448Application {
    private static final Logger log = LoggerFactory.getLogger(So66200448Application.class);
    public static void main(String[] args) {
        SpringApplication.run(So66200448Application.class, args);
    }

    @Bean
    KStream<String, String> stream(StreamsBuilder sb) {
        KStream<String, String> stream = sb.stream("so66200448");
        // A new MessagingTransformer is created each time the supplier is called
        stream.transform(() -> new MessagingTransformer(msg -> {
            log.info(new String(msg.getHeaders().get("foo", byte[].class)));
            return msg;
        }, new MessagingMessageConverter()) {
            @Override
            public KeyValue transform(Object key, Object value) {
                try {
                    Thread.sleep(5000); // assumed delay so that both stream threads overlap
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return super.transform(key, value);
            }
        })
        .to("so66200448out");
        return stream;
    }

    @Bean
    public NewTopic topic1() {
        return TopicBuilder.name("so66200448").partitions(2).replicas(1).build();
    }
    @Bean
    public NewTopic topic2() {
        return TopicBuilder.name("so66200448out").partitions(2).replicas(1).build();
    }
    @Bean
    public ApplicationRunner runner(KafkaTemplate<String, String> template) {
        return args -> {
            Headers headers = new RecordHeaders();
            headers.add(new RecordHeader("foo", "bar".getBytes()));
            ProducerRecord<String, String> record = new ProducerRecord<>("so66200448", 0, null, "foo", headers);
            template.send(record);
            headers = new RecordHeaders(); // headers become read-only once sent
            headers.add(new RecordHeader("foo", "baz".getBytes()));
            record = new ProducerRecord<>("so66200448", 1, null, "bar", headers);
            template.send(record);
        };
    }

    @KafkaListener(id = "so66200448out", topics = "so66200448out")
    public void listen(String in) {
        System.out.println(in);
    }
}
2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-1] com.example.demo.So66200448Application : bar
2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-2] com.example.demo.So66200448Application : baz
Changing the supplier to return the same instance each time definitely breaks it:
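@Bean
KStream<String, String> stream(StreamsBuilder sb) {
    KStream<String, String> stream = sb.stream("so66200448");
    // One transformer instance, created up front...
    MessagingTransformer transformer = new MessagingTransformer(msg -> {
        log.info(new String(msg.getHeaders().get("foo", byte[].class)));
        return msg;
    }, new MessagingMessageConverter()) {
        @Override
        public KeyValue transform(Object key, Object value) {
            try {
                Thread.sleep(5000); // assumed delay so that both stream threads overlap
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return super.transform(key, value);
        }
    };
    // ...and returned by the supplier for every task
    stream.transform(() -> transformer)
        .to("so66200448out");
    return stream;
}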
2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-1] com.example.demo.So66200448Application : baz
2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-2] com.example.demo.So66200448Application : baz
So, Kafka Streams relies on the supplier returning a new instance each time (so that each task gets its own processor) for thread safety.
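If you want to catch an accidental singleton early, one option is to wrap the supplier in a guard that fails fast when it hands back an instance it has already returned. This is a minimal sketch in plain Java; `FreshInstanceSupplier` is a hypothetical helper, not part of Kafka Streams or Spring.

```java
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.function.Supplier;

public class FreshInstanceSupplier<T> implements Supplier<T> {

    private final Supplier<T> delegate;
    // Identity-based set: two distinct objects that are equals() still count as different instances.
    private final Set<T> seen = Collections.newSetFromMap(new IdentityHashMap<>());

    public FreshInstanceSupplier(Supplier<T> delegate) {
        this.delegate = delegate;
    }

    // synchronized because several stream threads may create their tasks concurrently
    @Override
    public synchronized T get() {
        T instance = delegate.get();
        if (!seen.add(instance)) {
            throw new IllegalStateException("Supplier returned an instance it had already returned: " + instance);
        }
        return instance;
    }

    public static void main(String[] args) {
        Supplier<Object> fresh = new FreshInstanceSupplier<>(Object::new);
        fresh.get();
        fresh.get(); // fine: a new object each time

        Object shared = new Object();
        Supplier<Object> reused = new FreshInstanceSupplier<>(() -> shared);
        reused.get();
        try {
            reused.get(); // fails fast: the same instance twice
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

With the old Processor API, ProcessorSupplier has a single get() method, so the guard should be usable via a method reference, e.g. build `FreshInstanceSupplier<CapitalCaseProcessor> guard = new FreshInstanceSupplier<>(this::getCapitalCaseProcessor);` once and pass `guard::get` to `process(...)`. The trade-off is that the guard keeps a strong reference to every instance it has handed out, which is acceptable for a handful of tasks but not for a supplier called per record.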