
Flume HDFS sink stores only one line of data when using a netcat source


I am trying to load data into HDFS using Flume 1.7. I created the following configuration:

# Starting with: /opt/flume/bin/flume-ng agent -n Agent -c conf -f /opt/flume/conf/test.conf
# Naming the components on the current agent
Agent.sources = Netcat   
Agent.channels = MemChannel 
Agent.sinks = LoggerSink hdfs-sink LocalOut

# Describing/Configuring the source 
Agent.sources.Netcat.type = netcat 
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565  

# Describing/Configuring the sink 
Agent.sinks.LoggerSink.type = logger  

# Define a sink that outputs to hdfs.
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>:8020/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 0
Agent.sinks.hdfs-sink.hdfs.rollInterval = 0
Agent.sinks.hdfs-sink.hdfs.idleTimeout = 0

# Writes input into the local filesystem
#http://flume.apache.org/FlumeUserGuide.html#file-roll-sink
Agent.sinks.LocalOut.type = file_roll  
Agent.sinks.LocalOut.sink.directory = /tmp/flume
Agent.sinks.LocalOut.sink.rollInterval = 0  


# Describing/Configuring the channel 
Agent.channels.MemChannel.type = memory 
Agent.channels.MemChannel.capacity = 1000 
Agent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
Agent.sources.Netcat.channels = MemChannel
Agent.sinks.LoggerSink.channel = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel
Agent.sinks.LocalOut.channel = MemChannel

Then I sent the following file to the source using netcat:

cat textfile.csv | nc <IP of flume agent> 56565

The file contains the following lines:

Name1,1
Name2,2
Name3,3
Name4,4
Name5,5
Name6,6
Name7,7
Name8,8
Name9,9
Name10,10
Name11,11
Name12,12
Name13,13
Name14,14
Name15,15
Name16,16
Name17,17
Name18,18
Name19,19
Name20,20
...
Name490,490
Name491,491
Name492,492

The issue I'm facing is that Flume writes to HDFS without any errors, but only one line of the transferred file arrives. If I push the file to the source several times with netcat, Flume sometimes writes more than one file to HDFS containing more than one row, but it rarely writes all rows.

I tried changing the HDFS sink parameters (rollSize, batchSize and others), but that did not really change the behavior.

The local file sink, which is also configured, works just fine.

Does anybody know how to configure the agent so that all entries are written to HDFS without losing any?

Thanks for your help.


Update 1.12.2016

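I removed all sinks except the HDFS sink and changed some of its parameters (the port was dropped from hdfs.path, rollInterval and idleTimeout were removed, and rollCount was set to 100). After this the HDFS sink performs as it should.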

Here is the configuration:

# Naming the components on the current agent
Agent.sources = Netcat   
Agent.channels = MemChannel 
Agent.sinks = hdfs-sink 

# Describing/Configuring the source 
Agent.sources.Netcat.type = netcat 
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565  


# Define a sink that outputs to hdfs.
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 100


# Describing/Configuring the channel 
Agent.channels.MemChannel.type = memory 
Agent.channels.MemChannel.capacity = 1000 
Agent.channels.MemChannel.transactionCapacity = 100 

# Bind the source and sink to the channel 
Agent.sources.Netcat.channels = MemChannel
Agent.sinks.hdfs-sink.channel = MemChannel

Does anybody have an idea why it works with this configuration but stops working with two or more sinks?


Solution

  • I found the solution myself. As I understand it, the problem was that I used the same channel for all sinks. Sinks attached to one channel compete for its events, and each event is taken by only one of them, so the faster sinks took most of the entries and only some of them were passed to the HDFS sink.

    After giving each sink its own channel and enabling fan-out on the source with the parameter

    Agent.sources.Netcat.selector.type = replicating
    

    Flume writes to the local file and to HDFS as expected.
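A sketch of what the fan-out wiring might look like, based on the original three-sink setup. The channel names MemChannelLogger, MemChannelHdfs and MemChannelLocal are hypothetical, the channel capacities are copied from the original MemChannel, and the HDFS settings are taken from the working configuration in the update. The replicating selector copies every event into each channel listed on the source, and each sink drains only its own channel, so no sink competes with another for events. Replicating is also Flume's default selector type, so the explicit selector line mainly documents the intent; the essential change is one channel per sink.

```
# Hypothetical channel names: one channel per sink
Agent.sources = Netcat
Agent.channels = MemChannelLogger MemChannelHdfs MemChannelLocal
Agent.sinks = LoggerSink hdfs-sink LocalOut

# Netcat source as in the original, fanned out to every channel
Agent.sources.Netcat.type = netcat
Agent.sources.Netcat.bind = 0.0.0.0
Agent.sources.Netcat.port = 56565
Agent.sources.Netcat.selector.type = replicating
Agent.sources.Netcat.channels = MemChannelLogger MemChannelHdfs MemChannelLocal

# Three independent memory channels (capacities copied from the original)
Agent.channels.MemChannelLogger.type = memory
Agent.channels.MemChannelLogger.capacity = 1000
Agent.channels.MemChannelLogger.transactionCapacity = 100
Agent.channels.MemChannelHdfs.type = memory
Agent.channels.MemChannelHdfs.capacity = 1000
Agent.channels.MemChannelHdfs.transactionCapacity = 100
Agent.channels.MemChannelLocal.type = memory
Agent.channels.MemChannelLocal.capacity = 1000
Agent.channels.MemChannelLocal.transactionCapacity = 100

# Sink definitions taken from the configurations above
Agent.sinks.LoggerSink.type = logger
Agent.sinks.hdfs-sink.type = hdfs
Agent.sinks.hdfs-sink.hdfs.path = hdfs://<<IP of HDFS node>>/user/admin/flume_folder/%y-%m-%d/%H%M/
Agent.sinks.hdfs-sink.hdfs.useLocalTimeStamp = true
Agent.sinks.hdfs-sink.hdfs.fileType = DataStream
Agent.sinks.hdfs-sink.hdfs.writeFormat = Text
Agent.sinks.hdfs-sink.hdfs.batchSize = 100
Agent.sinks.hdfs-sink.hdfs.rollSize = 0
Agent.sinks.hdfs-sink.hdfs.rollCount = 100
Agent.sinks.LocalOut.type = file_roll
Agent.sinks.LocalOut.sink.directory = /tmp/flume
Agent.sinks.LocalOut.sink.rollInterval = 0

# Each sink reads from its own channel only
Agent.sinks.LoggerSink.channel = MemChannelLogger
Agent.sinks.hdfs-sink.channel = MemChannelHdfs
Agent.sinks.LocalOut.channel = MemChannelLocal
```

With this layout, each of the 492 lines sent through netcat should end up once in the logger output, once in the local file and once in HDFS, instead of being split among the three sinks.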