Flume agent: add host to message, then publish to a kafka topic


We started to consolidate event log data from our applications by publishing messages to a Kafka topic. We could write directly from the application to Kafka, but we chose to treat this as a generic problem and use a Flume agent instead. This gives us some flexibility: if we want to capture something else from a server, we can tail a different source and publish it to a different Kafka topic.

We created a Flume agent conf file to tail a log and publish to a Kafka topic:

    tier1.sources  = source1
    tier1.channels = channel1
    tier1.sinks = sink1

    tier1.sources.source1.type = exec
    tier1.sources.source1.command = tail -F /var/log/some_log.log
    tier1.sources.source1.channels = channel1

    tier1.channels.channel1.type = memory
    tier1.channels.channel1.capacity = 10000
    tier1.channels.channel1.transactionCapacity = 1000

    tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    tier1.sinks.sink1.topic = some_log
    tier1.sinks.sink1.brokerList = hadoop01:9092,hadoop02.com:9092,hadoop03.com:9092
    tier1.sinks.sink1.channel = channel1
    tier1.sinks.sink1.batchSize = 20

Unfortunately, the messages themselves don't specify the host that generated them. If we have an application running on multiple hosts and an error occurs, we have no way to figure out which host generated the message.

I noticed that if Flume wrote directly to HDFS, we could use a Flume interceptor to attach the host to each event and write it to a host-specific HDFS location. We could probably do something similar with Kafka, i.e. create a new topic for each server, but this would quickly become unwieldy: we'd end up with thousands of topics.
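For comparison, the HDFS approach might look roughly like the following sketch. It assumes Flume's built-in host interceptor, which stores the agent's hostname (or IP) in an event header. It also assumes an HDFS sink that replaces the Kafka sink, with its %{host} escape substituting that header into the path. The path /flume/some_log is a hypothetical example.

    # Host interceptor: put the agent's hostname in the "host" event header
    tier1.sources.source1.interceptors = i1
    tier1.sources.source1.interceptors.i1.type = host
    # Use the hostname rather than the IP address (useIP defaults to true)
    tier1.sources.source1.interceptors.i1.useIP = false
    tier1.sources.source1.interceptors.i1.hostHeader = host

    # HDFS sink in place of the Kafka sink: one directory per originating host
    tier1.sinks.sink1.type = hdfs
    tier1.sinks.sink1.channel = channel1
    tier1.sinks.sink1.hdfs.path = /flume/some_log/%{host}

The Kafka sink has no equivalent path template. The header by itself does not end up in the message body, which is why this approach does not carry over directly.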

Can Flume append or include the hostname of the originating host when it publishes to a Kafka topic?


Solution

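  • If you're using the exec source, nothing prevents you from running a shell command that prefixes each line of the log with the hostname.

    Note: by default the exec source does not run the command through a shell. If the command relies on shell features such as pipes or $(...) substitution, you also need to set the shell property, like this: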

    tier1.sources.source1.type = exec
    tier1.sources.source1.shell = /bin/sh -c
    tier1.sources.source1.command = tail -F /var/log/auth.log | sed --unbuffered "s/^/$(hostname) /"
    

    The messages look like this:

    frb.hi.inet 2015-11-17 08:39:39.432 INFO [...]
    

    ... where frb.hi.inet is the name of my host.
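    Applied to the agent from the question, the configuration might look roughly like this sketch. Only the source section changes, and the channel and Kafka sink settings are copied unchanged from the question. It assumes GNU sed, which provides --unbuffered; other sed implementations may name that option differently. Unbuffered output matters because sed otherwise buffers its output when writing to a pipe, so events would reach Flume in delayed bursts.

    tier1.sources  = source1
    tier1.channels = channel1
    tier1.sinks = sink1

    # Run the pipeline through a shell so the pipe and $(hostname) are interpreted
    tier1.sources.source1.type = exec
    tier1.sources.source1.shell = /bin/sh -c
    tier1.sources.source1.command = tail -F /var/log/some_log.log | sed --unbuffered "s/^/$(hostname) /"
    tier1.sources.source1.channels = channel1

    tier1.channels.channel1.type = memory
    tier1.channels.channel1.capacity = 10000
    tier1.channels.channel1.transactionCapacity = 1000

    # Kafka sink unchanged: each message body now starts with the hostname
    tier1.sinks.sink1.type = org.apache.flume.sink.kafka.KafkaSink
    tier1.sinks.sink1.topic = some_log
    tier1.sinks.sink1.brokerList = hadoop01:9092,hadoop02.com:9092,hadoop03.com:9092
    tier1.sinks.sink1.channel = channel1
    tier1.sinks.sink1.batchSize = 20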