
Flume doesn't recover after memory transaction capacity is exceeded


I'm creating a proof of concept of a Flume agent that buffers events and stops consuming events from the source when the sink is unavailable. Only when the sink becomes available again should the buffered events be processed, after which the source resumes consumption.

For this I've created a simple agent that reads from a SpoolDir and writes to a file. To simulate the sink service being down, I change the file permissions so Flume can't write to it. When I then start Flume, some events are buffered in the memory channel, and it stops consuming events once the channel capacity is full, as expected. As soon as the file becomes writable again, the sink is able to process the events and Flume recovers. However, that only works when the transaction capacity is not exceeded. As soon as it is exceeded, Flume never recovers and keeps logging the following error:

2015-10-02 14:52:51,940 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
org.apache.flume.EventDeliveryException: Failed to process transaction
    at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:218)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: org.apache.flume.ChannelException: Take list for MemoryTransaction, capacity 4 full, consider committing more frequently, increasing capacity, or increasing thread count
    at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
    at org.apache.flume.channel.BasicTransactionSemantics.take(BasicTransactionSemantics.java:113)
    at org.apache.flume.channel.BasicChannelSemantics.take(BasicChannelSemantics.java:95)
    at org.apache.flume.sink.RollingFileSink.process(RollingFileSink.java:191)
    ... 3 more

As soon as the number of events buffered in memory exceeds the transaction capacity (4), this error occurs. I don't understand why, because the batchSize of the fileout sink is 1, so it should take the events out one by one.
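The stack trace shows the exception is thrown from `MemoryTransaction.doTake` when the transaction's take list is full. The following is a simplified Java model of that mechanism, not Flume's actual code: `TakeListDemo`, `SimpleMemoryChannel`, `drainOnce` and `filled` are hypothetical names, and the batch size of 100 is an assumed value chosen only to represent "a sink that takes more than one event per transaction". It sketches why a sink that loops over `take()` inside one transaction fails only once enough events are buffered to fill the take list, while a true batch size of 1 never hits the limit.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Simplified, hypothetical model of a memory channel's per-transaction take list.
// Illustrative only: these classes are NOT Flume's MemoryChannel or sink API.
public class TakeListDemo {

    static class TakeListFullException extends RuntimeException {
        TakeListFullException(String msg) { super(msg); }
    }

    static class SimpleMemoryChannel {
        private final Deque<String> queue = new ArrayDeque<>();
        private final int transactionCapacity;

        SimpleMemoryChannel(int transactionCapacity) {
            this.transactionCapacity = transactionCapacity;
        }

        void put(String event) { queue.addLast(event); }

        int size() { return queue.size(); }

        Transaction begin() { return new Transaction(); }

        class Transaction {
            // Events taken in this transaction but not yet committed
            private final Deque<String> takeList = new ArrayDeque<>();

            // Fails once the take list is full; returns null if the channel is empty
            String take() {
                if (takeList.size() == transactionCapacity) {
                    throw new TakeListFullException(
                        "Take list capacity " + transactionCapacity + " full");
                }
                String event = queue.pollFirst();
                if (event != null) takeList.addLast(event);
                return event;
            }

            void commit() { takeList.clear(); }

            // Put taken events back at the head of the queue, preserving their order
            void rollback() {
                while (!takeList.isEmpty()) queue.addFirst(takeList.pollLast());
            }
        }
    }

    // Sink-like step: take up to batchSize events in ONE transaction.
    // Returns the number of events taken, or -1 if the transaction was rolled back.
    static int drainOnce(SimpleMemoryChannel channel, int batchSize) {
        SimpleMemoryChannel.Transaction tx = channel.begin();
        int taken = 0;
        try {
            for (int i = 0; i < batchSize; i++) {
                if (tx.take() == null) break; // channel empty: stop early
                taken++;
            }
            tx.commit();
            return taken;
        } catch (TakeListFullException e) {
            tx.rollback();
            return -1;
        }
    }

    // Builds a channel pre-loaded with the given number of events
    static SimpleMemoryChannel filled(int transactionCapacity, int events) {
        SimpleMemoryChannel ch = new SimpleMemoryChannel(transactionCapacity);
        for (int i = 0; i < events; i++) ch.put("event-" + i);
        return ch;
    }

    public static void main(String[] args) {
        // 3 buffered events, batch of 100: the take list (capacity 4) never fills
        SimpleMemoryChannel a = filled(4, 3);
        System.out.println("3 events, batch 100 -> " + drainOnce(a, 100) + ", left " + a.size());

        // 10 buffered events, batch of 100: the 5th take overflows, every retry rolls back
        SimpleMemoryChannel b = filled(4, 10);
        for (int attempt = 1; attempt <= 3; attempt++) {
            System.out.println("10 events, batch 100, attempt " + attempt
                + " -> " + drainOnce(b, 100) + ", left " + b.size());
        }

        // 10 buffered events, batch of 1: one event per transaction always fits
        SimpleMemoryChannel c = filled(4, 10);
        int delivered = 0;
        while (c.size() > 0) delivered += drainOnce(c, 1);
        System.out.println("10 events, batch 1 -> delivered " + delivered + ", left " + c.size());
    }
}
```

Because the rollback returns the events to the channel, each retry in the model sees the same backlog and fails the same way, which mirrors the "never recovers" behavior described in the question.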

This is the config I'm using:

agent.sources = spool-src
agent.channels = mem-channel
agent.sinks = fileout

agent.sources.spool-src.channels = mem-channel
agent.sources.spool-src.type = spooldir
agent.sources.spool-src.spoolDir = /tmp/flume-spool
agent.sources.spool-src.batchSize = 1

agent.channels.mem-channel.type = memory
agent.channels.mem-channel.capacity = 10
agent.channels.mem-channel.transactionCapacity = 4

agent.sinks.fileout.channel = mem-channel
agent.sinks.fileout.type = file_roll
agent.sinks.fileout.sink.directory = /tmp/flume-output
agent.sinks.fileout.sink.rollInterval = 0
agent.sinks.fileout.batchSize = 1

I've tested this config with different values for the channel capacity and transaction capacity (e.g., 3 and 3), but haven't found a combination where the channel fills up and Flume is still able to recover.


Solution

  • On the Flume mailing list, someone told me my proof of concept was probably affected by a bug: the batch size is 100, even though a different value is specified in the config. I re-ran the test with the source and sink batchSize set to 100, the memory channel transactionCapacity set to 100, and its capacity set to 300. With those values, the proof of concept works exactly as expected.
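The fix boils down to keeping every batch size at or below the channel's transactionCapacity, and transactionCapacity at or below capacity, which follows from transactionCapacity being the maximum number of events a channel gives to a sink (or takes from a source) per transaction. The helper below is a hypothetical sketch, not part of Flume: `ChannelSizingCheck` and `ASSUMED_FILE_ROLL_BATCH` are illustrative names, and it assumes, per the mailing-list explanation above, that the file_roll sink effectively uses a batch size of 100 whatever its configured batchSize says. It compares the original and the working values from the solution.

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper (not part of Flume): checks batch sizes against channel limits.
public class ChannelSizingCheck {

    // Assumption from the mailing-list answer: the sink behaves as if batchSize were 100
    static final int ASSUMED_FILE_ROLL_BATCH = 100;

    static List<String> check(String config) throws IOException {
        Properties p = new Properties();
        p.load(new StringReader(config));
        int capacity = Integer.parseInt(p.getProperty("agent.channels.mem-channel.capacity"));
        int txCapacity = Integer.parseInt(
            p.getProperty("agent.channels.mem-channel.transactionCapacity"));
        int sourceBatch = Integer.parseInt(p.getProperty("agent.sources.spool-src.batchSize"));
        // The configured sink batchSize is deliberately ignored: the assumed value is used
        int sinkBatch = ASSUMED_FILE_ROLL_BATCH;

        List<String> problems = new ArrayList<>();
        if (txCapacity > capacity) problems.add("transactionCapacity > capacity");
        if (sourceBatch > txCapacity) problems.add("source batchSize > transactionCapacity");
        if (sinkBatch > txCapacity) problems.add("effective sink batch size > transactionCapacity");
        return problems;
    }

    public static void main(String[] args) throws IOException {
        String original = String.join("\n",
            "agent.sources.spool-src.batchSize = 1",
            "agent.channels.mem-channel.capacity = 10",
            "agent.channels.mem-channel.transactionCapacity = 4",
            "agent.sinks.fileout.batchSize = 1");
        String fixed = String.join("\n",
            "agent.sources.spool-src.batchSize = 100",
            "agent.channels.mem-channel.capacity = 300",
            "agent.channels.mem-channel.transactionCapacity = 100",
            "agent.sinks.fileout.batchSize = 100");
        System.out.println("original: " + check(original));
        System.out.println("fixed:    " + check(fixed));
    }
}
```

Under that assumption, the original config is flagged only for the sink batch exceeding transactionCapacity 4, while the 100/100/300 values from the solution pass every check.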