I'm processing messages from a Kafka topic with Samza. Some of the messages come with a timestamp in the future and I'd like to postpone the processing until after that timestamp. In the meantime, I'd like to keep processing other incoming messages.
What I tried to do is make my Task
queue the messages and implement the WindowableTask
to periodically check the messages if their timestamp allows to process them. The basic idea looks like this:
public class MyTask implements StreamTask, WindowableTask {
private HashSet<MyMessage> waitingMessages = new HashSet<>();
@Override
public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
byte[] message = (byte[]) incomingMessageEnvelope.getMessage();
MyMessage parsedMessage = MyMessage.parseFrom(message);
if (parsedMessage.getValidFromDateTime().isBeforeNow()) {
// Do the processing
} else {
waitingMessages.add(parsedMessage);
}
}
@Override
public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
for (MyMessage message : waitingMessages) {
if (message.getValidFromDateTime().isBeforeNow()) {
// Do the processing and remove the message from the set
}
}
}
}
This obviously has some downsides. I'd be losing my waiting messages in memory when I redeploy my task. So I'd like to know the best practice for delaying the processing of messages with Samza. Do I need to reemit the messages to the same topic again and again until I can finally process them? We're talking about delaying the processing for a few minutes up to 1-2 hours here.
I think you could use key-value store of Samza to keep state of your task instance instead of in-memory Set
.
It should look something like:
public class MyTask implements StreamTask, WindowableTask, InitableTask {
private KeyValueStore<String, MyMessage> waitingMessages;
@SuppressWarnings("unchecked")
@Override
public void init(Config config, TaskContext context) throws Exception {
this.waitingMessages = (KeyValueStore<String, MyMessage>) context.getStore("messages-store");
}
@Override
public void process(IncomingMessageEnvelope incomingMessageEnvelope, MessageCollector messageCollector,
TaskCoordinator taskCoordinator) {
byte[] message = (byte[]) incomingMessageEnvelope.getMessage();
MyMessage parsedMessage = MyMessage.parseFrom(message);
if (parsedMessage.getValidFromDateTime().isBefore(LocalDate.now())) {
// Do the processing
} else {
waitingMessages.put(parsedMessage.getId(), parsedMessage);
}
}
@Override
public void window(MessageCollector messageCollector, TaskCoordinator taskCoordinator) {
KeyValueIterator<String, MyMessage> all = waitingMessages.all();
while(all.hasNext()) {
MyMessage message = all.next().getValue();
// Do the processing and remove the message from the set
}
}
}
If you redeploy you task Samza should recreate state of key-value store (Samza keeps values in special kafka topic related to key-value store). You need of course provide some extra configuration of your store (in above example for messages-store
).
You could read about key-value store here (for the latest Samza version): https://samza.apache.org/learn/documentation/0.14/container/state-management.html