Search code examples
hadoophdfsflumeflume-ngflume-twitter

Unable to retrieve Twitter streaming data using Flume


I am trying to stream and retrieve Twitter data using Flume but unable to do so because of some sort of error. When I try executing it using the command:

flume-ng agent -n TwitterAgent -c conf -f /home/hadoop/Flume/conf/twitter.conf

I get the following:

Info: Including Hadoop libraries found via (/home/hadoop/hadoop-2.10.1/bin/hadoop) for HDFS access
Info: Including HBASE libraries found via (/home/hadoop/hbase-2.2.5/bin/hbase) for HBASE access
Info: Including Hive libraries found via (/home/hadoop/apache-hive-2.3.7-bin) for Hive access
+ exec /usr/lib/jvm/java-8-openjdk-amd64/bin/java -Xmx20m -cp 'conf:/home/hadoop/Flume/lib/*:/home/hadoop/hadoop-2.10.1/etc/hadoop:/home/hadoop/hadoop-2.10.1/share/hadoop/common/lib/*:/home/hadoop/hadoop-2.10.1/share/hadoop/common/*:/home/hadoop/hadoop-2.10.1/share/hadoop/hdfs:/home/hadoop/hadoop-2.10.1/share/hadoop/hdfs/lib/*:/home/hadoop/hadoop-2.10.1/share/hadoop/hdfs/*:/home/hadoop/hadoop-2.10.1/share/hadoop/yarn:/home/hadoop/hadoop-2.10.1/share/hadoop/yarn/lib/*:/home/hadoop/hadoop-2.10.1/share/hadoop/yarn/*:/home/hadoop/hadoop-2.10.1/share/hadoop/mapreduce/lib/*:/home/hadoop/hadoop-2.10.1/share/hadoop/mapreduce/*:/home/hadoop/hadoop-2.10.1/contrib/capacity-scheduler/*.jar:/home/hadoop/hbase-2.2.5/conf:/usr/lib/jvm/java-8-openjdk-amd64/lib/tools.jar:/home/hadoop/hbase-2.2.5:/home/hadoop/hbase-2.2.5/lib/shaded-clients/hbase-shaded-client-byo-hadoop-2.2.5.jar:/home/hadoop/hbase-2.2.5/lib/client-facing-thirdparty/audience-annotations-0.5.0.jar:/home/hadoop/hbase-2.2.5/lib/client-facing-thirdparty/commons-logging-1.2.jar:/home/hadoop/hbase-2.2.5/lib/client-facing-thirdparty/findbugs-annotations-1.3.9-1.jar:/home/hadoop/hbase-2.2.5/lib/client-facing-thirdparty/htrace-core4-4.2.0-incubating.jar:/home/hadoop/hbase-2.2.5/lib/client-facing-thirdparty/log4j-1.2.17.jar:/home/hadoop/hbase-2.2.5/lib/client-facing-thirdparty/slf4j-api-1.7.25.jar:/home/hadoop/hadoop-2.10.1/etc/hadoop:/home/hadoop/hadoop-2.10.1/share/hadoop/common/lib/*:/home/hadoop/hadoop-2.10.1/share/hadoop/common/*:/home/hadoop/hadoop-2.10.1/share/hadoop/hdfs:/home/hadoop/hadoop-2.10.1/share/hadoop/hdfs/lib/*:/home/hadoop/hadoop-2.10.1/share/hadoop/hdfs/*:/home/hadoop/hadoop-2.10.1/share/hadoop/yarn:/home/hadoop/hadoop-2.10.1/share/hadoop/yarn/lib/*:/home/hadoop/hadoop-2.10.1/share/hadoop/yarn/*:/home/hadoop/hadoop-2.10.1/share/hadoop/mapreduce/lib/*:/home/hadoop/hadoop-2.10.1/share/hadoop/mapreduce/*:/home/hadoop/hadoop-2.10.1/contrib/capacity-scheduler/*.jar:/home/hadoop/hbase-2.2.5/conf:/home/hadoop/apache-hive-2.3.7-bin/lib/*' -Djava.library.path=:/home/hadoop/hadoop-2.10.1/lib/native:/home/hadoop/hadoop-2.10.1/lib/native org.apache.flume.node.Application -n TwitterAgent -f /home/hadoop/Flume/conf/twitter.conf
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/home/hadoop/Flume/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/hadoop-2.10.1/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/home/hadoop/apache-hive-2.3.7-bin/lib/log4j-slf4j-impl-2.6.2.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
20/11/20 02:23:44 INFO node.PollingPropertiesFileConfigurationProvider: Configuration provider starting
20/11/20 02:23:44 INFO node.PollingPropertiesFileConfigurationProvider: Reloading configuration file:/home/hadoop/Flume/conf/twitter.conf
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:HDFS
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:MemChannel
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:Twitter
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:Twitter
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:HDFS
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:HDFS
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:MemChannel
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:HDFS
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:HDFS
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:MemChannel
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:HDFS
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Added sinks: HDFS Agent: TwitterAgent
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:Twitter
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:Twitter
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:HDFS
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:HDFS
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:Twitter
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:Twitter
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:Twitter
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Processing:HDFS
20/11/20 02:23:44 WARN conf.FlumeConfiguration: Agent configuration for 'TwitterAgent' has no configfilters.
20/11/20 02:23:44 INFO conf.FlumeConfiguration: Post-validation flume configuration contains configuration for agents: [TwitterAgent]
20/11/20 02:23:44 INFO node.AbstractConfigurationProvider: Creating channels
20/11/20 02:23:44 INFO channel.DefaultChannelFactory: Creating instance of channel MemChannel type memory
20/11/20 02:23:44 INFO node.AbstractConfigurationProvider: Created channel MemChannel
20/11/20 02:23:44 INFO source.DefaultSourceFactory: Creating instance of source Twitter, type org.apache.flume.source.twitter.TwitterSource
**20/11/20 02:23:44 ERROR node.AbstractConfigurationProvider: Source Twitter has been removed due to an error during configuration**
j**ava.lang.InstantiationException: Incompatible source and channel settings defined. source's batch size is greater than the channels transaction capacity. Source: Twitter, batch size = 1000, channel MemChannel, transaction capacity = 100**
    at org.apache.flume.node.AbstractConfigurationProvider.checkSourceChannelCompatibility(AbstractConfigurationProvider.java:386)
    at org.apache.flume.node.AbstractConfigurationProvider.getSourceChannels(AbstractConfigurationProvider.java:367)
    at org.apache.flume.node.AbstractConfigurationProvider.loadSources(AbstractConfigurationProvider.java:329)
    at org.apache.flume.node.AbstractConfigurationProvider.getConfiguration(AbstractConfigurationProvider.java:105)
    at org.apache.flume.node.PollingPropertiesFileConfigurationProvider$FileWatcherRunnable.run(PollingPropertiesFileConfigurationProvider.java:145)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/11/20 02:23:44 INFO sink.DefaultSinkFactory: Creating instance of sink: HDFS, type: hdfs
