apache-kafka, spring-cloud-stream, spring-cloud-stream-binder-kafka

Publish multiple messages from a batch processing function with the Spring Cloud Stream Kafka binder


I am looking for an example of a functional-style processor using the Spring Cloud Stream Kafka binder (without Kafka Streams) that consumes a batch of n messages from one topic and publishes m messages to another topic (m < n). I have tried the following:

public Function<List<IncomingEvent>, List<Message<OutgoingEvent>>> aggregate() {
    return ((batch) -> {
        Map<Key, List<IncomingEvent>> resultsMap = batch.stream()
            .collect(Collectors.groupingBy(IncomingEvent::key));
        List<Message<OutgoingEvent>> aggregationResults = new ArrayList<>();
        for (Map.Entry<Key,List<IncomingEvent>> resultGroup : resultsMap.entrySet()) {
            OutgoingEvent outgoingEvent = this.getOutgoingEvent(resultGroup.getKey(), resultGroup.getValue());
            aggregationResults.add(
                MessageBuilder.withPayload(outgoingEvent)
                .setHeader(KafkaHeaders.MESSAGE_KEY, outgoingEvent.getKey())
                .build() 
            );
        }
        return aggregationResults;
    });
}

However, this produces a single event whose payload is an array of messages. I also tried changing the function's return type from List<Message> to Flux<Message> and returning Flux.fromIterable(aggregationResults). That does seem to publish multiple messages, but each one appears to be a serialized Flux instance (with properties such as scanAvailable and prefetch) rather than the actual Message. I could not find an example of this anywhere on the web, so one would be very helpful.


Solution

  • I don't think that's supported; use a Consumer<List<IncomingEvent>> and publish the outbound messages with StreamBridge.

    https://docs.spring.io/spring-cloud-stream/docs/3.1.2/reference/html/spring-cloud-stream.html#_sending_arbitrary_data_to_an_output_e_g_foreign_event_driven_sources

    EDIT

    It seems I was wrong; see https://github.com/spring-cloud/spring-cloud-stream/issues/2143

    That issue is a documentation request. Spring Cloud Stream supports an undocumented feature for publishing a batch of messages from a function with a signature like public Function<Whatever, List<Message<POJO>>> myMethod(). With that signature, the binder publishes each message in the list individually.

    If it's not working for you, I suggest you comment on that issue.
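
For the Consumer-based alternative mentioned first, here is a minimal sketch of the pattern: consume the batch, group it by key, and send one outbound message per group. To keep it runnable without Spring on the classpath, it uses a hypothetical OutputSender interface with the same shape as StreamBridge's send(String bindingName, Object data); in a real application you would presumably inject StreamBridge and pass streamBridge::send. The event records, the summing logic, and the binding name aggregate-out-0 are illustrative assumptions, not taken from the question.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.stream.Collectors;

public class BatchAggregator {

    // Hypothetical event types; the question does not show the real ones.
    record IncomingEvent(String key, int value) {}
    record OutgoingEvent(String key, int total) {}

    // Stand-in mirroring the shape of StreamBridge#send(String, Object).
    interface OutputSender {
        boolean send(String bindingName, Object data);
    }

    // Consumes a batch of n events and sends m events, one per distinct key.
    static Consumer<List<IncomingEvent>> aggregate(OutputSender sender, String bindingName) {
        return batch -> {
            // LinkedHashMap keeps groups in first-seen order.
            Map<String, List<IncomingEvent>> groups = batch.stream()
                .collect(Collectors.groupingBy(
                    IncomingEvent::key, LinkedHashMap::new, Collectors.toList()));
            groups.forEach((key, events) -> {
                // Illustrative aggregation: sum the values in each group.
                int total = events.stream().mapToInt(IncomingEvent::value).sum();
                // Each call publishes one outbound message.
                sender.send(bindingName, new OutgoingEvent(key, total));
            });
        };
    }

    public static void main(String[] args) {
        List<Object> sent = new ArrayList<>();
        Consumer<List<IncomingEvent>> fn =
            aggregate((binding, data) -> sent.add(data), "aggregate-out-0");
        fn.accept(List.of(
            new IncomingEvent("a", 1),
            new IncomingEvent("b", 2),
            new IncomingEvent("a", 3)));
        // Three inputs with two distinct keys yield two outbound events.
        System.out.println(sent.size() + " events sent: " + sent);
    }
}
```

Because each send call is a separate publish, the outbound count is decoupled from the inbound batch size. To set a Kafka record key, you could send a Message built with MessageBuilder (as in the question) instead of the bare payload.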