
Running flume agent with custom source


I am trying to configure a Flume agent with a custom source from here. I tried to run the agent with this command:

flume-ng agent --conf conf --conf-file conf/twitter1.conf --name TwitterAgent

But the agent fails to start. The console output is:

> Info: Sourcing environment configuration script /usr/local/lib/apache-flume-1.6.0-bin/conf/flume-env.sh
> Info: Including Hive libraries found via () for Hive access
> + exec /usr/lib/jvm/java-8-oracle/jre/bin/java -Xms500m -Xmx1000m -Dcom.sun.management.jmxremote -cp '/usr/local/lib/apache-flume-1.6.0-bin/conf:/usr/local/lib/apache-flume-1.6.0-bin/lib/*:/usr/local/lib/apache-flume-1.6.0-bin/plugins.d/Twittersource/lib/twitterstream.jar:/usr/local/lib/apache-flume-1.6.0-bin/lib/flume-sources-1.0-SNAPSHOT.jar:/usr/local/lib/apache-flume-1.6.0-bin/plugins.d/Twittersource/lib/*:/usr/local/lib/apache-flume-1.6.0-bin/plugins.d/Twittersource/libext/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file conf/twitter1.conf --name TwitterAgent
> 2015-07-13 17:43:51,355 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider.start(PollingPropertiesFileConfigurationProvider.java:61)] Configuration provider starting
> 2015-07-13 17:43:51,361 (conf-file-poller-0) [INFO - org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:133)] Reloading configuration file:conf/twitter1.conf
> 2015-07-13 17:43:51,365 (conf-file-poller-0) [WARN - org.apache.flume.conf.FlumeConfiguration.<init>(FlumeConfiguration.java:102)] Configuration property ignored:   =
> 2015-07-13 17:43:51,369 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:Twitsink
> 2015-07-13 17:43:51,369 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:1017)] Processing:Twitsink
> 2015-07-13 17:43:51,369 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration$AgentConfiguration.addProperty(FlumeConfiguration.java:931)] Added sinks: Twitsink Agent: TwitterAgent
> 2015-07-13 17:43:51,379 (conf-file-poller-0) [INFO - org.apache.flume.conf.FlumeConfiguration.validateConfiguration(FlumeConfiguration.java:141)] Post-validation flume configuration contains configuration for agents: [TwitterAgent]
> 2015-07-13 17:43:51,380 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:145)] Creating channels
> 2015-07-13 17:43:51,385 (conf-file-poller-0) [INFO - org.apache.flume.channel.DefaultChannelFactory.create(DefaultChannelFactory.java:42)] Creating instance of channel MemChannel type memory
> 2015-07-13 17:43:51,391 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.loadChannels(AbstractConfigurationProvider.java:200)] Created channel MemChannel
> 2015-07-13 17:43:51,391 (conf-file-poller-0) [INFO - org.apache.flume.source.DefaultSourceFactory.create(DefaultSourceFactory.java:41)] Creating instance of source Twitter, type com.qb.twitter.TwitterStreamsource
> 2015-07-13 17:43:51,444 (conf-file-poller-0) [INFO - org.apache.flume.sink.DefaultSinkFactory.create(DefaultSinkFactory.java:42)] Creating instance of sink: Twitsink, type: logger
> 2015-07-13 17:43:51,448 (conf-file-poller-0) [INFO - org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:114)] Channel MemChannel connected to [Twitter, Twitsink]
> 2015-07-13 17:43:51,456 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:138)] Starting new configuration:{ sourceRunners:{Twitter=EventDrivenSourceRunner: { source:com.qb.twitter.TwitterStreamsource{name:Twitter,state:IDLE} }} sinkRunners:{Twitsink=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@385d25c0 counterGroup:{ name:null counters:{} } }} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
> 2015-07-13 17:43:51,463 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:145)] Starting Channel MemChannel
> 2015-07-13 17:43:51,466 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.register(MonitoredCounterGroup.java:120)] Monitored counter group for type: CHANNEL, name: MemChannel: Successfully registered new MBean.
> 2015-07-13 17:43:51,468 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.instrumentation.MonitoredCounterGroup.start(MonitoredCounterGroup.java:96)] Component type: CHANNEL, name: MemChannel started
> 2015-07-13 17:43:51,468 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:173)] Starting Sink Twitsink
> 2015-07-13 17:43:51,470 (conf-file-poller-0) [INFO - org.apache.flume.node.Application.startAllComponents(Application.java:184)] Starting Source Twitter
> 2015-07-13 17:43:51,472 (lifecycleSupervisor-1-0) [ERROR - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:253)] Unable to start EventDrivenSourceRunner: { source:com.qb.twitter.TwitterStreamsource{name:Twitter,state:IDLE} } - Exception follows.
> java.lang.NoSuchMethodError: twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)V
>   at com.qb.twitter.TwitterStreamsource.start(TwitterStreamsource.java:83)
>   at org.apache.flume.source.EventDrivenSourceRunner.start(EventDrivenSourceRunner.java:44)
>   at org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:251)
>   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
>   at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
>   at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
>   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>   at java.lang.Thread.run(Thread.java:745)
> 2015-07-13 17:43:51,476 (lifecycleSupervisor-1-0) [WARN - org.apache.flume.lifecycle.LifecycleSupervisor$MonitorRunnable.run(LifecycleSupervisor.java:260)] Component EventDrivenSourceRunner: { source:com.qb.twitter.TwitterStreamsource{name:Twitter,state:STOP} } stopped, since it could not besuccessfully started due to missing dependencies

The configuration file is

TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = Twitsink 

TwitterAgent.sources.Twitter.type = com.qb.twitter.TwitterStreamsource
TwitterAgent.sources.Twitter.channels = MemChannel

TwitterAgent.sources.Twitter.consumerKey = **********
TwitterAgent.sources.Twitter.consumerSecret = ********
TwitterAgent.sources.Twitter.accessToken = ********
TwitterAgent.sources.Twitter.accessTokenSecret = ********
TwitterAgent.sources.Twitter.keywords = hadoop, big data

 
TwitterAgent.sinks.Twitsink.type=logger
TwitterAgent.sinks.Twitsink.channel = MemChannel

 
TwitterAgent.channels.MemChannel.type = memory   
TwitterAgent.channels.MemChannel.capacity = 10000
TwitterAgent.channels.MemChannel.transactionCapacity = 100

Solution

  • You're obviously providing the wrong twitter4j library:

    java.lang.NoSuchMethodError:

    twitter4j.TwitterStream.addListener(Ltwitter4j/StreamListener;)

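    The current version of Flume (1.6.0) is linked against twitter4j 3.0.3.

    Make sure you don't have two versions of twitter4j in your lib directories. The classpath in your log includes both Flume's own lib/* and plugins.d/Twittersource/libext/*, so a twitter4j jar in one of them may be shadowing the version your source was compiled against.

    I also assume you know that Flume has its own TwitterSource, which works quite well, and that you have your reasons for implementing your own version.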
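If it helps, here is a rough way to check for duplicate twitter4j jars. It is only a sketch: the class name `FindClassInJars` and its method are made up for this answer, and it assumes the jars sit directly inside the directories you pass in (no recursion). It lists every jar that contains a given class, so more than one hit for `twitter4j.TwitterStream` would point to conflicting copies.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.zip.ZipFile;

// Hypothetical helper, not part of Flume or twitter4j.
public class FindClassInJars {

    /** Returns every *.jar directly inside the given directories that contains the class. */
    static List<Path> jarsContaining(String className, List<Path> dirs) throws IOException {
        // A class named a.b.C is stored in a jar under the entry a/b/C.class
        String entry = className.replace('.', '/') + ".class";
        List<Path> hits = new ArrayList<>();
        for (Path dir : dirs) {
            if (!Files.isDirectory(dir)) {
                continue; // skip directories that do not exist
            }
            try (DirectoryStream<Path> jars = Files.newDirectoryStream(dir, "*.jar")) {
                for (Path jar : jars) {
                    try (ZipFile zip = new ZipFile(jar.toFile())) {
                        if (zip.getEntry(entry) != null) {
                            hits.add(jar);
                        }
                    }
                }
            }
        }
        Collections.sort(hits);
        return hits;
    }

    public static void main(String[] args) throws IOException {
        if (args.length < 2) {
            System.err.println("usage: java FindClassInJars <class name> <dir> [<dir> ...]");
            return;
        }
        List<Path> dirs = new ArrayList<>();
        for (int i = 1; i < args.length; i++) {
            dirs.add(Paths.get(args[i]));
        }
        List<Path> hits = jarsContaining(args[0], dirs);
        for (Path hit : hits) {
            System.out.println(hit);
        }
        if (hits.size() > 1) {
            System.out.println("More than one jar provides " + args[0] + "; check their versions.");
        }
    }
}
```

With the directories taken from the classpath in your log, you would run it roughly like this:

java FindClassInJars twitter4j.TwitterStream /usr/local/lib/apache-flume-1.6.0-bin/lib /usr/local/lib/apache-flume-1.6.0-bin/plugins.d/Twittersource/lib /usr/local/lib/apache-flume-1.6.0-bin/plugins.d/Twittersource/libext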