I am trying to configure flume with HDFS as sink.
this is my flume.conf file:
agent1.channels.ch1.type = memory
agent1.sources.avro-source1.channels = ch1
agent1.sources.avro-source1.type = avro
agent1.sources.avro-source1.bind = 0.0.0.0
agent1.sources.avro-source1.port = 41414
agent1.sinks.log-sink1.type = logger
agent1.sinks.hdfs-sink.channel=ch1
agent1.sinks.hdfs-sink.type=hdfs
agent1.sinks.hdfs-sink.hdfs.path=hdfs://localhost:9000/flume/flumehdfs/
agent1.sinks.hdfs-sink.hdfs.fileType = DataStream
agent1.sinks.hdfs-sink.hdfs.writeFormat = Text
agent1.sinks.hdfs-sink.hdfs.batchSize = 1000
agent1.sinks.hdfs-sink.hdfs.rollSize = 0
agent1.sinks.hdfs-sink.hdfs.rollCount = 10000
agent1.sinks.hdfs-sink.hdfs.rollInterval = 600
agent1.channels = ch1
agent1.sources = avro-source1
agent1.sinks = log-sink1 hdfs-sink
My hadoop version is:
Hadoop 0.20.2
Subversion https://svn.apache.org/repos/asf/hadoop/common/branches/branch-0.20 -r 911707
Flume version is :
apache-flume-1.4.0
I have put these two jar files in flume/lib directory
hadoop-0.20.2-core
hadoop-common-0.22.0
I put the hadoop-common jar there since I was getting the following error when starting flume agent:
Unhandled error
java.lang.NoSuchMethodError: org.apache.hadoop.security.UserGroupInformation.isSecurityEnabled()Z
at org.apache.flume.sink.hdfs.HDFSEventSink.authenticate(HDFSEventSink.java:491)
at org.apache.flume.sink.hdfs.HDFSEventSink.configure(HDFSEventSink.java:240)
at org.apache.flume.conf.Configurables.configure(Configurables.java:41)
at org.apache.flume.node.AbstractConfigurationProvider.loadSinks(AbstractConfigurationProvider.java:418)
at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:103)
at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:140)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:351)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:178)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:165)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:267)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:679)
Now agent is starting. This is the startup log :
logger=DEBUG,console Info: Including Hadoop libraries found via (/home/user/Downloads/hadoop-0.20.2/bin/hadoop) for HDFS access Exception in thread "main" java.lang.NoClassDefFoundError: classpath Caused by: java.lang.ClassNotFoundException: classpath at java.net.URLClassLoader$1.run(URLClassLoader.java:217) at java.security.AccessController.doPrivileged(Native Method) at java.net.URLClassLoader.findClass(URLClassLoader.java:205) at java.lang.ClassLoader.loadClass(ClassLoader.java:321) at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) at java.lang.ClassLoader.loadClass(ClassLoader.java:266) Could not find the main class: classpath. Program will exit. + exec /usr/lib/jvm/default-java/bin/java -Xmx20m -Dflume.root.logger=DEBUG,console -cp '/home/user/Downloads/apache-flume-1.4.0-bin/conf:/home/user/Downloads/apache-flume-1.4.0-bin/lib/*' -Djava.library.path=:/home/user/Downloads/hadoop-0.20.2/bin/../lib/native/Linux-amd64-64 org.apache.flume.node.Application -n agent1 -f ./conf/flume.conf 2013-09-04 07:55:22,634 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting 2013-09-04 07:55:22,639 (lifecycleSupervisor-1-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:78)] Configuration provider started 2013-09-04 07:55:22,640 (conf-file-poller-0) [DEBUG - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:126)] Checking file:./conf/flume.conf for changes 2013-09-04 07:55:22,642 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:./conf/flume.conf 2013-09-04 07:55:22,648 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink 2013-09-04 07:55:22,648 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for hdfs-sink: hdfs.fileType 2013-09-04 07:55:22,649 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:loggerSink 2013-09-04 07:55:22,650 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for loggerSink: type 2013-09-04 07:55:22,650 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink 2013-09-04 07:55:22,650 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink 2013-09-04 07:55:22,650 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink 2013-09-04 07:55:22,650 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink 2013-09-04 07:55:22,651 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink 2013-09-04 07:55:22,651 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:log-sink1 2013-09-04 07:55:22,651 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1020)] Created context for log-sink1: type 2013-09-04 07:55:22,651 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: loggerSink Agent: agent 2013-09-04 07:55:22,654 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:930)] Added sinks: log-sink1 hdfs-sink Agent: agent1 2013-09-04 07:55:22,654 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink 2013-09-04 07:55:22,654 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink 2013-09-04 07:55:22,654 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:loggerSink 2013-09-04 07:55:22,654 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:hdfs-sink 2013-09-04 07:55:22,655 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1016)] Processing:log-sink1 2013-09-04 07:55:22,655 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: agent, initial-configuration: AgentConfiguration[agent] SOURCES: {seqGenSrc={ parameters:{channels=memoryChannel, type=seq} }} CHANNELS: {memoryChannel={ parameters:{capacity=100, type=memory} }} SINKS: {loggerSink={ parameters:{type=logger, channel=memoryChannel} }} 2013-09-04 07:55:22,661 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:468)] Created channel memoryChannel 2013-09-04 07:55:22,671 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: loggerSink using LOGGER 2013-09-04 07:55:22,673 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:371)] Post validation configuration for agent AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[agent] SOURCES: {seqGenSrc={ parameters:{channels=memoryChannel, type=seq} }} CHANNELS: {memoryChannel={ parameters:{capacity=100, type=memory} }} AgentConfiguration created with Configuration stubs for which full validation was performed[agent] SINKS: {loggerSink=ComponentConfiguration[loggerSink] CONFIG: CHANNEL:memoryChannel } 2013-09-04 07:55:22,673 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:memoryChannel 2013-09-04 07:55:22,673 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks loggerSink 2013-09-04 07:55:22,674 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources seqGenSrc 2013-09-04 07:55:22,674 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:313)] Starting validation of configuration for agent: agent1, initial-configuration: AgentConfiguration[agent1] SOURCES: {avro-source1={ parameters:{port=41414, channels=ch1, type=avro, bind=0.0.0.0} }} CHANNELS: {ch1={ parameters:{type=memory} }} SINKS: {hdfs-sink={ parameters:{hdfs.fileType=DataStream, hdfs.path=hdfs://localhost:9000/flume/flumehdfs/, hdfs.batchSize=1000, hdfs.rollInterval=600, hdfs.rollSize=0, hdfs.writeFormat=Text, type=hdfs, hdfs.rollCount=10000, channel=ch1} }, log-sink1={ parameters:{type=logger, channel=ch1} }} 2013-09-04 07:55:22,675 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateChannels(FlumeConfiguration.java:468)] Created channel ch1 2013-09-04 07:55:22,677 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: hdfs-sink using HDFS 2013-09-04 07:55:22,678 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.validateSinks(FlumeConfiguration.java:674)] Creating sink: log-sink1 using LOGGER 2013-09-04 07:55:22,679 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.isValid(FlumeConfiguration.java:371)] Post validation configuration for agent1 AgentConfiguration created without Configuration stubs for which only basic syntactical validation was performed[agent1] SOURCES: {avro-source1={ parameters:{port=41414, channels=ch1, type=avro, bind=0.0.0.0} }} CHANNELS: {ch1={ parameters:{type=memory} }} SINKS: {hdfs-sink={ parameters:{hdfs.fileType=DataStream, hdfs.path=hdfs://localhost:9000/flume/flumehdfs/, hdfs.batchSize=1000, hdfs.rollInterval=600, hdfs.rollSize=0, hdfs.writeFormat=Text, type=hdfs, hdfs.rollCount=10000, channel=ch1} }} AgentConfiguration created with Configuration stubs for which full validation was performed[agent1] SINKS: {log-sink1=ComponentConfiguration[log-sink1] CONFIG: CHANNEL:ch1 } 2013-09-04 07:55:22,679 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:135)] Channels:ch1 2013-09-04 07:55:22,679 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:136)] Sinks hdfs-sink log-sink1 2013-09-04 07:55:22,679 (conf-file-poller-0) [DEBUG - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:137)] Sources avro-source1 2013-09-04 07:55:22,680 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:140)] Post-validation flume configuration contains configuration for agents: [agent, agent1] 2013-09-04 07:55:22,680 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:150)] Creating channels 2013-09-04 07:55:22,691 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:40)] Creating instance of channel ch1 type memory 2013-09-04 07:55:22,699 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:205)] Created channel ch1 2013-09-04 07:55:22,700 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:39)] Creating instance of source avro-source1, type avro 2013-09-04 07:55:22,733 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: log-sink1, type: logger 2013-09-04 07:55:22,736 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:40)] Creating instance of sink: hdfs-sink, type: hdfs 2013-09-04 07:55:22,985 (conf-file-poller-0) [INFO - org.apache.flume.sink.hdfs.HDFSEventSink.authenticate(HDFSEventSink.java:493)] Hadoop Security enabled: false 2013-09-04 07:55:22,989 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:119)] Channel ch1 connected to [avro-source1, log-sink1, hdfs-sink] 2013-09-04 07:55:22,996 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{avro-source1=EventDrivenSourceRunner: { source:Avro source avro-source1: { bindAddress: 0.0.0.0, port: 41414 } }} sinkRunners:{hdfs-sink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@709446e4 counterGroup:{ name:null counters:{} } }, log-sink1=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@16ba5c7a counterGroup:{ name:null counters:{} } }} channels:{ch1=org.apache.flume.channel.MemoryChannel{name: ch1}} } 2013-09-04 07:55:23,011 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel ch1 2013-09-04 07:55:23,064 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:110)] Monitoried counter group for type: CHANNEL, name: ch1, registered successfully. 2013-09-04 07:55:23,064 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:94)] Component type: CHANNEL, name: ch1 started 2013-09-04 07:55:23,065 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink hdfs-sink 2013-09-04 07:55:23,066 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink log-sink1 2013-09-04 07:55:23,068 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source avro-source1 2013-09-04 07:55:23,069 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:192)] Starting Avro source avro-source1: { bindAddress: 0.0.0.0, port: 41414 }... 2013-09-04 07:55:23,069 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:110)] Monitoried counter group for type: SINK, name: hdfs-sink, registered successfully. 2013-09-04 07:55:23,069 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:94)] Component type: SINK, name: hdfs-sink started 2013-09-04 07:55:23,078 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting 2013-09-04 07:55:23,079 (SinkRunner-PollingRunner-DefaultSinkProcessor) [DEBUG - org.apache.flume.SinkRunner$PollingRunner.run(SinkRunner.java:143)] Polling sink runner starting 2013-09-04 07:55:23,458 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:110)] Monitoried counter group for type: SOURCE, name: avro-source1, registered successfully. 2013-09-04 07:55:23,462 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:94)] Component type: SOURCE, name: avro-source1 started 2013-09-04 07:55:23,464 (lifecycleSupervisor-1-3) [INFO - org.apache.flume.source.AvroSource.start(AvroSource.java:217)] Avro source avro-source1 started.
But when ever some event is coming, the following error is coming in the flume logs and nothing is getting written to hdfs aswell.
ERROR - org.apache.flume.sink.hdfs.HDFSEventSink.process(HDFSEventSink.java:422)] process failed java.lang.NoSuchMethodError: org.apache.hadoop.util.Shell.getGROUPS_COMMAND()[Ljava/lang/String; at org.apache.hadoop.security.UnixUserGroupInformation.getUnixGroups(UnixUserGroupInformation.java:345) at org.apache.hadoop.security.UnixUserGroupInformation.login(UnixUserGroupInformation.java:264) at org.apache.hadoop.security.UnixUserGroupInformation.login(UnixUserGroupInformation.java:300) at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:192) at org.apache.hadoop.hdfs.DFSClient.(DFSClient.java:170) at org.apache.hadoop.hdfs.DistributedFileSystem.initialize(DistributedFileSystem.java:82) at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:1792) at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:76) at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:1826) at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:1808) at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:265) at org.apache.hadoop.fs.Path.getFileSystem(Path.java:190) at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:226) at org.apache.flume.sink.hdfs.BucketWriter$1.call(BucketWriter.java:220) at org.apache.flume.sink.hdfs.BucketWriter$8$1.run(BucketWriter.java:536) at org.apache.flume.sink.hdfs.BucketWriter.runPrivileged(BucketWriter.java:160) at org.apache.flume.sink.hdfs.BucketWriter.access$1000(BucketWriter.java:56) at org.apache.flume.sink.hdfs.BucketWriter$8.call(BucketWriter.java:533) at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334) at java.util.concurrent.FutureTask.run(FutureTask.java:166) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1146) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) at java.lang.Thread.run(Thread.java:679)
I am missing some configuration or jar file?
These are the jars I added to solve the problem:
hadoop-core-1.0.4.jar
commons-configuration-1.6.jar
commons-httpclient-3.0.1.jar
jets3t-0.6.1.jar
commons-codec-1.4.jar
Note: I didn't have to add any extra jar when using hadoop 1.2.1