
Flume stdout to IRC


I'm trying to consolidate the output of each node in a clustered application into one easy, at-a-glance location. I don't need the data to be stored permanently; I just want to see all of the stdout in the same spot. Eventually I'll want to store much less info, probably using log files, but for now I just want app -> stdout -> IRC, and Flume seems like a good choice for this.

All of the examples I have seen of the exec source show the command using tail, even though the docs make it seem like you can use any process that writes to standard out. My config (see below) would normally run my application as the command, but for troubleshooting it runs a simple shell script that echoes a "Test" message at set intervals.

I've got everything running, and the IRC sink joins the IRC channel, but it never sends any messages. The last entry in the log is that Exec is starting.

Edit: Flume version is flume-ng-1.2.0+24.43-1~squeeze.

flume.config:

agent.sources = exec1
agent.channels = mem1
agent.sinks = irc1
agent.sources.exec1.type = exec
agent.sources.exec1.command = sh /var/lib/app/test.sh
agent.sources.exec1.channels = mem1
agent.sinks.irc1.type = irc
agent.sinks.irc1.hostname = 192.168.17.16
agent.sinks.irc1.nick = flume
agent.sinks.irc1.chan = agents
agent.sinks.irc1.channel = mem1
agent.channels.mem1.type = memory
agent.channels.mem1.capacity = 100

log4j.properties:

flume.root.logger=INFO,LOGFILE
flume.log.dir=/var/log/flume-ng
flume.log.file=flume.log
log4j.logger.org.apache.flume.lifecycle = INFO
log4j.logger.org.jboss = WARN
log4j.logger.org.mortbay = INFO
log4j.logger.org.apache.avro.ipc.NettyTransceiver = WARN
log4j.rootLogger=${flume.root.logger}
log4j.appender.LOGFILE=org.apache.log4j.RollingFileAppender
log4j.appender.LOGFILE.MaxFileSize=100MB
log4j.appender.LOGFILE.MaxBackupIndex=10
log4j.appender.LOGFILE.File=${flume.log.dir}/${flume.log.file}
log4j.appender.LOGFILE.layout=org.apache.log4j.PatternLayout
log4j.appender.LOGFILE.layout.ConversionPattern=%d{ISO8601} %p %c{2}: %m%n
log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.target=System.err
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d (%t) [%p - %l] %m%n

test.sh:

#!/bin/bash
x=1
while [ $x -ge 1 ]
do
  echo "Test $x"
  x=$(( $x + 1 ))
  sleep 5
done

flume.log:

2013-01-31 12:45:08,184 INFO nodemanager.DefaultLogicalNodeManager: Node manager starting
2013-01-31 12:45:08,184 INFO properties.PropertiesFileConfigurationProvider: Configuration provider starting
2013-01-31 12:45:08,184 INFO lifecycle.LifecycleSupervisor: Starting lifecycle supervisor 9
2013-01-31 12:45:08,186 INFO properties.PropertiesFileConfigurationProvider: Reloading configuration file:/etc/flume-ng/conf/flume.conf
2013-01-31 12:45:08,194 INFO conf.FlumeConfiguration: Processing:irc1
2013-01-31 12:45:08,194 INFO conf.FlumeConfiguration: Added sinks: irc1 Agent: agent
2013-01-31 12:45:08,194 INFO conf.FlumeConfiguration: Processing:irc1
2013-01-31 12:45:08,194 INFO conf.FlumeConfiguration: Processing:irc1
2013-01-31 12:45:08,194 INFO conf.FlumeConfiguration: Processing:irc1
2013-01-31 12:45:08,194 INFO conf.FlumeConfiguration: Processing:irc1
2013-01-31 12:45:08,207 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration  for agents: [agent]
2013-01-31 12:45:08,208 INFO properties.PropertiesFileConfigurationProvider: Creating channels
2013-01-31 12:45:08,249 INFO instrumentation.MonitoredCounterGroup: Monitoried counter group for type: CHANNEL, name: mem1, registered successfully.
2013-01-31 12:45:08,249 INFO properties.PropertiesFileConfigurationProvider: created channel mem1
2013-01-31 12:45:08,262 INFO sink.DefaultSinkFactory: Creating instance of sink: irc1, type: irc
2013-01-31 12:45:08,266 INFO nodemanager.DefaultLogicalNodeManager: Starting new configuration:{ sourceRunners:{exec1=EventDrivenSourceRunner: { source:org.apache.flume.source.ExecSource@498665a0 }} sinkRunners:{irc1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@167a1116 counterGroup:{ name:null counters:{} } }} channels:{mem1=org.apache.flume.channel.MemoryChannel@27f7c6e1} }
2013-01-31 12:45:08,266 INFO nodemanager.DefaultLogicalNodeManager: Starting Channel mem1
2013-01-31 12:45:08,266 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: mem1 started
2013-01-31 12:45:08,266 INFO nodemanager.DefaultLogicalNodeManager: Starting Sink irc1
2013-01-31 12:45:08,267 INFO irc.IRCSink: IRC sink starting
2013-01-31 12:45:08,267 INFO nodemanager.DefaultLogicalNodeManager: Starting Source exec1
2013-01-31 12:45:08,267 INFO source.ExecSource: Exec source starting with command:sh /var/lib/app/test.sh

Edit: Batch size seems to have been the issue; the source was waiting for 20 events (the default?) before delivering anything, which meant 100 seconds before I saw any output. Now, with batchSize = 1, a standard logger sink outputs results, but the IRC sink complains about a NullPointerException, probably because Event.body is somehow null?
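
For reference, the fix for the delay was a one-line addition to flume.config; a minimal sketch against the config above, assuming the exec source's batchSize property (which appears to default to 20):

agent.sources.exec1.batchSize = 1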


Solution

  • The docs for the IRC sink (found here: Flume 1.x User Guide) are wrong when they say that splitlines does not need to be configured. It has no default value in the code, so you must set it explicitly.

    Looking at the source code (found here: IRCSink.java), you must specify "splitlines" or suffer the NullPointerException. There is code to handle "splitchars" being null, but not "splitlines". Reported as FLUME-1892. (Edit: This ticket was resolved in January, so this should no longer be an issue.) A sketch of the workaround is below.
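
    A minimal sketch of the workaround, added to the sink config from the question (the property name comes from IRCSink.java; I'm assuming a boolean value of true is what the sink expects):

      agent.sinks.irc1.splitlines = true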