Tags: apache-kafka, interceptor, flume

Flume TAILDIR Source to Kafka Sink - Static Interceptor Issue


The scenario I'm trying to set up is as follows:

1- A Flume TAILDIR source reading from a log file and appending a static interceptor header to the beginning of each message. The header consists of the host name and the host IP, because they're required with every log message I receive.

2- A Flume Kafka producer sink that takes those messages from the channel and puts them in a Kafka topic.

The Flume configuration is as follows:

tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1

tier1.sources.source1.interceptors=i1
tier1.sources.source1.interceptors.i1.type=static
tier1.sources.source1.interceptors.i1.key=HostData
tier1.sources.source1.interceptors.i1.value=###HostName###000.00.0.000###

tier1.sources.source1.type=TAILDIR
tier1.sources.source1.positionFile=/usr/software/flumData/flumeStressAndKafkaFailureTestPos.json
tier1.sources.source1.filegroups=f1
tier1.sources.source1.filegroups.f1=/usr/software/flumData/flumeStressAndKafkaFailureTest.txt
tier1.sources.source1.channels=channel1

tier1.channels.channel1.type=file
tier1.channels.channel1.checkpointDir=/usr/software/flumData/checkpoint
tier1.channels.channel1.dataDirs=/usr/software/flumData/data

tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.type=org.apache.flume.sink.kafka.KafkaSink
tier1.sinks.sink1.kafka.bootstrap.servers=<Removed For Confidentiality>
tier1.sinks.sink1.kafka.topic=FlumeTokafkaTest
tier1.sinks.sink1.kafka.flumeBatchSize=20
tier1.sinks.sink1.kafka.producer.acks=0
tier1.sinks.sink1.useFlumeEventFormat=true
tier1.sinks.sink1.kafka.producer.linger.ms=1
tier1.sinks.sink1.kafka.producer.client.id=HOSTNAME
tier1.sinks.sink1.kafka.producer.compression.type=snappy

Now I'm testing: I ran a console Kafka consumer, started writing to the source file, and I do receive the messages with the header prepended.
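For reference, a console consumer like the one shipped with Kafka can be used to watch the topic (a sketch; localhost:9092 stands in for the broker address removed above):

    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic FlumeTokafkaTest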

Example:

I write 'test' in the source file, press Enter, then save the file.

Flume detects the file change and sends the new line to the Kafka producer.

My consumer gets the following line:

###HostName###000.00.0.000###test

The issue is that sometimes the interceptor doesn't work as expected: it's as if Flume sends 2 messages, one containing the interceptor header and the other the message content.

Example:

I write 'hi you' in the source file, press Enter, then save the file.

Flume detects the file change and sends the new line to the Kafka producer.

My consumer gets the following 2 lines:

###HostName###000.00.0.000###
hi you

And the terminal scrolls to the new message content.

This always happens when I type 'hi you' in the text file, and since I'm reading from a log file, it's not predictable when it will happen.

Help and support will be much appreciated ^^

Thank you


Solution

  • So the problem was with the Kafka consumer. It receives the full message from Flume as

    Interceptor + some garbage characters + message

    and if one of those garbage characters is \n (LF on Linux systems), it assumes it's 2 messages, not 1. The garbage is most likely the Avro event framing added because useFlumeEventFormat=true is set on the sink, which stores the whole Flume event (headers plus body) rather than just the raw body.

    I'm using the Kafka Consumer stage in StreamSets, so it was simple to change the message delimiter. I set it to \r\n and now it's working fine.

    If you are dealing with the full message as a string and want to apply a regex to it or write it to a file, it's better to replace \r and \n with an empty string.
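    As an illustration, here is a minimal Java sketch of that cleanup, assuming the plain kafka-clients consumer with string deserializers and the placeholder local broker from above (not the StreamSets pipeline itself; class and group names are hypothetical):

        import java.time.Duration;
        import java.util.Collections;
        import java.util.Properties;
        import org.apache.kafka.clients.consumer.ConsumerRecord;
        import org.apache.kafka.clients.consumer.ConsumerRecords;
        import org.apache.kafka.clients.consumer.KafkaConsumer;

        public class CleanConsumer {
            public static void main(String[] args) {
                Properties props = new Properties();
                props.put("bootstrap.servers", "localhost:9092"); // placeholder broker
                props.put("group.id", "flume-test");              // hypothetical group id
                props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
                try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
                    consumer.subscribe(Collections.singletonList("FlumeTokafkaTest"));
                    while (true) {
                        ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                        for (ConsumerRecord<String, String> record : records) {
                            // Strip CR/LF so an embedded line break can't split one
                            // logical message into two lines downstream.
                            String cleaned = record.value().replaceAll("[\\r\\n]", "");
                            System.out.println(cleaned);
                        }
                    }
                }
            }
        }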

    The full walkthrough that led to this answer can be found here:

    https://community.cloudera.com/t5/Data-Ingestion-Integration/Flume-TAILDIR-Source-to-Kafka-Sink-Static-Interceptor-Issue/m-p/86388#M3508