Actually I have 2 questions, my first question is : How to make HDFS close file (example .123456789.tmp ) after the entire file was flushed by flume agent. In fact, the file never closed until, I force flume agent to stop. I beleive there is a method using the 4 parameters as follow:
hdfs.rollSize = 0
hdfs.rollCount =0
hdfs.rollInterval = 0
hdfs.batchsize = 1000000
Well, my second question is, my agent flume receives files from SFTP server, while I need to keep each file name in hdfs. It works fine with spooldir type, but not with SFTP !! is there a any ideas ?
My configuration file for flume agent as follow:
agent.sources = r1
agent.channels = c1
agent.sinks = k
configure ftp source
agent.sources.r1.type = org.keedio.flume.source.mra.source.Source
agent.sources.r1.client.source = sftp
agent.sources.r1.name.server = ip
agent.sources.r1.user = user
agent.sources.r1.password = secret
agent.sources.r1.port = 22
agent.sources.r1.knownHosts = ~/.ssh/known_hosts
agent.sources.r1.work.dir = /DATA/test/flumrFTP
agent.sources.r1.fileHeader = true
agent.sources.r1.basenameHeader = true
agent.sources.r1.inputCharset = ISO-8859-1
#agent.sources.r1.batchSize = 1000
agent.sources.r1.flushlines = true
configure sink s1
agent.sinks.k.type = hdfs
agent.sinks.k.hdfs.path = hdfs://hostname:8000/user/admin/DATA/import_flume/
agent.sinks.k.hdfs.filePrefix = %{basename}
agent.sinks.k.hdfs.rollCount = 0
agent.sinks.k.hdfs.rollInterval = 0
agent.sinks.k.hdfs.rollSize = 0
agent.sinks.k.hdfs.useLocalTimeStamp = true
agent.sinks.k.hdfs.batchsize = 1000000
agent.sinks.k.hdfs.fileType = DataStream
Use a channel which buffers events in memory
agent.channels.c1.type = memory
agent.channels.c1.capacity = 1000000
agent.channels.c1.transactionCapacity = 1000000
agent.sources.r1.channels = c1
agent.sinks.k.channel = c1
Try setting the variable
hdfs.rollInterval It's the number of seconds to wait before rolling current file
This setting closes the file after the number of seconds you set. I set mine at 200 seconds and I am loading smaller files