
Writing data into flume and then to HDFS


I am using Flume 1.5.0.1 and Hadoop 2.4.1, and I am trying to put a string into Flume and save it to HDFS. My Flume configuration file is as follows:

    agentMe.channels = memory-channel
    agentMe.sources = my-source AvroSource
    agentMe.sinks = log-sink hdfs-sink

    agentMe.sources.AvroSource.channels = memory-channel
    agentMe.sources.AvroSource.type = avro
    agentMe.sources.AvroSource.bind = 0.0.0.0 # i tried client ip as well
    agentMe.sources.AvroSource.port = 41414

    agentMe.channels.memory-channel.type = memory
    agentMe.channels.memory-channel.capacity = 1000
    agentMe.channels.memory-channel.transactionCapacity = 100

    agentMe.sources.my-source.type = netcat
    agentMe.sources.my-source.bind = 127.0.0.1 #If i use any other IP like the client from where the string is going to come from then i get unable to bind exception.
    agentMe.sources.my-source.port = 9876
    agentMe.sources.my-source.channels = memory-channel


    # Define a sink that outputs to hdfs.
    agentMe.sinks.hdfs-sink.channel = memory-channel
    agentMe.sinks.hdfs-sink.type = hdfs
    agentMe.sinks.hdfs-sink.hdfs.path = hdfs://localhost:54310/user/netlog/flume.txt
    agentMe.sinks.hdfs-sink.hdfs.fileType = DataStream
    agentMe.sinks.hdfs-sink.hdfs.batchSize = 2
    agentMe.sinks.hdfs-sink.hdfs.rollCount = 0
    agentMe.sinks.hdfs-sink.hdfs.inUsePrefix = tcptest-
    agentMe.sinks.hdfs-sink.hdfs.inUseSuffix = .txt
    agentMe.sinks.hdfs-sink.hdfs.rollSize = 0
    agentMe.sinks.hdfs-sink.hdfs.rollInterval = 3
    agentMe.sinks.hdfs-sink.hdfs.writeFormat = Text
    agentMe.sinks.hdfs-sink.hdfs.path = /user/name/%y-%m-%d/%H%M/%S

I have already put the same question here

    client.sendDataToFlume("hello world")

I see that NettyAvroRpcClient is not able to connect to the server where Flume is running. I am just sending a simple string. Am I missing anything?

Experts, kindly suggest.


Solution

  • The configuration has to be correct, otherwise things may not work. Here is a config that reads data into Flume through an Avro source and then writes it to HDFS.

    # Name the components of this agent
    a1.sources = r1
    a1.sinks = k2
    a1.channels = c1

    # Avro source listening on all interfaces. The timestamp interceptor adds the
    # "timestamp" header that the %y-%m-%d/%H%M escapes in hdfs.path rely on.
    a1.sources.r1.type = avro
    a1.sources.r1.bind = 0.0.0.0
    a1.sources.r1.port = 41414
    a1.sources.r1.interceptors = a
    a1.sources.r1.interceptors.a.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

    # HDFS sink that writes events as plain text
    a1.sinks.k2.type = hdfs
    a1.sinks.k2.hdfs.fileType = DataStream
    a1.sinks.k2.hdfs.batchSize = 10
    a1.sinks.k2.hdfs.rollCount = 10
    a1.sinks.k2.hdfs.rollSize = 10
    a1.sinks.k2.hdfs.rollInterval = 10
    a1.sinks.k2.hdfs.writeFormat = Text
    a1.sinks.k2.hdfs.path = /user/flume/%y-%m-%d/%H%M/

    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100

    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k2.channel = c1
    

    This should help :)
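
    To see why the timestamp interceptor matters for a path like /user/flume/%y-%m-%d/%H%M/: the HDFS sink resolves the time escapes from the event's "timestamp" header (epoch milliseconds), unless hdfs.useLocalTimeStamp is set to true. The following is only a rough sketch of that mapping in plain Java, not Flume's actual code. The class name HdfsPathSketch and the method bucketPath are made up for illustration, and the time zone is passed in explicitly so the result is predictable.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

// Illustration only: mimics how the %y-%m-%d/%H%M escapes in hdfs.path
// could be resolved from an event's "timestamp" header (epoch milliseconds).
// This is NOT Flume's implementation; the names here are hypothetical.
class HdfsPathSketch {

    // Java date pattern corresponding to the Flume escapes %y-%m-%d/%H%M
    private static final DateTimeFormatter BUCKET =
            DateTimeFormatter.ofPattern("yy-MM-dd/HHmm");

    // Builds the directory an event with the given timestamp would land in.
    static String bucketPath(String baseDir, long timestampMillis, ZoneId zone) {
        ZonedDateTime t = Instant.ofEpochMilli(timestampMillis).atZone(zone);
        return baseDir + "/" + BUCKET.format(t) + "/";
    }

    public static void main(String[] args) {
        // Directory for an event stamped "now" in the machine's local time zone
        long now = System.currentTimeMillis();
        System.out.println(bucketPath("/user/flume", now, ZoneId.systemDefault()));
    }
}
```

    For example, a timestamp of 1400000000000 (2014-05-13 16:53:20 UTC) evaluated in UTC gives /user/flume/14-05-13/1653/. An event without a timestamp header has nothing to derive that directory from, which is why the config adds the interceptor on the source.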