
Apache Flume. Regex extractor with multiplexing channel selector


I have the following Flume config for grabbing server log entries that contain a particular numerical value and pushing them to the corresponding Kafka topics.

# Name the components on this agent
a1.sources = r1
a1.channels = c2 c3

# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/user/spoolFlume
a1.sources.r1.fileSuffix = .DONE
a1.sources.r1.basenameHeader = true
a1.sources.r1.deserializer.maxLineLength = 8192

a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (2725391)
a1.sources.r1.interceptors.i1.serializers = id
a1.sources.r1.interceptors.i1.serializers.id.name = project_id
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = project_id
a1.sources.r1.selector.mapping.2725391 = c3
a1.sources.r1.selector.default = c2


a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.brokerList = kafka10.profile:9092,kafka11.profile:9092,kafka12.profile:9092
a1.channels.c2.topic = flume_test_002
a1.channels.c2.zookeeperConnect = kafka10.profile:2181,kafka11.profile:2181,kafka12.profile:2181
#default = true
a1.channels.c2.parseAsFlumeEvent = true

a1.channels.c3.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c3.brokerList = kafka10.profile:9092,kafka11.profile:9092,kafka12.profile:9092
a1.channels.c3.topic = flume_test_003
a1.channels.c3.zookeeperConnect = kafka10.profile:2181,kafka11.profile:2181,kafka12.profile:2181
a1.channels.c3.parseAsFlumeEvent = true

# Bind the source to the channels (no sink: each KafkaChannel stores events directly in its Kafka topic)
a1.sources.r1.channels = c2 c3
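To make the intended routing explicit, here is a minimal Python model of what the regex_extractor interceptor and multiplexing selector above are expected to do with each line: search the line for the pattern, store capture group 1 in the project_id header, then send the event to c3 if the header equals 2725391 and to c2 otherwise. This is a simplified sketch, not Flume code: Flume applies java.util.regex inside Java, the helper names are hypothetical, and the sample log lines are made up.

```python
import re

# Values copied from the config above; the logic is a simplified model of
# regex_extractor + multiplexing selector, not Flume's own implementation.
REGEX = re.compile(r"(2725391)")
HEADER = "project_id"
MAPPING = {"2725391": "c3"}
DEFAULT_CHANNEL = "c2"


def extract_headers(body: str) -> dict:
    """Mimic regex_extractor: on the first match, store group 1 under HEADER."""
    headers = {}
    match = REGEX.search(body)  # search anywhere in the line, not only at the start
    if match:
        headers[HEADER] = match.group(1)
    return headers


def select_channel(headers: dict) -> str:
    """Mimic the multiplexing selector: mapped header value, else the default channel."""
    return MAPPING.get(headers.get(HEADER), DEFAULT_CHANNEL)


def route(lines):
    """Return {channel: [lines]} for a list of log lines (one event per line)."""
    routed = {}
    for line in lines:
        channel = select_channel(extract_headers(line))
        routed.setdefault(channel, []).append(line)
    return routed


if __name__ == "__main__":
    # Hypothetical sample lines, for illustration only.
    sample = [
        "GET /a pid=2725391 ip=31.28.244.74",
        "GET /b pid=1111111 ip=10.0.0.1",
        "POST /c pid=2725391 ip=31.28.244.75",
    ]
    for channel, events in sorted(route(sample).items()):
        print(channel, len(events))
```

With both matching lines routed to c3 in this model, any shortfall in flume_test_003 points at how the file is read rather than at the regex or the selector mapping.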

I tested a more complicated regexp and everything looks good with cat | grep -E <regexp>, but when I use it in the Flume config, not all entries are captured.

Now I use a single-word regexp, but even with that, not all entries are captured, i.e. not all matching entries reach the Kafka topic (for example, I have 2 lines containing '2725391' in the log, but after processing I see only one entry in Kafka).
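One way to get a reliable expected count to compare against the number of messages in flume_test_003 is to count matching lines directly, the way grep -c would. The sketch below uses a hypothetical helper (count_matching_lines is not part of Flume). It works on raw bytes, so every line is counted regardless of its encoding, and it uses Python's re, which agrees with grep -E and Java regex for a plain literal such as 2725391 but may differ for more complex patterns.

```python
import re
import sys


def count_matching_lines(lines, pattern: bytes) -> int:
    """Count lines that contain a match anywhere, similar to `grep -c -E`."""
    regex = re.compile(pattern)
    # search() looks for the pattern anywhere in the line, not only at the start
    return sum(1 for line in lines if regex.search(line))


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python count_matches.py /path/to/logfile
    # Binary mode: no decoding, so malformed bytes can neither raise nor be skipped.
    with open(sys.argv[1], "rb") as f:
        print(count_matching_lines(f, rb"(2725391)"))
```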

It seems something is wrong with the Flume config. Any suggestions would be much appreciated.

Update 2. Moreover, when I parse short files (fewer than 100 lines), everything works fine. With files of about 2 GB, some entries are missed.

Update 3. I found a way to parse all entries:

a1.sources.r1.decodeErrorPolicy = IGNORE

It helps because there is a strange symbol in the header of the parsed event in the Kafka channel. I don't know where it comes from, since there are no such symbols in the raw logs before processing :/

basename00278388pid2725391�31.28.244.74
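The Flume user guide documents three values for the spooling directory source's decodeErrorPolicy: FAIL (the default, which throws an exception and fails to parse the file), REPLACE (substitute the Unicode replacement character U+FFFD) and IGNORE (drop the undecodable bytes). The sketch below mimics these policies with Python's UTF-8 codec error handlers. The POLICIES mapping and the decode_line helper are illustrative assumptions, not Flume's implementation, which decodes in Java according to inputCharset (UTF-8 by default).

```python
# Hypothetical mapping from Flume's decodeErrorPolicy values to Python codec error handlers.
POLICIES = {"FAIL": "strict", "REPLACE": "replace", "IGNORE": "ignore"}


def decode_line(raw: bytes, policy: str = "FAIL") -> str:
    """Decode one log line as UTF-8, treating bad bytes according to the policy."""
    return raw.decode("utf-8", errors=POLICIES[policy])


if __name__ == "__main__":
    line = b"pid=2725391 \xff ip=31.28.244.74"  # a lone 0xFF byte is never valid UTF-8
    print(repr(decode_line(line, "REPLACE")))  # the bad byte becomes U+FFFD
    print(repr(decode_line(line, "IGNORE")))   # the bad byte is dropped
    try:
        decode_line(line, "FAIL")
    except UnicodeDecodeError as exc:
        print("FAIL:", exc.reason)
```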

Solution

  • The solution was to set JAVA_HOME to the proper value and apply the following setting:

    a1.sources.r1.decodeErrorPolicy = IGNORE
    

    The source of the problem was non-UTF-8 characters somewhere in the logs.
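To locate the offending non-UTF-8 bytes in a large log before feeding it to Flume, each line can be checked with strict UTF-8 decoding. This is a minimal sketch with a hypothetical helper name: it reads the file in binary mode and reports the line number and byte offset of the first invalid sequence on each bad line. Splitting on newlines is safe here because the byte 0x0A never occurs inside a multi-byte UTF-8 sequence.

```python
import sys


def find_invalid_utf8_lines(lines):
    """Yield (line_number, byte_offset, raw_line) for lines that are not valid UTF-8."""
    for number, raw in enumerate(lines, start=1):
        try:
            raw.decode("utf-8")  # strict decoding, comparable to decodeErrorPolicy = FAIL
        except UnicodeDecodeError as exc:
            yield number, exc.start, raw


if __name__ == "__main__" and len(sys.argv) > 1:
    # Usage: python find_bad_utf8.py /path/to/logfile
    with open(sys.argv[1], "rb") as f:
        for number, offset, raw in find_invalid_utf8_lines(f):
            print(f"line {number}, byte {offset}: {raw[:80]!r}")
```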