Tags: cloudera-cdh, flume-ng

Flume memory channel to HDFS sink


I'm facing an issue with Flume (1.5 on Cloudera CDH 5.3):

spoolDir source -> memory channel -> HDFS sink

What I'm trying to do: every 5 minutes, about 20 files are pushed to the spooling directory (grabbed from remote storage). Each file contains multiple lines, and each line is a log entry (in JSON). File sizes are between 10KB and 1MB.

When I start the agent, all files are successfully pushed to HDFS. After 1 minute (the value I set in flume.conf), the files are rolled (the .tmp suffix is removed and the files are closed).

But, when new files are found in the spooling directory, I get the message:

org.apache.flume.source.SpoolDirectorySource: The channel is full, and cannot write data now. The source will try again after 250 milliseconds

After trying a lot of different configurations without success (increasing/decreasing the channel's transactionCapacity and capacity, increasing/decreasing the batchSize, etc.), I'm asking for your help.

Here is my latest flume configuration:

# source definition
sebanalytics.sources.spooldir-source.type = spooldir
sebanalytics.sources.spooldir-source.spoolDir = /var/flume/in
sebanalytics.sources.spooldir-source.basenameHeader = true
sebanalytics.sources.spooldir-source.basenameHeaderKey = basename
sebanalytics.sources.spooldir-source.batchSize = 10
sebanalytics.sources.spooldir-source.deletePolicy = immediate
# Max blob size: 1.5 GB
sebanalytics.sources.spooldir-source.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
sebanalytics.sources.spooldir-source.deserializer.maxBlobLength = 1610000000
# Attach the interceptor to the source
sebanalytics.sources.spooldir-source.interceptors = json-interceptor
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.type = com.app.flume.interceptor.JsonInterceptor$Builder
# Define the event headers. basenameHeader must be the same as source.basenameHeaderKey (default is basename)
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.basenameHeader = basename
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.resourceHeader = resources
sebanalytics.sources.spooldir-source.interceptors.json-interceptor.ssidHeader = ssid

# channel definition
sebanalytics.channels.mem-channel-1.type = memory
sebanalytics.channels.mem-channel-1.capacity = 1000000
sebanalytics.channels.mem-channel-1.transactionCapacity = 10

# sink definition
sebanalytics.sinks.hdfs-sink-1.type = hdfs
sebanalytics.sinks.hdfs-sink-1.hdfs.path = hdfs://StandbyNameNode/data/in
sebanalytics.sinks.hdfs-sink-1.hdfs.filePrefix = %{resources}_%{ssid}
sebanalytics.sinks.hdfs-sink-1.hdfs.fileSuffix = .json
sebanalytics.sinks.hdfs-sink-1.hdfs.fileType = DataStream
sebanalytics.sinks.hdfs-sink-1.hdfs.writeFormat = Text
sebanalytics.sinks.hdfs-sink-1.hdfs.rollInterval = 3600
sebanalytics.sinks.hdfs-sink-1.hdfs.rollSize = 63000000
sebanalytics.sinks.hdfs-sink-1.hdfs.rollCount = 0
sebanalytics.sinks.hdfs-sink-1.hdfs.batchSize = 10
sebanalytics.sinks.hdfs-sink-1.hdfs.idleTimeout = 60

# connect source and sink to channel
sebanalytics.sources.spooldir-source.channels = mem-channel-1
sebanalytics.sinks.hdfs-sink-1.channel = mem-channel-1

Solution

  • A full channel means that the channel cannot accept any more events from the source, because the sink consumes events more slowly than the source produces them.

    Increasing the channel capacity only delays the problem. Possible solutions:

    • Improving the processing at the sink, if the sink is a custom one (improving/avoiding loops, using the backend's more efficient API, etc.). In this case that does not seem possible, since you are using the stock HDFS sink.
    • Reducing the frequency at which data is sent to the source. Nevertheless, I guess you don't want to, or cannot, do this because of your processing requirements.
    • Adding more sinks working in parallel. I'm not sure about that, but I can imagine Flume's designers deciding to run each sink in a separate thread. If that's true, then you can try multiple parallel HDFS sinks (see the sketch below). In order to split the data across several channels, one per sink, you would have to use a multiplexing selector instead of the default replicating one.
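
    A minimal sketch of the parallel-sink idea, assuming the agent name and settings from your question (hdfs-sink-2 is a hypothetical name that simply mirrors hdfs-sink-1). If, as guessed above, each sink runs on its own thread, a second HDFS sink attached to mem-channel-1 lets two writers drain it concurrently:

    # Hypothetical second HDFS sink, mirroring hdfs-sink-1
    sebanalytics.sinks.hdfs-sink-2.type = hdfs
    sebanalytics.sinks.hdfs-sink-2.hdfs.path = hdfs://StandbyNameNode/data/in
    # Distinct prefix so the two sinks do not compete for the same file name
    sebanalytics.sinks.hdfs-sink-2.hdfs.filePrefix = %{resources}_%{ssid}_2
    sebanalytics.sinks.hdfs-sink-2.hdfs.fileSuffix = .json
    sebanalytics.sinks.hdfs-sink-2.hdfs.fileType = DataStream
    sebanalytics.sinks.hdfs-sink-2.hdfs.writeFormat = Text
    sebanalytics.sinks.hdfs-sink-2.hdfs.rollInterval = 3600
    sebanalytics.sinks.hdfs-sink-2.hdfs.rollSize = 63000000
    sebanalytics.sinks.hdfs-sink-2.hdfs.rollCount = 0
    sebanalytics.sinks.hdfs-sink-2.hdfs.batchSize = 10
    sebanalytics.sinks.hdfs-sink-2.hdfs.idleTimeout = 60
    # Attach it to the same channel (and remember to list hdfs-sink-2 in the agent's sinks line)
    sebanalytics.sinks.hdfs-sink-2.channel = mem-channel-1

    If you instead want each sink to read from its own channel, that is where a multiplexing channel selector on the spooldir source would come in.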

    HTH!