Search code examples
hadoophdfstwitter4jflume

Flume Twitter Stream rolling small files in HDFS


I think I have tried every combination of altering my config file. I also saw somewhere that it might be due to my replication factor being 3 so I changed it to 1. I am using cloudera manager on AWS. Below is my config file, any ideas?

In HDFS, the file sizes are all under 20kb, trying to get at least 40-50mb. What is funny is that the same config file is writing ~60mb files on my virtual machine that I was practicing with (pre-installed hadoop + tools). See below for config file, any ideas?

# The configuration file needs to define the sources, 
# the channels and the sinks.
# Sources, channels and sinks are defined per agent, 
# in this case called 'TwitterAgent'

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

TwitterAgent.sources.Twitter.type = com.cloudera.flume.source.TwitterSource
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sources.Twitter.consumerKey = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.consumerSecret = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.accessToken = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx 
TwitterAgent.sources.Twitter.accessTokenSecret = xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx
TwitterAgent.sources.Twitter.keywords = apple, grapes, fruits, strawberry, mango, pear
TwitterAgent.sinks.HDFS.channel = MemChannel
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://123.456.789.us-west-2.compute.amazonaws.com:8020/user/flume/tweets
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 0
TwitterAgent.sinks.HDFS.hdfs.batchSize = 100000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollCount = 0

TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 1000

Solution

  • So i finally figured out the issue. (note I am running a single node test cluster). One of the solutions in stackoverflow was to set the dfs.replication factor to 1 which I did but that did not solve the problem.

    For some reason what was happening was that in my flume agent, there was a mismatch in configs. The HDFS Sink has a parameter called minBlockReplicas, which informs it as to how many block replicas are necessary to have, and if not specified, it pulls that paramaneter from the default HDFS configuration file (which i thought I set to 1). It looks like it was getting a different value for dfs.replication or for dfs.namennode.replication.min.

    I circumvented the error my modifying my flume file directly by using

    TwitterAgent.sinks.HDFS.hdfs.minBlockReplicas = 1
    

    Hope this helps.