I have the following Flume config for grabbing server log entries that contain a particular numerical value and pushing them to corresponding Kafka topics.
# Name the components on this agent
a1.sources = r1
a1.channels = c2 c3
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/user/spoolFlume
a1.sources.r1.fileSuffix = .DONE
a1.sources.r1.basenameHeader = true
a1.sources.r1.deserializer.maxLineLength = 8192
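# Extract the project id into an event header so the selector below can route on it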
a1.sources.r1.interceptors = i1
a1.sources.r1.interceptors.i1.type = regex_extractor
a1.sources.r1.interceptors.i1.regex = (2725391)
a1.sources.r1.interceptors.i1.serializers = id
a1.sources.r1.interceptors.i1.serializers.id.name = project_id
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = project_id
a1.sources.r1.selector.mapping.2725391 = c3
a1.sources.r1.selector.default = c2
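# Kafka channels write events straight to their topics, so no separate sink is defined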
a1.channels.c2.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c2.brokerList = kafka10.profile:9092,kafka11.profile:9092,kafka12.profile:9092
a1.channels.c2.topic = flume_test_002
a1.channels.c2.zookeeperConnect = kafka10.profile:2181,kafka11.profile:2181,kafka12.profile:2181
# parseAsFlumeEvent defaults to true
a1.channels.c2.parseAsFlumeEvent = true
a1.channels.c3.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.c3.brokerList = kafka10.profile:9092,kafka11.profile:9092,kafka12.profile:9092
a1.channels.c3.topic = flume_test_003
a1.channels.c3.zookeeperConnect = kafka10.profile:2181,kafka11.profile:2181,kafka12.profile:2181
a1.channels.c3.parseAsFlumeEvent = true
# Bind the source to the channels
a1.sources.r1.channels = c2 c3
I did some tests with a more complicated regexp and everything looks good with cat | grep -E <regexp>, but when I try to use it in the Flume config, not all entries are captured.
For now I use a one-word regexp, but even with that not all entries are captured, i.e. not all 'right' entries reach the Kafka topic (for example, the log contains 2 lines with '2725391', but after processing I can see only one entry in Kafka).
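For reference, a minimal Python sketch (a helper of my own, not part of Flume) to count how many lines in a raw log match the interceptor's pattern, for comparison against the number of entries that land in the topic:

import re
import sys

# count lines matching the same pattern the Flume interceptor uses
pattern = re.compile(rb"2725391")

count = 0
with open(sys.argv[1], "rb") as f:  # read bytes so broken UTF-8 cannot abort the scan
    for line in f:
        if pattern.search(line):
            count += 1
print(count)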
It seems something is wrong with the Flume config. Any suggestions would be much appreciated.
Update 2. Even more: when I use short files (less than 100 lines), everything parses fine. With files of about 2 GB, some entries are missed.
Update 3. I found a way to parse all entries:
a1.sources.r1.decodeErrorPolicy = IGNORE
It helps because there is a strange symbol in the header of the parsed event in the Kafka channel. I don't know where it comes from, since there are no such symbols inside the raw logs before processing :/
basename00278388pid2725391�31.28.244.74
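A quick way to confirm the logs really contain bytes that are not valid UTF-8 is a small Python check like this (a sketch of my own, not part of the Flume setup):

import sys

with open(sys.argv[1], "rb") as f:
    for lineno, raw in enumerate(f, start=1):
        try:
            raw.decode("utf-8")
        except UnicodeDecodeError as err:
            # show the offending bytes and where they sit in the line
            print(f"line {lineno}: invalid UTF-8 at byte {err.start}: {raw[err.start:err.start + 4]!r}")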
The solution was to set JAVA_HOME to a proper value and to add the following setting:
a1.sources.r1.decodeErrorPolicy = IGNORE
The source of the problem was non-UTF-8 characters somewhere in the logs.
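Note that IGNORE silently drops the offending bytes. If that is a concern, an alternative (again, a sketch of my own, not a Flume feature) is to re-encode files as clean UTF-8 before they reach the spool directory, replacing invalid bytes with U+FFFD instead of losing them:

import sys

with open(sys.argv[1], "rb") as src, open(sys.argv[2], "wb") as dst:
    for raw in src:
        # decode with replacement, then re-encode as valid UTF-8
        dst.write(raw.decode("utf-8", errors="replace").encode("utf-8"))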