20/11/20 02:23:44 INFO node.AbstractConfigurationProvider: Channel MemChannel connected to [HDFS]
20/11/20 02:23:44 INFO node.Application: Starting new configuration:{ sourceRunners:{} sinkRunners:{HDFS=SinkRunner: { policy:org.apache.flume.sink.DefaultSinkProcessor@78e3d64e counterGroup:{ name:null counters:{} } }} channels:{MemChannel=org.apache.flume.channel.MemoryChannel{name: MemChannel}} }
20/11/20 02:23:44 INFO node.Application: Starting Channel MemChannel
20/11/20 02:23:44 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: CHANNEL, name: MemChannel: Successfully registered new MBean.
20/11/20 02:23:44 INFO instrumentation.MonitoredCounterGroup: Component type: CHANNEL, name: MemChannel started
20/11/20 02:23:44 INFO node.Application: Starting Sink HDFS
20/11/20 02:23:44 INFO instrumentation.MonitoredCounterGroup: Monitored counter group for type: SINK, name: HDFS: Successfully registered new MBean.
20/11/20 02:23:44 INFO instrumentation.MonitoredCounterGroup: Component type: SINK, name: HDFS started

The terminal just stays stuck here and nothing happens. I tried waiting for several minutes but it stays the same.

My config file twitter.conf is located at /home/hadoop/Flume/conf and is as follows:

#Naming the components on the current agent.
TwitterAgent.sources = Twitter
TwitterAgent.channels = MemChannel
TwitterAgent.sinks = HDFS

#Describing/Configuring the source
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey =##
TwitterAgent.sources.Twitter.consumerSecret =##
TwitterAgent.sources.Twitter.accessToken =##
TwitterAgent.sources.Twitter.accessTokenSecret =##
TwitterAgent.sources.Twitter.keywords =covid,covid-19,coronavirus

#Describing/Configuring the sink
TwitterAgent.sinks.HDFS.type = hdfs
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://localhost:9000/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text
TwitterAgent.sinks.HDFS.hdfs.batchSize = 10
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000

#Describing/Configuring the channel
TwitterAgent.channels.MemChannel.type = memory
TwitterAgent.channels.MemChannel.capacity = 100
TwitterAgent.channels.MemChannel.transactionCapacity = 100

#Binding the source and sink to the channel
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

My flume-env.sh file is as follows:

#Licensed to the Apache Software Foundation (ASF) under one
#or more contributor license agreements.  See the NOTICE file
#distributed with this work for additional information
#regarding copyright ownership.  The ASF licenses this file
#to you under the Apache License, Version 2.0 (the
#"License"); you may not use this file except in compliance
#with the License.  You may obtain a copy of the License at

#http://www.apache.org/licenses/LICENSE-2.0
#Unless required by applicable law or agreed to in writing, software
#distributed under the License is distributed on an "AS IS" BASIS,
#WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#See the License for the specific language governing permissions and
#limitations under the License.

#If this file is placed at FLUME_CONF_DIR/flume-env.sh, it will be sourced
#during Flume startup.

#Enviroment variables can be set here.

export JAVA_HOME=/usr/lib/jvm/java-8-openjdk-amd64
export CLASSPATH=$CLASSPATH:/home/hadoop/Flume/lib/*
FLUME_CLASSPATH="/home/hadoop/Flume/lib/flume-sources-1.0-SNAPSHOT.jar"

#Give Flume more memory and pre-allocate, enable remote monitoring via JMX
#export JAVA_OPTS="-Xms100m -Xmx2000m -Dcom.sun.management.jmxremote"

#Let Flume write raw event data and configuration information to its log files for debugging
#purposes. Enabling these flags is not recommended in production,
#as it may result in logging sensitive user information or encryption secrets.
#export JAVA_OPTS="$JAVA_OPTS -Dorg.apache.flume.log.rawdata=true -Dorg.apache.flume.log.printconfig=true "

#Note that the Flume conf directory is always included in the classpath.
#FLUME_CLASSPATH=""

Solution

  • Update: Apparently after some research I found that I used a bad version of : flume-sources-1.0-SNAPSHOT.jar which is a jar file found in the lib folder of Flume. Fixed it by generating my own jar by following the method at: https://community.cloudera.com/t5/Support-Questions/issue-flume-twitter/m-p/22938#M6597