Tags: hadoop, null, interceptor, cloudera, flume

Drop null Flume events before sending to channel


I read in a Flume book that if the intercept method of an interceptor returns null for an event, that event is dropped. So I created a custom interceptor that returns null when a condition is met:

public Event intercept(Event event) {
    Event finalEvent = event;
    // Decode the body once and reuse it for both the check and the side file.
    String body = new String(event.getBody(), Charsets.UTF_8);

    // Bodies that match the pattern are written to a side file and dropped.
    if (body.matches("([0-9]-.+?-.+?-[0-9][0-9]+)")) {
        try {
            fileWriter.append(body + "\n");
        } catch (IOException e) {
            e.printStackTrace();
        }
        // Returning null is meant to signal that the event should be dropped.
        finalEvent = null;
    }
    System.out.println("Event is : " + finalEvent);
    return finalEvent;
}

The interceptor returns null for these events, but the file channel still passes them to the HDFS sink as empty events. Why don't they get dropped? I am using a Spooling Directory source.


Solution

  • In my interceptor class, the method intercept(Event event) contains the filtering logic (as written in the question) and returns null when the event body matches the regex. The method intercept(List<Event> events) then leaves those null events out of the list it returns, which serves the purpose. Here is the intercept(List<Event> events) code:

    public List<Event> intercept(List<Event> events) {
        List<Event> interceptedEvents = new ArrayList<Event>(events.size());
        for (Event event : events) {
            // Run the single-event logic and keep only non-null results.
            Event interceptedEvent = intercept(event);
            if (interceptedEvent != null) {
                interceptedEvents.add(interceptedEvent);
            }
        }
        return interceptedEvents;
    }
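
    To try the two methods together without a Flume installation, here is a minimal standalone sketch. It is not the real interceptor: SimpleEvent is a hypothetical stand-in for org.apache.flume.Event that only models the body, and the class name NullDroppingInterceptorSketch is made up for illustration. The regex is the one from the question. As far as I understand, batch-oriented sources such as the Spooling Directory source hand events over as a list, so the list variant is the one that has to discard the nulls.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class NullDroppingInterceptorSketch {

    /** Hypothetical stand-in for org.apache.flume.Event; only the body is modeled. */
    static final class SimpleEvent {
        private final byte[] body;

        SimpleEvent(String text) {
            this.body = text.getBytes(StandardCharsets.UTF_8);
        }

        byte[] getBody() {
            return body;
        }

        String text() {
            return new String(body, StandardCharsets.UTF_8);
        }
    }

    // Same regex as in the question, compiled once instead of on every call.
    private static final Pattern DROP_PATTERN =
            Pattern.compile("([0-9]-.+?-.+?-[0-9][0-9]+)");

    /** Returns null when the whole body matches the pattern, mirroring the question. */
    static SimpleEvent intercept(SimpleEvent event) {
        String body = new String(event.getBody(), StandardCharsets.UTF_8);
        return DROP_PATTERN.matcher(body).matches() ? null : event;
    }

    /** Batch variant: forwards only the events that the single-event method kept. */
    static List<SimpleEvent> intercept(List<SimpleEvent> events) {
        List<SimpleEvent> kept = new ArrayList<>(events.size());
        for (SimpleEvent event : events) {
            SimpleEvent result = intercept(event);
            if (result != null) {
                kept.add(result);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<SimpleEvent> batch = Arrays.asList(
                new SimpleEvent("1-foo-bar-42"),    // matches the regex, so it is dropped
                new SimpleEvent("keep this line")); // does not match, so it is kept
        for (SimpleEvent e : intercept(batch)) {
            System.out.println(e.text());
        }
    }
}
```

    Running main prints only "keep this line", because the first body matches the pattern and never reaches the returned list. In a real interceptor the same two methods would sit in a class implementing Flume's Interceptor interface and work on Event instead of SimpleEvent.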