Tags: apache-kafka-streams, spring-kafka

Can a record processor be a Spring singleton bean?


I am using spring-kafka to implement a topology that converts lower-case to upper-case, like this:

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {

        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // A new processor instance is created here for each stream task
        sourceStream.process(() -> new CapitalCaseProcessor());
        ...
    }

The processor is not a Spring singleton bean; it is declared as follows:

public class CapitalCaseProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }
}

The above processor is stateful: it holds the ProcessorContext as state.

Now, what would happen if we convert the stateful CapitalCaseProcessor to a Spring singleton bean?

@Component
public class CapitalCaseProcessor implements Processor<String, String> {

    // Is the ProcessorContext going to have a thread-safety issue now?
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        context.headers().forEach(System.out::println);
    }
}

and inject it into the main topology as a Spring bean:

@Configuration
public class UppercaseTopologyProcessor {

    @Autowired
    private CapitalCaseProcessor capitalCaseProcessor;

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {

        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // A singleton Spring bean processor is now used for all records
        sourceStream.process(() -> capitalCaseProcessor);
        ...
    }
}

Is this going to cause a thread-safety issue in CapitalCaseProcessor, since it holds the ProcessorContext as state?

Or is it better to declare it as a prototype bean, like this?

@Configuration
public class UppercaseTopologyProcessor {

    @Lookup
    public CapitalCaseProcessor getCapitalCaseProcessor() { return null; }

    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {

        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

        // A new processor instance is looked up from the context for each supplier call
        sourceStream.process(() -> getCapitalCaseProcessor());
        ...
    }
}
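Note that for the @Lookup method to actually hand back a fresh instance per call, the processor bean itself must be prototype-scoped; the plain @Component shown earlier defaults to singleton scope. A minimal sketch (the @Scope annotation is my addition, not in the snippets above):

import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;

@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE) // a new instance per @Lookup call
public class CapitalCaseProcessor implements Processor<String, String> {
    // ... same body as above ...
}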

Update: I essentially would like to know two things:

  1. Should the processor instance be created per stream record, as in the Akka actor model where actors are stateful and work per request, or can it be a singleton object? (A counting supplier sketched after this list makes the instantiation pattern observable.)
  2. Is the ProcessorContext thread-safe?
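
One way to check the first point empirically is to count how often Kafka Streams invokes the supplier. A minimal sketch, reusing the CapitalCaseProcessor from above (the class name is mine):

import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorSupplier;

public class CountingCapitalCaseSupplier implements ProcessorSupplier<String, String> {

    private final AtomicInteger instances = new AtomicInteger();

    @Override
    public Processor<String, String> get() {
        // Streams calls get() while building each stream task, not once per record
        System.out.println("Creating processor instance #" + instances.incrementAndGet());
        return new CapitalCaseProcessor();
    }
}

Wiring it in with sourceStream.process(new CountingCapitalCaseSupplier()) should print one line per stream task when the topology starts, before any records flow.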

Solution

  • I just ran a test: the processor context is NOT thread-safe. What makes the stream thread-safe is that you use a ProcessorSupplier (in your first example), so a new processor instance is created for each stream task, and hence for each thread.

    You must certainly not replace this with a Spring singleton.

    Here is my test, using the MessagingTransformer provided by Spring for Apache Kafka:

    @SpringBootApplication
    @EnableKafkaStreams
    public class So66200448Application {
    
    
        private static final Logger log = LoggerFactory.getLogger(So66200448Application.class);
    
    
        public static void main(String[] args) {
            SpringApplication.run(So66200448Application.class, args);
        }
    
        @Bean
        KStream<String, String> stream(StreamsBuilder sb) {
            KStream<String, String> stream = sb.stream("so66200448");
    
            stream.transform(() -> new MessagingTransformer(msg -> {
                log.info(msg.toString());
                log.info(new String(msg.getHeaders().get("foo", byte[].class)));
                return msg;
            }, new MessagingMessageConverter()) {
    
                @Override
                public KeyValue transform(Object key, Object value) {
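                        // sleep so both stream threads are inside transform() at the same time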
                    try {
                        Thread.sleep(5000);
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                    return super.transform(key, value);
                }
    
            })
                .to("so66200448out");
            return stream;
        }
    
        @Bean
        public NewTopic topic1() {
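            // two partitions, so the two test records can be processed concurrently by different threads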
            return TopicBuilder.name("so66200448").partitions(2).replicas(1).build();
        }
    
        @Bean
        public NewTopic topic2() {
            return TopicBuilder.name("so66200448out").partitions(2).replicas(1).build();
        }
    
    
        @Bean
        public ApplicationRunner runner(KafkaTemplate<String, String> template) {
            return args -> {
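                // send one record to each partition, with different values for the 'foo' header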
                Headers headers = new RecordHeaders();
                headers.add(new RecordHeader("foo", "bar".getBytes()));
                ProducerRecord<String, String> record = new ProducerRecord<>("so66200448", 0, null, "foo", headers);
                template.send(record);
                headers.remove("foo");
                headers.add(new RecordHeader("foo", "baz".getBytes()));
                record = new ProducerRecord<>("so66200448", 1, null, "bar", headers);
                template.send(record);
            };
        }
    
        @KafkaListener(id = "so66200448out", topics = "so66200448out")
        public void listen(String in) {
            System.out.println(in);
        }
    
    }
    
    The application properties; num.stream.threads=2 gives the two partitions their own stream threads:

    spring.kafka.streams.application-id=so66200448
    spring.kafka.streams.properties.num.stream.threads=2
    spring.kafka.consumer.auto-offset-reset=earliest
    

    2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-1] com.example.demo.So66200448Application : bar

    2021-02-16 15:57:34.322 INFO 17133 --- [-StreamThread-2] com.example.demo.So66200448Application : baz

    Each StreamThread has its own transformer instance, so each one logged the header from its own record. Changing the supplier to return the same instance each time definitely breaks it:

    @Bean
    KStream<String, String> stream(StreamsBuilder sb) {
        KStream<String, String> stream = sb.stream("so66200448");
        MessagingTransformer transformer = new MessagingTransformer(msg -> {
            log.info(msg.toString());
            log.info(new String(msg.getHeaders().get("foo", byte[].class)));
            return msg;
        }, new MessagingMessageConverter()) {
    
            @Override
            public KeyValue transform(Object key, Object value) {
                try {
                    Thread.sleep(5000);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                return super.transform(key, value);
            }
    
        };
        stream.transform(() -> transformer)
            .to("so66200448out");
        return stream;
    }
    

    2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-1] com.example.demo.So66200448Application : baz

    2021-02-16 15:54:28.975 INFO 16406 --- [-StreamThread-2] com.example.demo.So66200448Application : baz

    Both threads now go through the same transformer, whose single ProcessorContext field was overwritten by the second task's init(), so both logged the header from the same record (baz). So, Kafka Streams relies on the supplier returning a new instance each time for thread-safety.
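
    If the processor should still be a Spring-managed bean, one pattern that preserves the one-instance-per-task contract is to keep the bean prototype-scoped and let the supplier fetch a fresh instance on every call. A minimal sketch, assuming the prototype-scoped CapitalCaseProcessor shown earlier (the ObjectProvider wiring and the topic name are my assumptions, not from the original code):

    import org.apache.kafka.common.serialization.Serdes;
    import org.apache.kafka.streams.StreamsBuilder;
    import org.apache.kafka.streams.kstream.Consumed;
    import org.apache.kafka.streams.kstream.KStream;
    import org.springframework.beans.factory.ObjectProvider;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;

    @Configuration
    public class UppercaseTopologyProcessor {

        @Bean
        public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder,
                ObjectProvider<CapitalCaseProcessor> processors) {

            KStream<String, String> sourceStream = builder.stream("input-topic",
                    Consumed.with(Serdes.String(), Serdes.String()));

            // getObject() resolves a new prototype instance each time Streams
            // calls the supplier, i.e. once per stream task
            sourceStream.process(processors::getObject);
            return sourceStream;
        }
    }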