
Flume HDFS sink is not creating files in hdfs from Kafka channel

I am trying to implement a simple Flume HDFS sink that gets events from the Kafka channel and writes them to HDFS as text files.

The architecture is very straightforward. Events are streamed from Twitter and piped to a Kafka topic, and the Flume HDFS sink writes these events to HDFS. This is part 2 of the Kafka-producer Stack Overflow question.

There are no ERRORs when I execute the command below, and it seems to be working fine, but I cannot see any text file in HDFS. I cannot debug or investigate further because no log files are created in the /var/log/flume/ folder. I am using Hortonworks Sandbox 2.3.1 and Hue to browse the file system.

Command to execute flume: flume-ng agent -n KafkaSink -c conf -f

Flume properties file:

# agent name, in this case KafkaSink.
KafkaSink.sources  = kafka-source-1
KafkaSink.channels = hdfs-channel-1
KafkaSink.sinks    = hdfs-sink-1

# kafka source properties.
KafkaSink.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
KafkaSink.sources.kafka-source-1.zookeeperConnect =
KafkaSink.sources.kafka-source-1.topic = raw_json_tweets
KafkaSink.sources.kafka-source-1.batchSize = 100
KafkaSink.sources.kafka-source-1.channels = hdfs-channel-1

# hdfs flume sink properties
KafkaSink.channels.hdfs-channel-1.type = memory
KafkaSink.sinks.hdfs-sink-1.channel = hdfs-channel-1
KafkaSink.sinks.hdfs-sink-1.type = hdfs
KafkaSink.sinks.hdfs-sink-1.hdfs.writeFormat = Text
KafkaSink.sinks.hdfs-sink-1.hdfs.fileType = DataStream
KafkaSink.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
KafkaSink.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true

# created /user/ruben/flume/ folder in hdfs to avoid permission error issues
KafkaSink.sinks.hdfs-sink-1.hdfs.path = /user/ruben/flume/%{topic}/%y-%m-%d

# specify the capacity of the memory channel.
KafkaSink.channels.hdfs-channel-1.capacity = 10000
KafkaSink.channels.hdfs-channel-1.transactionCapacity = 1000

Here is part of the relevant Flume console output:

16/08/26 22:20:48 INFO kafka.KafkaSource: Kafka source kafka-source-1 started.
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [], begin rebalancing consumer try #0
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Stopping leader finder thread
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [], Shutting down
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [], Stopped
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [], Shutdown completed
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Stopping all fetchers
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] All connections stopped
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [], Cleared all relevant queues for this fetcher
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [], Cleared the data chunks in all the consumer message iterators
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [], Committing all offsets after clearing the fetcher queues
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [], Releasing partition ownership
16/08/26 22:36:38 INFO consumer.RangeAssignor: Consumer rebalancing the following partitions: ArrayBuffer(0) for topic raw_json_tweets with consumers: List(
16/08/26 22:36:38 INFO consumer.RangeAssignor: attempting to claim partition 0
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [], successfully owned partition 0 for topic raw_json_tweets
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [], Consumer selected partitions : raw_json_tweets:0: fetched offset = -1: consumed offset = -1
16/08/26 22:36:38 INFO consumer.ZookeeperConsumerConnector: [], end rebalancing consumer try #0
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager$LeaderFinderThread: [], Starting
16/08/26 22:36:38 INFO utils.VerifiableProperties: Verifying properties
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property is overridden to flume
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property is overridden to
16/08/26 22:36:38 INFO utils.VerifiableProperties: Property is overridden to 30000
16/08/26 22:36:38 INFO client.ClientUtils$: Fetching metadata from broker id:0,,port:6667 with correlation id 0 for 1 topic(s) Set(raw_json_tweets)
16/08/26 22:36:38 INFO producer.SyncProducer: Connected to for producing
16/08/26 22:36:38 INFO producer.SyncProducer: Disconnecting from
16/08/26 22:36:38 INFO consumer.ConsumerFetcherThread: [], Starting
16/08/26 22:36:38 INFO consumer.ConsumerFetcherManager: [ConsumerFetcherManager-1472250048139] Added fetcher for partitions ArrayBuffer([[raw_json_tweets,0], initOffset -1 to broker id:0,,port:6667] )


  • By default, Flume's Kafka source only receives messages that are written to Kafka after the agent hosting the source has started; it does not consume the topic from the beginning.

    So either:

    1. Make sure you keep writing to that topic after Flume has started, or
    2. Tell the Kafka source to start from the beginning using the readSmallestOffset=true configuration.
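    As a rough sketch, option 2 could look like this in the properties file from the question. The first line uses the property name exactly as given in this answer. The commented-out alternative is an assumption: it relies on the Flume 1.6 Kafka source forwarding properties prefixed with "kafka." to the underlying Kafka consumer. Check both against the user guide for the installed Flume version before relying on either.

```properties
# Option 2: have the Kafka source start at the oldest available offset.
# Property name taken from the answer above; verify it for your Flume version.
KafkaSink.sources.kafka-source-1.readSmallestOffset = true

# Alternative (assumption): pass the old Kafka consumer's own setting through
# the "kafka." prefix instead.
# KafkaSink.sources.kafka-source-1.kafka.auto.offset.reset = smallest
```

    The Kafka consumer's auto.offset.reset setting only applies when the consumer group has no committed offset. If the agent has already run and committed offsets for its group, a fresh consumer group may be needed before older messages are read.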