In my processor API I store the messages in a key value store and every 100 messages I make a POST
request. If something fails while trying to send the messages (api is not responding etc.) I want to stop processing messages. Until there is evidence the API calls work.
Here is my code:
public class BulkProcessor implements Processor<byte[], UserEvent> {
private KeyValueStore<Integer, ArrayList<UserEvent>> keyValueStore;
private BulkAPIClient bulkClient;
private String storeName;
private ProcessorContext context;
private int count;
@Autowired
public BulkProcessor(String storeName, BulkClient bulkClient) {
this.storeName = storeName;
this.bulkClient = bulkClient;
}
@Override
public void init(ProcessorContext context) {
this.context = context;
keyValueStore = (KeyValueStore<Integer, ArrayList<UserEvent>>) context.getStateStore(storeName);
count = 0;
// to check every 15 minutes if there are any remainders in the store that are not sent yet
this.context.schedule(Duration.ofMinutes(15), PunctuationType.WALL_CLOCK_TIME, (timestamp) -> {
if (count > 0) {
sendEntriesFromStore();
}
});
}
@Override
public void process(byte[] key, UserEvent value) {
int userGroupId = Integer.valueOf(value.getUserGroupId());
ArrayList<UserEvent> userEventArrayList = keyValueStore.get(userGroupId);
if (userEventArrayList == null) {
userEventArrayList = new ArrayList<>();
}
userEventArrayList.add(value);
keyValueStore.put(userGroupId, userEventArrayList);
if (count == 100) {
sendEntriesFromStore();
}
}
private void sendEntriesFromStore() {
KeyValueIterator<Integer, ArrayList<UserEvent>> iterator = keyValueStore.all();
while (iterator.hasNext()) {
KeyValue<Integer, ArrayList<UserEvent>> entry = iterator.next();
BulkRequest bulkRequest = new BulkRequest(entry.key, entry.value);
if (bulkRequest.getLocation() != null) {
URI url = bulkClient.buildURIPath(bulkRequest);
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
logger.warn(e.getMessage(), e.fillInStackTrace());
}
}
}
iterator.close();
count = 0;
}
@Override
public void close() {
}
}
Currently in my code if a call to the API fails it will iterate the next 100 (and this will keep happening as long as it fails) and add them to the keyValueStore
. I don't want this to happen. Instead I would prefer to stop the stream and continue once the keyValueStore
is emptied. Is that possible?
Could I throw a StreamsException
?
try {
bulkClient.postRequestBulkApi(url, bulkRequest);
keyValueStore.delete(entry.key);
} catch (BulkApiException e) {
throw new StreamsException(e);
}
Would that kill my stream app and so the process dies?
In the end I used a simple KafkaConsumer
instead of KafkaStreams
, but the bottom line was that I changed the BulkApiException
to extend RuntimeException
, which I throw again after I log it. So now it looks as follows:
} catch (BulkApiException bae) {
logger.error(bae.getMessage(), bae.fillInStackTrace());
throw new BulkApiException();
} finally {
consumer.close();
int exitCode = SpringApplication.exit(ctx, () -> 1);
System.exit(exitCode);
}
This way the application is exited and the k8s restarts the pod. That was because if the api where I'm trying to forward the requests is down, then there is no point on continue reading messages. So until the other api is back up k8s will restart a pod.