
How to configure Flume 1.x (flume-ng) in failover mode?


There is plenty of information available on configuring Flume 0.9.x nodes in failover mode in CDH3.

But the configuration format for Flume 1.x in CDH4 is completely different. How do I configure Flume 1.x (flume-ng) in failover mode?


Solution

  • In flume-ng you can define a group of so-called "sinks" (event consumers) that are all connected to one channel, and specify a "failover" policy for this group, so that if one of the sinks fails, events are redirected to another.

    Suppose we have two sinks, main_sink and backup_sink, both configured to consume events from one channel and to deliver them to some destination. We set the priority of main_sink higher than the priority of backup_sink, so Flume forwards events from the channel to main_sink as long as it works. If main_sink fails, Flume relegates it to a pool of failed sinks, where it is assigned a cool down period. Meanwhile, events from the channel are forwarded to backup_sink.

    A full example may look like this:

    # channels
    agent.channels = mem_channel
    agent.channels.mem_channel.type = memory
    
    # sources
    agent.sources = event_source
    agent.sources.event_source.type = avro
    agent.sources.event_source.bind = 127.0.0.1
    agent.sources.event_source.port = 10000
    agent.sources.event_source.channels = mem_channel
    
    # sinks
    agent.sinks = main_sink backup_sink
    
    agent.sinks.main_sink.type = avro
    agent.sinks.main_sink.hostname = 127.0.0.1
    agent.sinks.main_sink.port = 10001
    agent.sinks.main_sink.channel = mem_channel
    
    agent.sinks.backup_sink.type = avro
    agent.sinks.backup_sink.hostname = 127.0.0.1
    agent.sinks.backup_sink.port = 10002
    agent.sinks.backup_sink.channel = mem_channel
    
    # sink groups
    agent.sinkgroups = failover_group
    agent.sinkgroups.failover_group.sinks = main_sink backup_sink
    agent.sinkgroups.failover_group.processor.type = failover
    agent.sinkgroups.failover_group.processor.priority.main_sink = 10
    agent.sinkgroups.failover_group.processor.priority.backup_sink = 5
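
    The cool down period mentioned above is capped by the failover processor's maxpenalty property, given in milliseconds (the documented default is 30000). As a sketch, the line below could be added to the sink group section; the value 10000 is only an illustrative assumption, not a recommendation:

    ```properties
    # Assumed example value: cap the backoff of a failed sink at 10 seconds.
    agent.sinkgroups.failover_group.processor.maxpenalty = 10000
    ```

    Note that the priorities of the sinks in a failover group should be distinct, as they are in the example (10 and 5).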
    

    You can find more details on the subject in the Flume User Guide.
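
    To try the failover locally, something has to listen on the ports the two sinks send to. A minimal sketch of a receiving agent follows; the agent name collector and the component names avro_in and log_sink are hypothetical, and only the avro source, memory channel and logger sink types are standard Flume components:

    ```properties
    # Hypothetical receiving agent for main_sink (port 10001).
    collector.sources = avro_in
    collector.channels = mem_channel
    collector.sinks = log_sink

    # Avro source listening where main_sink delivers events
    collector.sources.avro_in.type = avro
    collector.sources.avro_in.bind = 127.0.0.1
    collector.sources.avro_in.port = 10001
    collector.sources.avro_in.channels = mem_channel

    collector.channels.mem_channel.type = memory

    # Logger sink writes received events to the agent's log
    collector.sinks.log_sink.type = logger
    collector.sinks.log_sink.channel = mem_channel
    ```

    A second copy of this file with the port changed to 10002 would stand in for the destination of backup_sink. Each agent can be started with flume-ng agent, passing --conf, --conf-file and --name (here collector). Stopping the first receiver should then cause events to show up in the log of the second one.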