Search code examples
flume

Custom Sink for Flume-ng null event


I am trying to write a custom sink for flume-ng. I looked at the existing sinks and documentation and coded it up. However, the 'process()' method that's supposed to receive the events always ends up with null. I am doing Event event = channel.take(); but the event is null. I see in the logs that this method is called repeatedly as the event is still in the channel.

Can someone point me in the right direction?


Solution

  • This is the skeleton of a process function ...If you fail getting an event you rollback, change the status to BACKOFF . If not you commit and set status to READY . No matter what , you always close the transaction.

        Status status = null;
        Channel channel = getChannel();
        Transaction transaction = channel.getTransaction();
        transaction.begin();
        try {
            Event event = channel.take();
    
            if (event != null && validEvent(event.getBody()) >= 0) {
               # make some printing
            }
            transaction.commit();
            status = Status.READY;
        } catch (Throwable ex) {
            transaction.rollback();
            status = Status.BACKOFF;
            logger.error("Failed to deliver event. Exception follows.", ex);
            throw new EventDeliveryException("Failed to deliver event: " + ex);
        } finally {
            transaction.close();
        }
        return status;
    

    I am sure this gonna work :).