
Flume HDFS sink is not creating files in HDFS from Kafka source


I am trying to implement a simple Flume HDFS sink that gets events from a Kafka topic (through the Kafka source) and writes them to HDFS as text files.

The architecture is very straightforward: events are streamed from Twitter and piped to a Kafka topic, and the Flume HDFS sink should then write these events to HDFS. This is part 2 of my earlier Stack Overflow question about the Kafka producer.

No errors appear when I run the command below, and it seems to be working fine, but I cannot see any text files in HDFS. I cannot debug or investigate further because no log files are created in the /var/log/flume/ folder. I am using Hortonworks Sandbox 2.3.1 and Hue to browse the file system.
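On the missing log files: flume-ng adds the directory given with -c (the relative path conf in the command below) to the classpath, so log4j reads the log4j.properties found there, and the stock Flume file writes to ./logs relative to the working directory rather than to /var/log/flume/. A minimal sketch of the keys to change, assuming that file still defines the LOGFILE appender the way the stock Flume log4j.properties does, and that /var/log/flume/ exists and is writable by the user running flume-ng:

```properties
# conf/log4j.properties (picked up from the directory passed to flume-ng with -c)
# Send Flume's own log through the LOGFILE appender defined further down in the stock file.
flume.root.logger=INFO,LOGFILE
# Assumption: this directory exists and the flume-ng user can write to it.
flume.log.dir=/var/log/flume
flume.log.file=flume.log
```

Alternatively, appending -Dflume.root.logger=DEBUG,console to the flume-ng command prints more detail to the terminal while debugging.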


Command used to start the Flume agent:

flume-ng agent -n KafkaSink -c conf -f tweets_sink_flume.properties

Flume properties file: tweets_sink_flume.properties

# agent name, in this case KafkaSink.
KafkaSink.sources  = kafka-source-1
KafkaSink.channels = hdfs-channel-1
KafkaSink.sinks    = hdfs-sink-1

# kafka source properties.
KafkaSink.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
KafkaSink.sources.kafka-source-1.zookeeperConnect = sandbox.hortonworks.com:2181
KafkaSink.sources.kafka-source-1.topic = raw_json_tweets
KafkaSink.sources.kafka-source-1.batchSize = 100
KafkaSink.sources.kafka-source-1.channels = hdfs-channel-1

# hdfs flume sink properties
KafkaSink.sinks.hdfs-sink-1.channel = hdfs-channel-1
KafkaSink.sinks.hdfs-sink-1.type = hdfs
KafkaSink.sinks.hdfs-sink-1.hdfs.writeFormat = Text
KafkaSink.sinks.hdfs-sink-1.hdfs.fileType = DataStream
KafkaSink.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
KafkaSink.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true

# created /user/ruben/flume/ folder in hdfs to avoid permission errors
KafkaSink.sinks.hdfs-sink-1.hdfs.path = /user/ruben/flume/%{topic}/%y-%m-%d
KafkaSink.sinks.hdfs-sink-1.hdfs.rollCount = 100
KafkaSink.sinks.hdfs-sink-1.hdfs.rollSize = 0

# memory channel type and capacity.
KafkaSink.channels.hdfs-channel-1.type = memory
KafkaSink.channels.hdfs-channel-1.capacity = 10000
KafkaSink.channels.hdfs-channel-1.transactionCapacity = 1000
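When checking HDFS in Hue, it helps to know when this sink actually produces a file. With the settings above, a file is rolled after 100 events (rollCount) or, because hdfs.rollInterval is not set, after the HDFS sink's default of 30 seconds, and while it is open it carries the default .tmp suffix. A file (and its dated directory) is only created once at least one event reaches the sink, so an empty /user/ruben/flume/ means no events arrived at all. The fragment below only makes those two defaults explicit; it does not change behavior:

```properties
# Defaults from the Flume HDFS sink documentation, written out for clarity.
# Close/roll the current file 30 seconds after it was opened, even below rollCount.
KafkaSink.sinks.hdfs-sink-1.hdfs.rollInterval = 30
# Suffix carried by a file while it is still being written.
KafkaSink.sinks.hdfs-sink-1.hdfs.inUseSuffix = .tmp
```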

Here is part of the relevant Flume console output:

..
16/08/26 22:20:48 INFO kafka.KafkaSource: Kafka source kafka-source-1 started.
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], begin rebalancing consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f try #0
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Stopping leader finder thread
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Shutting down
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Stopped
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Shutdown completed
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Stopping all fetchers
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] All connections stopped
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Cleared all relevant queues for this fetcher
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Cleared the data chunks in all the consumer message iterators
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Committing all offsets after clearing the fetcher queues
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Releasing partition ownership
16/08/26 22:36:38 INFO consumer.RangeAssignor: Consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f rebalancing the following partitions: ArrayBuffer(0) for topic raw_json_tweets with consumers: List(flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0)
16/08/26 22:36:38 INFO consumer.RangeAssignor: flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0 attempting to claim partition 0
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0 successfully owned partition 0 for topic raw_json_tweets
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], Consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f selected partitions : raw_json_tweets:0: fetched offset = -1: consumed offset = -1
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f], end rebalancing consumer flume_sandbox.hortonworks.com-1472250048016-72a02e7f try #0
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [flume_sandbox.hortonworks.com-1472250048016-72a02e7f-leader-finder-thread], Starting
16/08/26 22:36:38 INFO utils.VerifiableProperties: Verifying properties
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property client.id is overridden to flume
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property metadata.broker.list is overridden to sandbox.hortonworks.com:6667
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property request.timeout.ms is overridden to 30000
16/08/26 22:36:38 INFO client.ClientUtils$: Fetching metadata from broker id:0,host:sandbox.hortonworks.com,port:6667 with correlation id 0 for 1 topic(s) Set(raw_json_tweets)
16/08/26 22:36:38 INFO producer.SyncProducer: Connected to sandbox.hortonworks.com:6667 for producing
16/08/26 22:36:38 INFO producer.SyncProducer: Disconnecting from sandbox.hortonworks.com:6667
16/08/26 22:36:38 INFO consumer.ConsumerFetcherThread: [ConsumerFetcherThread-flume_sandbox.hortonworks.com-1472250048016-72a02e7f-0-0], Starting
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Added fetcher for partitions ArrayBuffer([[raw_json_tweets,0], initOffset -1 to broker id:0,host:sandbox.hortonworks.com,port:6667] )

Solution

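  • Flume's Kafka source will, by default, only get messages that were written to Kafka after the agent with the source started; it will not consume the topic from the beginning. The console output is consistent with this: "fetched offset = -1: consumed offset = -1" means no offset had been committed for the source's consumer group, so the consumer started from the latest offset instead of the earliest.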

    So either:

    1. Make sure you keep writing to that topic after Flume has started.
    2. Ask the Kafka source to start from the beginning of the topic, using the readSmallestOffset=true configuration.
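Option 2 can be sketched for the agent above as follows. This assumes a Flume 1.6-style Kafka source, which forwards any property with the kafka. prefix to the underlying ZooKeeper-based Kafka consumer; auto.offset.reset = smallest is that consumer's setting for starting at the earliest available offset. The reset only applies when the consumer group has no committed offset, so the groupId value below is a hypothetical fresh group name; reusing a group that has already committed offsets would resume from those offsets instead. Check the Kafka source documentation for your Flume version before relying on these keys.

```properties
# Hypothetical new consumer group, so no previously committed offsets are picked up.
KafkaSink.sources.kafka-source-1.groupId = flume-hdfs-replay
# Forwarded to the Kafka consumer because of the "kafka." prefix:
# with no committed offset, start from the earliest offset rather than the latest.
KafkaSink.sources.kafka-source-1.kafka.auto.offset.reset = smallest
```

Once events flow, the first file should show up under /user/ruben/flume/raw_json_tweets/ in a dated subdirectory, keeping its .tmp suffix until it is rolled.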