Tags: hadoop, twitter, hdfs, flume

Flume Twitter Streaming issue


I'm trying to get some data from Twitter using Apache Flume and store it in HDFS, but I'm running into trouble.

This is my flume-env.sh

export JAVA_HOME=/usr/lib/jvm/java-11-openjdk-amd64
$JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote"
FLUME_CLASSPATH="/home/vineasouza/apache-flume-1.9.0-bin/lib/flume-sources-1.0-SNAPSHOT.jar" 

This is my twitter.conf

# Naming the components on the current agent. 
TwitterAgent.sources = Twitter 
TwitterAgent.channels = MemChannel 
TwitterAgent.sinks = HDFS

# Describing/Configuring the source 
TwitterAgent.sources.Twitter.type = org.apache.flume.source.twitter.TwitterSource
TwitterAgent.sources.Twitter.consumerKey = #
TwitterAgent.sources.Twitter.consumerSecret = # 
TwitterAgent.sources.Twitter.accessToken = #
TwitterAgent.sources.Twitter.accessTokenSecret = #
TwitterAgent.sources.Twitter.keywords = brasil

# Describing/Configuring the sink 
TwitterAgent.sinks.HDFS.channel=MemChannel
TwitterAgent.sinks.HDFS.type = hdfs 
TwitterAgent.sinks.HDFS.hdfs.path = hdfs://0.0.0.0:9000/user/Hadoop/twitter_data/
TwitterAgent.sinks.HDFS.hdfs.fileType = DataStream 
TwitterAgent.sinks.HDFS.hdfs.writeFormat = Text 
TwitterAgent.sinks.HDFS.hdfs.batchSize = 1000
TwitterAgent.sinks.HDFS.hdfs.rollSize = 0 
TwitterAgent.sinks.HDFS.hdfs.rollCount = 10000 
TwitterAgent.sinks.HDFS.hdfs.rollInterval = 600

# Describing/Configuring the channel 
TwitterAgent.channels.MemChannel.type = memory 
TwitterAgent.channels.MemChannel.capacity = 10000 
TwitterAgent.channels.MemChannel.transactionCapacity = 10000

# Binding the source and sink to the channel 
TwitterAgent.sources.Twitter.channels = MemChannel
TwitterAgent.sinks.HDFS.channel = MemChannel

And I'm running this command

$FLUME_HOME/bin/flume-ng agent --conf $FLUME_HOME/conf/ -f $FLUME_HOME/conf/twitter.conf -Dflume.root.logger=DEBUG,console -n TwitterAgent

But I get this exception:

2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] User-Agent: twitter4j http://twitter4j.org/ /3.0.3
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Connection: close
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client-Version: 3.0.3
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client-URL: http://twitter4j.org/en/twitter4j-3.0.3.xml
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] Accept-Encoding: gzip
2020-06-18 16:22:55,992 (Twitter Stream consumer-1[Establishing connection]) [DEBUG - twitter4j.internal.logging.SLF4JLogger.debug(SLF4JLogger.java:67)] X-Twitter-Client: Twitter4J
Exception in thread "Twitter Stream consumer-1[Establishing connection]" java.lang.OutOfMemoryError: Java heap space
    at java.base/java.io.BufferedInputStream.fill(BufferedInputStream.java:239)
    at java.base/java.io.BufferedInputStream.read1(BufferedInputStream.java:292)
    at java.base/java.io.BufferedInputStream.read(BufferedInputStream.java:351)
    at java.base/java.io.DataInputStream.readFully(DataInputStream.java:200)
    at java.base/java.io.DataInputStream.readUTF(DataInputStream.java:614)
    at java.base/java.io.DataInputStream.readUTF(DataInputStream.java:569)
    at java.base/sun.security.provider.JavaKeyStore.engineLoad(JavaKeyStore.java:740)
    at java.base/sun.security.util.KeyStoreDelegator.engineLoad(KeyStoreDelegator.java:222)
    at java.base/java.security.KeyStore.load(KeyStore.java:1479)
    at java.base/sun.security.util.AnchorCertificates$1.run(AnchorCertificates.java:62)
    at java.base/sun.security.util.AnchorCertificates$1.run(AnchorCertificates.java:53)
    at java.base/java.security.AccessController.doPrivileged(Native Method)
    at java.base/sun.security.util.AnchorCertificates.<clinit>(AnchorCertificates.java:53)
    at java.base/sun.security.provider.certpath.AlgorithmChecker.checkFingerprint(AlgorithmChecker.java:214)
    at java.base/sun.security.provider.certpath.AlgorithmChecker.<init>(AlgorithmChecker.java:164)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.validate(PKIXCertPathValidator.java:181)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.validate(PKIXCertPathValidator.java:145)
    at java.base/sun.security.provider.certpath.PKIXCertPathValidator.engineValidate(PKIXCertPathValidator.java:84)
    at java.base/java.security.cert.CertPathValidator.validate(CertPathValidator.java:309)
    at java.base/sun.security.validator.PKIXValidator.doValidate(PKIXValidator.java:364)
    at java.base/sun.security.validator.PKIXValidator.engineValidate(PKIXValidator.java:275)
    at java.base/sun.security.validator.Validator.validate(Validator.java:264)
    at java.base/sun.security.ssl.X509TrustManagerImpl.validate(X509TrustManagerImpl.java:313)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkTrusted(X509TrustManagerImpl.java:222)
    at java.base/sun.security.ssl.X509TrustManagerImpl.checkServerTrusted(X509TrustManagerImpl.java:129)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.checkServerCerts(CertificateMessage.java:629)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.onCertificate(CertificateMessage.java:464)
    at java.base/sun.security.ssl.CertificateMessage$T12CertificateConsumer.consume(CertificateMessage.java:360)
    at java.base/sun.security.ssl.SSLHandshake.consume(SSLHandshake.java:392)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:444)
    at java.base/sun.security.ssl.HandshakeContext.dispatch(HandshakeContext.java:422)
    at java.base/sun.security.ssl.TransportContext.dispatch(TransportContext.java:183)

Could anyone help me? I tried searching for solutions, but nothing solved my problem.


Solution

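  • The problem was resolved by fixing the JAVA_OPTS line in the conf/flume-env.sh file under the Flume installation. The original line started with $JAVA_OPTS= and had no export, so the shell did not treat it as a variable assignment and the heap settings were never applied: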

    export JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"
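
    To see why the original line had no effect, here is a minimal shell sketch you can run outside Flume. It assumes that JAVA_OPTS already holds a small default before flume-env.sh is read; the value -Xmx20m used below is an assumption for illustration, so check your own bin/flume-ng script for the real default. The variable names broken_status, broken_value and fixed_value are hypothetical helpers, not part of Flume.

```shell
#!/bin/sh
# Sketch: `$JAVA_OPTS="..."` is not an assignment, `export JAVA_OPTS="..."` is.

# ASSUMPTION: a small default heap set by the launcher before flume-env.sh runs.
JAVA_OPTS="-Xmx20m"

# Broken form from the question: the shell expands $JAVA_OPTS first and then
# tries to run the resulting word as a command, which does not exist.
broken_status=0
$JAVA_OPTS="-Xms500m -Xmx1000m -Dcom.sun.management.jmxremote" 2>/dev/null || broken_status=$?
broken_value="$JAVA_OPTS"   # unchanged, still the small default

# Corrected form from the answer: a real assignment, exported so that the
# JVM launched as a child process can see it.
export JAVA_OPTS="-Xms512m -Xmx1024m -Dcom.sun.management.jmxremote"
fixed_value="$JAVA_OPTS"

echo "broken form: exit status $broken_status, JAVA_OPTS=$broken_value"
echo "fixed form:  JAVA_OPTS=$fixed_value"
```

    With the broken form the variable keeps whatever value it had before, which would explain the agent running out of heap during the TLS handshake even though larger limits appear in flume-env.sh.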