
Flume configuration to upload files with the same name


I have 10 files with data of varying lengths. I would like to store the corresponding data in the same file and with the same filename, but Flume is splitting up the data and saving it as FlumeData.<timestamp>. I am using the configuration below:

a1.sources = r1
a1.sinks =  k2
a1.channels = c1

a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
a1.channels.c1.trackerDir = /mnt/flume/track
a1.channels.c1.transactionCapacity = 10000000
a1.channels.c1.capacity = 500000000
a1.channels.c1.maxFileSize = 10000000
a1.channels.c1.useDualCheckpoints = true
a1.channels.c1.backupCheckpointDir = /mnt/flume/backup
a1.channels.c1.checkpointInterval = 2000
a1.channels.c1.minimumRequiredSpace = 9000000

a1.sources.r1.channels = c1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /usr/local/netlog/
a1.sources.r1.fileHeader = true
a1.sources.r1.bufferMaxLineLength = 500
a1.sources.r1.bufferMaxLines = 10000
a1.sources.r1.batchSize = 100000
#a1.sources.r1.deletePolicy = immediate

a1.sinks.k2.type = hdfs
a1.sinks.k2.channel = c1

a1.sinks.k2.hdfs.filePrefix = %{file}
a1.sinks.k2.hdfs.fileType = DataStream
a1.sinks.k2.hdfs.batchSize = 100000
a1.sinks.k2.hdfs.rollInterval = 0
a1.sinks.k2.hdfs.rollSize = 0
a1.sinks.k2.hdfs.rollCount = 0
a1.sinks.k2.hdfs.idleTimeout = 0
a1.sinks.k2.hdfs.writeFormat = Text
a1.sinks.k2.hdfs.path = /user/flume


Kindly suggest how I can store the same 10 files with the same filenames and the data within them. File sizes can vary from 2 MB to 15 MB.

The error I see in the logs is:

lib/native org.apache.flume.node.Application --conf-file conf/flume-conf.properties --name a1
2014-12-03 20:49:47,545 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
2014-12-03 20:49:47,550 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started
2014-12-03 20:49:47,555 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:conf/flume-conf.properties for changes
2014-12-03 20:49:47,555 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/flume-conf.properties
2014-12-03 20:49:47,571 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,571 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for k2: hdfs.batchSize
2014-12-03 20:49:47,572 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,572 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,572 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,572 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,572 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,573 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,573 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,573 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: k2 Agent: a1
2014-12-03 20:49:47,573 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,573 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,575 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:k2
2014-12-03 20:49:47,576 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: a1, initial-configuration: AgentConfiguration[a1]
SOURCES: {r1={ parameters:{bufferMaxLineLength=500, channels=c1, spoolDir=/usr/local/netlog/, bufferMaxLines=10000, fileHeader=true, batchSize=100000, type=spooldir} }}
CHANNELS: {c1={ parameters:{trackerDir=/mnt/flume/track, maxFileSize=10000000, dataDirs=/mnt/flume/data, type=file, transactionCapacity=10000000, capacity=500000000, checkpointDir=/mnt/flume/checkpoint} }}
SINKS: {k2={ parameters:{hdfs.batchSize=100000, hdfs.idleTimeout=0, hdfs.filePrefix=%{file}, hdfs.path=/user/flume, hdfs.writeFormat=Text, hdfs.rollSize=0, hdfs.rollCount=0, channel=c1, hdfs.rollInterval=0, hdfs.fileType=DataStream, type=hdfs} }}

2014-12-03 20:49:47,583 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:468)] Created channel c1
2014-12-03 20:49:47,593 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: k2 using HDFS
2014-12-03 20:49:47,596 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:371)] Post validation configuration for a1
AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[a1]
SOURCES: {r1={ parameters:{bufferMaxLineLength=500, channels=c1, spoolDir=/usr/local/netlog/, bufferMaxLines=10000, fileHeader=true, batchSize=100000, type=spooldir} }}
CHANNELS: {c1={ parameters:{trackerDir=/mnt/flume/track, maxFileSize=10000000, dataDirs=/mnt/flume/data, type=file, transactionCapacity=10000000, capacity=500000000, checkpointDir=/mnt/flume/checkpoint} }}
SINKS: {k2={ parameters:{hdfs.batchSize=100000, hdfs.idleTimeout=0, hdfs.filePrefix=%{file}, hdfs.path=/user/flume, hdfs.writeFormat=Text, hdfs.rollSize=0, hdfs.rollCount=0, channel=c1, hdfs.rollInterval=0, hdfs.fileType=DataStream, type=hdfs} }}

2014-12-03 20:49:47,597 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:c1

2014-12-03 20:49:47,597 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks k2

2014-12-03 20:49:47,597 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources r1

2014-12-03 20:49:47,597 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [a1]
2014-12-03 20:49:47,598 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels
2014-12-03 20:49:47,629 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel c1 type file
2014-12-03 20:49:47,635 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel c1
2014-12-03 20:49:47,636 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source r1, type spooldir
2014-12-03 20:49:47,654 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: k2, type: hdfs
2014-12-03 20:49:48,108 (conf-file-poller-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.authenticate(HDFSEventSink.java:555)] Hadoop Security enabled: false
2014-12-03 20:49:48,111 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel c1 connected to [r1, k2]
2014-12-03 20:49:48,125 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{r1=EventDrivenSourceRunner: { source:Spool Directory source r1: { spoolDir: /usr/local/netlog/ } }} sinkRunners:{k2=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@1f87c88 counterGroup:{ name:null counters:{} } }} channels:{c1=FileChannel c1 { dataDirs: [/mnt/flume/data] }} }
2014-12-03 20:49:48,130 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel c1
2014-12-03 20:49:48,130 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:259)] Starting FileChannel c1 { dataDirs: [/mnt/flume/data] }...
2014-12-03 20:49:48,147 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.<init>(Log.java:328)] Encryption is not enabled
2014-12-03 20:49:48,149 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:373)] Replay started
2014-12-03 20:49:48,150 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.channel.file.Log.replay(Log.java:385)] Found NextFileID 0, from []
2014-12-03 20:49:48,155 (lifecycleSupervisor-1-0) [ERROR - org.apache.flume.channel.file.Log.replay(Log.java:481)] Failed to initialize Log on [channel=c1]
java.io.EOFException
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:786)
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:819)
    at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:79)
    at org.apache.flume.channel.file.Log.replay(Log.java:417)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:279)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2014-12-03 20:49:48,160 (lifecycleSupervisor-1-0) [ERROR - org.apache.flume.channel.file.FileChannel.start(FileChannel.java:290)] Failed to start the file channel [channel=c1]
java.io.EOFException
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:786)
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:819)
    at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:79)
    at org.apache.flume.channel.file.Log.replay(Log.java:417)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:279)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
2014-12-03 20:49:48,162 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink k2
2014-12-03 20:49:48,163 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source r1
2014-12-03 20:49:48,163 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:77)] SpoolDirectorySource source starting with directory: /usr/local/netlog/
2014-12-03 20:49:48,185 (lifecycleSupervisor-1-3) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:132)] Initializing ReliableSpoolingFileEventReader with directory=/usr/local/netlog, metaDir=.flumespool, deserializer=LINE
2014-12-03 20:49:48,204 (lifecycleSupervisor-1-3) [DEBUG - org.apache.flume.client.avro.ReliableSpoolingFileEventReader.<init>(ReliableSpoolingFileEventReader.java:154)] Successfully created and deleted canary file: /usr/local/netlog/flume-spooldir-perm-check-5019906964160509405.canary
2014-12-03 20:49:48,218 (lifecycleSupervisor-1-3) [DEBUG - org.apache.flume.source.SpoolDirectorySource.start(SpoolDirectorySource.java:110)] SpoolDirectorySource source started
2014-12-03 20:49:48,343 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SOURCE, name: r1: Successfully registered new MBean.
2014-12-03 20:49:48,343 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SOURCE, name: r1 started
2014-12-03 20:49:48,344 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:119)] Monitored counter group for type: SINK, name: k2: Successfully registered new MBean.
2014-12-03 20:49:48,347 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:95)] Component type: SINK, name: k2 started
2014-12-03 20:49:48,356 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting
2014-12-03 20:49:48,357 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Channel closed [channel=c1]. Due to java.io.EOFException: null
    at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:329)
    at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:376)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:786)
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:819)
    at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:79)
    at org.apache.flume.channel.file.Log.replay(Log.java:417)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:279)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more
2014-12-03 20:49:48,659 (pool-4-thread-1) [ERROR - org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:256)] FATAL: Spool Directory source r1: { spoolDir: /usr/local/netlog/ }: Uncaught exception in SpoolDirectorySource thread. Restart or reconfigure Flume to continue processing.
java.lang.IllegalStateException: Channel closed [channel=c1]. Due to java.io.EOFException: null
    at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:329)
    at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
    at org.apache.flume.channel.ChannelProcessor.processEventBatch(ChannelProcessor.java:181)
    at org.apache.flume.source.SpoolDirectorySource$SpoolDirectoryRunnable.run(SpoolDirectorySource.java:235)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:786)
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:819)
    at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:79)
    at org.apache.flume.channel.file.Log.replay(Log.java:417)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:279)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    ... 7 more
2014-12-03 20:49:53,359 (SinkRunner-PollingRunner-DefaultSinkProcessor) [ERROR - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:160)] Unable to deliver event. Exception follows.
java.lang.IllegalStateException: Channel closed [channel=c1]. Due to java.io.EOFException: null
    at org.apache.flume.channel.file.FileChannel.createTransaction(FileChannel.java:329)
    at org.apache.flume.channel.BasicChannelSemantics.getTransaction(BasicChannelSemantics.java:122)
    at org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:376)
    at org.apache.flume.sink.DefaultSinkProcessor.process(DefaultSinkProcessor.java:68)
    at org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:147)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.EOFException
    at java.io.RandomAccessFile.readInt(RandomAccessFile.java:786)
    at java.io.RandomAccessFile.readLong(RandomAccessFile.java:819)
    at org.apache.flume.channel.file.EventQueueBackingStoreFactory.get(EventQueueBackingStoreFactory.java:79)
    at org.apache.flume.channel.file.Log.replay(Log.java:417)
    at org.apache.flume.channel.file.FileChannel.start(FileChannel.java:279)
    at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

Thanks in advance.


Solution

  • OK, you need a few more properties for the HDFS sink:

    a1.sinks.k2.hdfs.filePrefix = [your prefix]
    a1.sinks.k2.hdfs.fileSuffix = .[your suffix]
    

    The suffix would be .tsv or .csv, for instance, while the prefix can be anything. You can also use variables for the date and time; this requires the timestamp interceptor. You can also write your own interceptor and inject your own variables into the file name. If you omit these, Flume will insert its own sequence number between the prefix and suffix.
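
    For example, a minimal sketch of the date-variable approach (the interceptor name i1, the prefix netlog, and the suffix .csv are illustrative choices, not from the original config):

    # tag every event with a timestamp header at the source
    a1.sources.r1.interceptors = i1
    a1.sources.r1.interceptors.i1.type = timestamp
    # the %Y-%m-%d escapes in the sink then resolve against that header
    a1.sinks.k2.hdfs.filePrefix = netlog-%Y-%m-%d
    a1.sinks.k2.hdfs.fileSuffix = .csv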

    As an addition to our previous comments, the properties to disable rollovers are the following (note that they belong under the hdfs. namespace):

    a1.sinks.k2.hdfs.rollInterval = 0
    a1.sinks.k2.hdfs.rollSize = 0
    a1.sinks.k2.hdfs.rollCount = 0
    a1.sinks.k2.hdfs.idleTimeout = 0
    
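    One caveat worth verifying on your version: with every roll trigger and the idle timeout set to 0, each HDFS file stays open (with the in-use .tmp suffix) until the agent shuts down. If that is a problem, a small idle timeout closes each file shortly after its events stop arriving:

    # close a file after 60 seconds without new events, instead of never
    a1.sinks.k2.hdfs.idleTimeout = 60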

    To access the original file's name from your source, keep fileHeader = true on the spooldir source (you already have it) and set the following in your HDFS sink config:

    a1.sinks.k2.hdfs.filePrefix = %{file}
    
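    Note that %{file} carries the full path of the source file, not just its name. Assuming Flume 1.5 or later, a sketch using the basename header instead keeps only the original file name:

    # spooldir can also expose just the file's basename as a header
    a1.sources.r1.basenameHeader = true
    a1.sinks.k2.hdfs.filePrefix = %{basename}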

    To simplify your channel config (and sidestep the EOFException in your log, which is the file channel failing to replay a truncated checkpoint file), switch to a memory channel:

    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
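
    One caution on sizing (an assumption based on the batch sizes in your posted config, not part of the original answer): the channel's transactionCapacity must be at least as large as the source and sink batch sizes, and its capacity at least the transactionCapacity. With batchSize = 100000 on both sides, a capacity of 1000 will reject every batch, so either lower the batch sizes or size the channel up:

    a1.channels.c1.type = memory
    # transactionCapacity >= largest batchSize; capacity >= transactionCapacity
    a1.channels.c1.capacity = 200000
    a1.channels.c1.transactionCapacity = 100000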