Search code examples
excelcsvhadoophdfsflume

flume load csv files excels to hdfs sink


I have configured my Flume source to be of type Spooldir. I have a lot of CSV files, .xl3 and .xls, and I want my Flume agent to load all files from the spooldir to HDFS sink. However flume agent return exception

This is my config for flume source:

agent.sources.s1.type = spooldir
agent.sources.s1.spoolDir = /my-directory
agent.sources.s1.basenameHeader = true
agent.sources.batchSize = 10000

and my HDFS sink :

agent.sinks.sk1.type = hdfs 
agent.sinks.sk1.hdfs.path = hdfs://...:8020/user/importflume/%Y/%m/%d/%H 
agent.sinks.sk1.hdfs.filePrefix = %{basename}
agent.sinks.sk1.hdfs.rollSize = 0
agent.sinks.sk1.hdfs.rollCount = 0
agent.sinks.sk1.hdfs.useLocalTimeStamp = true
agent.sinks.sk1.hdfs.batchsize =    10000
agent.sinks.sk1.hdfs.fileType = DataStream
agent.sinks.sk1.serializer = avro_event
agent.sinks.sk1.serializer.compressionCodec = snappy

Solution

  • You can use the below configuration for spool dir. Just give the paths of your local file system and HDFS locations in the below configuration.

    #Flume Configuration Starts
    # Define a file channel called fileChannel on agent1
    agent1.channels.fileChannel1_1.type = file 
    # on linux FS
    agent1.channels.fileChannel1_1.capacity = 200000
    agent1.channels.fileChannel1_1.transactionCapacity = 1000
    # Define a source for agent1
    agent1.sources.source1_1.type = spooldir
    # on linux FS
    #Spooldir in my case is /home/hadoop/Desktop/flume_sink
    agent1.sources.source1_1.spoolDir = 'path'
    agent1.sources.source1_1.fileHeader = false
    agent1.sources.source1_1.fileSuffix = .COMPLETED
    agent1.sinks.hdfs-sink1_1.type = hdfs
    
    #Sink is /flume_import under hdfs
    
    agent1.sinks.hdfs-sink1_1.hdfs.path = hdfs://'path'
    agent1.sinks.hdfs-sink1_1.hdfs.batchSize = 1000
    agent1.sinks.hdfs-sink1_1.hdfs.rollSize = 268435456
    agent1.sinks.hdfs-sink1_1.hdfs.rollInterval = 0
    agent1.sinks.hdfs-sink1_1.hdfs.rollCount = 50000000
    agent1.sinks.hdfs-sink1_1.hdfs.writeFormat=Text
    
    agent1.sinks.hdfs-sink1_1.hdfs.fileType = DataStream
    agent1.sources.source1_1.channels = fileChannel1_1
    agent1.sinks.hdfs-sink1_1.channel = fileChannel1_1
    
    agent1.sinks =  hdfs-sink1_1
    agent1.sources = source1_1
    agent1.channels = fileChannel1_1
    

    You can also refer to this blog on Flume spool dir for more information.