I am trying to write a custom sink for flume-ng. I looked at the existing sinks and the documentation and coded it up. However, the 'process()' method that is supposed to receive the events always gets null: I call Event event = channel.take(); and the event is null. I can see in the logs that this method is called repeatedly, since the event is still in the channel.
Can someone point me in the right direction?
This is the skeleton of a process() method. If taking or handling the event throws, you roll back the transaction and set the status to BACKOFF. Otherwise you commit and set the status to READY. No matter what, you always close the transaction.
Status status = null;
Channel channel = getChannel();
Transaction transaction = channel.getTransaction();
transaction.begin();
try {
    // take() must be called inside the transaction started above
    Event event = channel.take();
    if (event != null && validEvent(event.getBody()) >= 0) {
        // handle the event here, e.g. print or log its body
    }
    transaction.commit();
    status = Status.READY;
} catch (Throwable ex) {
    transaction.rollback();
    status = Status.BACKOFF;
    logger.error("Failed to deliver event. Exception follows.", ex);
    // keep the original exception as the cause
    throw new EventDeliveryException("Failed to deliver event: " + ex, ex);
} finally {
    // always close the transaction, whether it was committed or rolled back
    transaction.close();
}
return status;
I am sure this is going to work :).
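One thing the skeleton above leaves open is what to return when take() gives back null. In Flume a null from take() means the channel had no event available, and sinks commonly answer that with BACKOFF so the runner waits before calling process() again. The sketch below only illustrates that branch and runs without Flume on the classpath. FakeChannel and the local Status enum are hypothetical stand-ins written for this example, not Flume's Channel, Transaction, or Sink.Status, and events are plain strings here.

```java
import java.util.ArrayDeque;
import java.util.Queue;

final class NullTakeSketch {
    // Stand-in for Flume's Sink.Status (assumption: same two values)
    enum Status { READY, BACKOFF }

    // Hypothetical stand-in for a channel: take() returns null when it is empty
    static final class FakeChannel {
        private final Queue<String> events = new ArrayDeque<>();
        int commits = 0;

        void put(String body) { events.add(body); }
        String take() { return events.poll(); }
        void commit() { commits++; }
    }

    // Mirrors the try-block of the skeleton, with an explicit null branch
    static Status process(FakeChannel channel) {
        Status status;
        String event = channel.take();
        if (event != null) {
            System.out.println("Got event: " + event);
            status = Status.READY;
        } else {
            // Nothing to deliver: ask the caller to back off
            status = Status.BACKOFF;
        }
        // An empty take is not an error, so the transaction is still committed
        channel.commit();
        return status;
    }

    public static void main(String[] args) {
        FakeChannel channel = new FakeChannel();
        channel.put("hello");
        System.out.println(process(channel)); // prints "Got event: hello" then "READY"
        System.out.println(process(channel)); // prints "BACKOFF"
    }
}
```

In a real sink the same if/else would sit inside the try block, before transaction.commit(), with the rollback and close handling from the skeleton left unchanged.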