java spring apache-kafka spring-integration spring-xd

SpringXD and Spring Integration: Read from a Kafka topic every X minutes, then send to another topic


I'm trying to implement a SpringXD stream composed of a Kafka source, a bridge module, and a Kafka sink.

So I have something like:

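<!-- In-memory buffer: every message the Kafka source emits waits here until it is polled -->
<channel id="pollable">
    <queue />
</channel>

<!-- Every 5 seconds, move at most 5 messages from "pollable" to "executorChannel" -->
<bridge input-channel="pollable" output-channel="executorChannel">
    <poller max-messages-per-poll="5" fixed-rate="5000" />
</bridge>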
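To make the behaviour of this configuration concrete, here is a plain-Java analogy (not Spring Integration code) of a queue channel drained by a polling bridge. The class and method names are hypothetical; a LinkedBlockingQueue stands in for the unbounded in-memory queue, and drainTo with a limit plays the role of max-messages-per-poll. It assumes, as in the config above, that nothing bounds the queue's capacity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Plain-Java analogy of a <queue/> channel drained by a <bridge> with a poller.
// Illustrative only: this is not Spring Integration code, and all names are hypothetical.
class QueuePollerSketch {
    // Unbounded in-memory buffer, like <queue/> without a capacity attribute.
    final BlockingQueue<String> pollable = new LinkedBlockingQueue<>();
    private final int maxMessagesPerPoll;   // plays the role of max-messages-per-poll
    private final Consumer<String> output;  // stands in for executorChannel

    QueuePollerSketch(int maxMessagesPerPoll, Consumer<String> output) {
        this.maxMessagesPerPoll = maxMessagesPerPoll;
        this.output = output;
    }

    // One poll cycle: move at most maxMessagesPerPoll messages downstream.
    int pollOnce() {
        List<String> batch = new ArrayList<>();
        pollable.drainTo(batch, maxMessagesPerPoll);
        batch.forEach(output);
        return batch.size();
    }

    // Like fixed-rate: start a poll cycle every periodMillis, however full the queue is.
    ScheduledExecutorService startFixedRate(long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        scheduler.scheduleAtFixedRate(this::pollOnce, 0, periodMillis, TimeUnit.MILLISECONDS);
        return scheduler;
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        QueuePollerSketch bridge = new QueuePollerSketch(5, sent::add);
        // Simulate the Kafka source pushing 12 messages before the first poll.
        for (int i = 0; i < 12; i++) {
            bridge.pollable.add("message-" + i);
        }
        // Three poll cycles move 5, 5 and then the remaining 2 messages.
        System.out.println(bridge.pollOnce() + " " + bridge.pollOnce() + " " + bridge.pollOnce()); // prints "5 5 2"
    }
}
```

Because the source keeps pushing messages into the queue no matter how slowly the poller drains it, the backlog (and the memory it occupies) grows whenever the source produces more than 5 messages every 5 seconds.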

My problem is that I would like to avoid the poller, mainly because I don't want messages to be held in memory while they sit in the queue. I would prefer to read from Kafka every X minutes, take just Y messages, and send those to the next topic.

It looks like I can't get rid of the queue, so my question is: is there another option? I don't want to keep data in memory, but I wouldn't want to use this option either: http://docs.spring.io/spring-integration/reference/html/system-management-chapter.html#message-store


Solution

  • Keeping the data in memory is NOT a good idea.

    You can stop() and start() the channel adapter (KafkaMessageDrivenChannelAdapter) as needed; it will pick up where it left off when restarted.

    However, the SpringXD Kafka source module uses a very old version of spring-integration-kafka (1.3.x).

    If you create a custom source that uses spring-integration-kafka 2.1.0 (which uses the Kafka 0.10.1.x client), you can set the Kafka consumer property max.poll.records to limit the number of records fetched by each poll.
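The stop()/start() approach described in the answer could be paired with a timer to get "every X minutes, at most Y messages" without a queue channel. The following is a minimal sketch, assuming the adapter is controlled through start(), stop() and isRunning() (the methods of Spring's Lifecycle interface, declared here as the hypothetical StartStoppable so the code compiles without Spring) and that onMessage is called for each message the adapter emits. ScheduledBatchWindow and its methods are illustrative names, not part of SpringXD or Spring Integration.

```java
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical stand-in for Spring's org.springframework.context.Lifecycle
// (start/stop/isRunning), declared here so the sketch compiles without Spring.
interface StartStoppable {
    void start();
    void stop();
    boolean isRunning();
}

// Hypothetical helper: consume roughly Y messages per X-minute window by
// starting and stopping a message-driven adapter.
class ScheduledBatchWindow {
    private final StartStoppable adapter;
    private final int maxPerWindow;          // "Y"
    private final Consumer<String> forward;  // e.g. sends to the outbound Kafka topic
    private final Executor control;          // runs start()/stop() off the listener thread
    private final AtomicInteger received = new AtomicInteger();

    ScheduledBatchWindow(StartStoppable adapter, int maxPerWindow,
                         Consumer<String> forward, Executor control) {
        this.adapter = adapter;
        this.maxPerWindow = maxPerWindow;
        this.forward = forward;
        this.control = control;
    }

    // Called at the start of every window: reset the counter and resume consuming.
    void openWindow() {
        received.set(0);
        control.execute(() -> {
            if (!adapter.isRunning()) {
                adapter.start();
            }
        });
    }

    // Called for each message the adapter emits.
    void onMessage(String payload) {
        forward.accept(payload);
        // Stop once per window when the Y-th message arrives; messages already
        // in flight may still be delivered, and are forwarded rather than dropped.
        if (received.incrementAndGet() == maxPerWindow) {
            control.execute(adapter::stop);
        }
    }

    // Opens a window now and then every `minutes` minutes ("X").
    void scheduleEvery(ScheduledExecutorService scheduler, long minutes) {
        scheduler.scheduleAtFixedRate(this::openWindow, 0, minutes, TimeUnit.MINUTES);
    }

    public static void main(String[] args) {
        // Fake adapter that only tracks its running state (demonstration only).
        boolean[] running = {false};
        StartStoppable fake = new StartStoppable() {
            public void start() { running[0] = true; }
            public void stop() { running[0] = false; }
            public boolean isRunning() { return running[0]; }
        };
        // Runnable::run executes control actions inline, keeping the demo deterministic.
        ScheduledBatchWindow window = new ScheduledBatchWindow(fake, 2, System.out::println, Runnable::run);
        window.openWindow();
        window.onMessage("first");
        window.onMessage("second"); // Y = 2 reached, so the adapter is stopped
        System.out.println("running after 2 messages: " + fake.isRunning());
    }
}
```

In a real deployment the control executor would more likely be a separate thread, for example a single-thread ScheduledExecutorService that is also passed to scheduleEvery, because stopping a message-driven adapter from inside its own callback may block. Since messages already fetched can still arrive after stop(), the Y limit in this sketch is approximate.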
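For the max.poll.records route, the consumer configuration might look like the following sketch, which only builds the property map. The keys are standard Kafka consumer settings; the broker address, group id, and the class and method names are hypothetical placeholders. With spring-integration-kafka 2.x (built on Spring for Apache Kafka), a map like this is typically handed to a DefaultKafkaConsumerFactory, which the message-driven adapter's listener container then uses.

```java
import java.util.HashMap;
import java.util.Map;

// Sketch of Kafka consumer properties that cap the batch size per poll.
// Class and method names are hypothetical.
class BoundedConsumerConfig {
    static Map<String, Object> consumerProps(String bootstrapServers, String groupId,
                                             int maxRecordsPerPoll) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Upper bound on the number of records a single poll() returns (Kafka default: 500).
        props.put("max.poll.records", maxRecordsPerPoll);
        return props;
    }

    public static void main(String[] args) {
        // "localhost:9092" and "xd-bridge" are hypothetical placeholder values.
        Map<String, Object> props = consumerProps("localhost:9092", "xd-bridge", 5);
        System.out.println(props.get("max.poll.records")); // prints "5"
    }
}
```

Note that max.poll.records caps how many records a single poll() returns; it does not by itself space consumption out to every X minutes, so on its own it limits batch size rather than frequency.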