csv hadoop flume flume-ng

How do I copy a set of CSV files from my local directory to HDFS using Flume?


How do I copy a set of CSV files from my local directory to HDFS using Flume? I first tried a spooling directory source, but the files were not copied. I then used the following Flume configuration:

agent1.sources = tail
agent1.channels = MemoryChannel-2
agent1.sinks = HDFS
agent1.sources.tail.type = exec
agent1.sources.tail.command = tail -F /home/cloudera/runs/*
agent1.sources.tail.channels = MemoryChannel-2
agent1.sinks.HDFS.channel = MemoryChannel-2
agent1.sinks.HDFS.type = hdfs
agent1.sinks.HDFS.hdfs.path = hdfs://localhost:8020/user/cloudera/runs
agent1.sinks.HDFS.hdfs.file.Type = DataStream
agent1.channels.MemoryChannel-2.type = memory

With this configuration the files are copied to HDFS, but they contain special characters and are of no use to me. My local directory is /home/cloudera/runs and my HDFS target directory is /user/cloudera/runs.


Solution

  • I used the following Flume configuration to get the job done.
    
    # Flume configuration for agent_slave_1

    # Name the components of this agent
    agent_slave_1.sources = source1_1
    agent_slave_1.channels = fileChannel1_1
    agent_slave_1.sinks = hdfs-sink1_1

    # File channel fileChannel1_1 (stored on the local Linux filesystem)
    agent_slave_1.channels.fileChannel1_1.type = file
    agent_slave_1.channels.fileChannel1_1.capacity = 200000
    agent_slave_1.channels.fileChannel1_1.transactionCapacity = 1000

    # Spooling directory source; the spool directory in my case is /home/cloudera/runs
    agent_slave_1.sources.source1_1.type = spooldir
    agent_slave_1.sources.source1_1.spoolDir = /home/cloudera/runs/
    agent_slave_1.sources.source1_1.fileHeader = false
    # Fully ingested files are renamed with this suffix
    agent_slave_1.sources.source1_1.fileSuffix = .COMPLETED

    # HDFS sink; the target is /user/cloudera/runs_scored in HDFS
    agent_slave_1.sinks.hdfs-sink1_1.type = hdfs
    agent_slave_1.sinks.hdfs-sink1_1.hdfs.path = hdfs://localhost.localdomain:8020/user/cloudera/runs_scored/
    agent_slave_1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
    # Roll files by size (256 MB) or event count only; 0 disables time-based rolling
    agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollSize = 268435456
    agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
    agent_slave_1.sinks.hdfs-sink1_1.hdfs.rollCount = 50000000
    # Write events as plain text instead of the default SequenceFile
    agent_slave_1.sinks.hdfs-sink1_1.hdfs.writeFormat = Text
    agent_slave_1.sinks.hdfs-sink1_1.hdfs.fileType = DataStream

    # Bind the source and the sink to the channel
    agent_slave_1.sources.source1_1.channels = fileChannel1_1
    agent_slave_1.sinks.hdfs-sink1_1.channel = fileChannel1_1
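
    For comparison, the special characters in the question's output most likely come from a misspelled property name. The question sets hdfs.file.Type, which the HDFS sink does not read, so the sink would fall back to its default hdfs.fileType of SequenceFile, a binary container format. A minimal sketch of the question's sink section with only that key corrected follows. The agent and component names are the ones from the question, and the hdfs.writeFormat line is an addition borrowed from the configuration above, not something the question contained.

    ```properties
    # Sketch: sink section of the question's agent1 with the property name corrected.
    # Assumption: the misspelled key hdfs.file.Type was ignored, leaving the SequenceFile default.
    agent1.sinks.HDFS.type = hdfs
    agent1.sinks.HDFS.channel = MemoryChannel-2
    agent1.sinks.HDFS.hdfs.path = hdfs://localhost:8020/user/cloudera/runs
    # Was: agent1.sinks.HDFS.hdfs.file.Type = DataStream
    agent1.sinks.HDFS.hdfs.fileType = DataStream
    # Added for illustration: serialize event bodies as text
    agent1.sinks.HDFS.hdfs.writeFormat = Text
    ```

    Even with this fix, the exec source in the question may not be a good fit: with GNU tail, running tail -F over several files also emits "==> file <==" header lines into the stream, so the spooling directory source is probably the safer choice for copying whole files.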