
How to use Flume's Kafka Channel without specifying a source


I have an existing Kafka topic and a Flume agent that reads from it and writes to HDFS. The current setup is a Kafka Source and a File Channel feeding an HDFS Sink, and I want to reconfigure the agent to use a Kafka Channel instead.

I read in the Cloudera documentation that this can be achieved with only a Kafka Channel and an HDFS Sink, without a Flume source (unless I have got the wrong end of the stick). So I tried to create this configuration, but it isn't working: the Flume process doesn't even start on the box.

# Test
test.channels = kafka-channel
test.sinks = hdfs-sink

test.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
test.channels.kafka-channel.kafka.topic = test
test.channels.kafka-channel.parseAsFlumeEvent = false

test.sinks.hdfs-sink.channel = kafka-channel
test.sinks.hdfs-sink.type = hdfs
test.sinks.hdfs-sink.hdfs.path = hdfs://localhost:8082/data/test/
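The config above declares a channel and a sink but no source. To make that wiring explicit, here is a minimal Python sketch (assuming Python 3.9+) that parses simple "agent.key = value" lines, skipping blanks and "#" comments, and reports which components an agent declares. The helpers parse_properties and describe_agent are hypothetical and not part of Flume or Ambari, and the parser deliberately ignores Java properties features such as ":" separators and line continuations.

```python
"""Summarise the components declared in a Flume agent properties file.

Hypothetical helpers for illustration; not part of Flume or Ambari.
"""


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple 'key = value' lines, skipping blanks and '#' comments."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")  # split on the first '=' only
        if sep:  # ignore lines that contain no '='
            props[key.strip()] = value.strip()
    return props


def describe_agent(text: str, agent: str) -> dict[str, object]:
    """Return the declared sources/channels/sinks and each sink's channel."""
    props = parse_properties(text)

    def names(kind: str) -> list[str]:
        # Component lists are whitespace-separated, e.g. "test.sinks = hdfs-sink".
        return props.get(f"{agent}.{kind}", "").split()

    sinks = names("sinks")
    return {
        "sources": names("sources"),
        "channels": names("channels"),
        "sinks": sinks,
        "sink_channels": {s: props.get(f"{agent}.sinks.{s}.channel") for s in sinks},
    }


QUESTION_CONFIG = """
# Test
test.channels = kafka-channel
test.sinks = hdfs-sink
test.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
test.channels.kafka-channel.kafka.topic = test
test.channels.kafka-channel.parseAsFlumeEvent = false
test.sinks.hdfs-sink.channel = kafka-channel
test.sinks.hdfs-sink.type = hdfs
test.sinks.hdfs-sink.hdfs.path = hdfs://localhost:8082/data/test/
"""

if __name__ == "__main__":
    print(describe_agent(QUESTION_CONFIG, "test"))
```

For the question's config this reports no sources, one channel (kafka-channel) and one sink (hdfs-sink) pointing at that channel, so the sink-to-channel wiring itself is consistent.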

I'm using:

  • HDP Quickstart VM 2.6.3
  • Flume version 1.5.2
  • The HDFS directory does exist
  • ps -ef | grep flume only shows a running process once I add a kafka-source, but that can't be right, because doing so creates an infinite loop for every message published to the topic.

Is it possible to use only a Kafka Channel and an HDFS Sink, or do I need a kafka-source plus some other configuration change that prevents the infinite loop of messages?

A Kafka Source -> Kafka Channel -> HDFS Sink pipeline doesn't seem right to me.
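Why a Kafka Source -> Kafka Channel -> HDFS Sink pipeline loops when both ends use the same topic can be shown with a toy in-memory model. This is not Flume or Kafka code: topics are plain Python lists, each consumer is just an offset, and the topic name "other" is hypothetical. It assumes, as in the question, that the source reads topic "test", that putting an event into a Kafka Channel means producing it to the channel's topic, and that the channel's consumer sees every record on that topic.

```python
"""Toy model of Kafka Source -> Kafka Channel -> HDFS Sink sharing one topic.

Not Flume or Kafka code: topics are plain Python lists and each
"consumer" is just an offset into one of them.
"""


def simulate(steps: int, same_topic: bool) -> tuple[int, int]:
    """Return (records in the source topic, records written to "HDFS")."""
    topics: dict[str, list[str]] = {"test": ["m1"], "other": []}
    # The channel is backed either by the source topic itself or by a separate one.
    channel_topic = "test" if same_topic else "other"
    source_offset = 0  # the Kafka Source's position in "test"
    sink_offset = 0    # the channel consumer's position in the channel topic
    hdfs: list[str] = []

    for _ in range(steps):
        # Source: read the next record from "test" and put it into the channel,
        # which for a Kafka Channel means producing it to the channel topic.
        if source_offset < len(topics["test"]):
            topics[channel_topic].append(topics["test"][source_offset])
            source_offset += 1
        # Sink: take the next record out of the channel and "write it to HDFS".
        if sink_offset < len(topics[channel_topic]):
            hdfs.append(topics[channel_topic][sink_offset])
            sink_offset += 1

    return len(topics["test"]), len(hdfs)


if __name__ == "__main__":
    print("same topic:    ", simulate(5, same_topic=True))   # grows every step
    print("separate topic:", simulate(5, same_topic=False))  # stays at one record
```

With one published message and five steps, the shared-topic run ends with six records in "test" and five written to "HDFS", while the separate-topic run ends with one of each: when the source and channel share a topic, the source keeps re-reading its own output and the topic never drains.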


Solution

  • After digging around a bit, I noticed that Ambari hadn't created any Flume config file for the specified agent. Ambari seems to create or update the Flume config only if I specify test.sources = kafka-source. Once I added that line to the Flume config (via Ambari), the config file was created on the box and the Flume agent started successfully.

    The final flume config looked like this:

    test.sources = kafka-source
    test.channels = kafka-channel
    test.sinks = hdfs-sink
    
    test.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
    test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
    test.channels.kafka-channel.kafka.topic = test
    test.channels.kafka-channel.parseAsFlumeEvent = false
    
    test.sinks.hdfs-sink.channel = kafka-channel
    test.sinks.hdfs-sink.type = hdfs
    test.sinks.hdfs-sink.hdfs.path = hdfs:///data/test
    

    Notice that I didn't set any properties on the source (that would cause the infinite loop I mentioned in my question); it only needs to be declared so that Ambari creates the Flume config and starts the agent.
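The workaround above can be turned into a quick pre-deployment check. This is a minimal Python sketch (Python 3.9+) under stated assumptions: the rule that the agent's sources line must be declared comes only from the poster's observation of Ambari on HDP 2.6.3, not from Flume or Ambari documentation, and check_config and parse_properties are hypothetical helper names. It flags a missing sources line, a declared source that has any properties set (the answer keeps the source as an empty placeholder to avoid the loop), and a sink that points at an undeclared channel.

```python
"""Check a Flume agent config against the Ambari workaround described above.

Hypothetical helpers: the "sources must be declared" rule reflects the
poster's observation of Ambari's behaviour, not a documented requirement.
"""


def parse_properties(text: str) -> dict[str, str]:
    """Parse simple 'key = value' lines, skipping blanks and '#' comments."""
    props: dict[str, str] = {}
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):
            continue
        key, sep, value = line.partition("=")
        if sep:
            props[key.strip()] = value.strip()
    return props


def check_config(text: str, agent: str) -> list[str]:
    """Return a list of problems; an empty list means the config passes."""
    props = parse_properties(text)
    problems: list[str] = []

    # Observed Ambari behaviour: no sources line, no config file written.
    sources = props.get(f"{agent}.sources", "").split()
    if not sources:
        problems.append(f"{agent}.sources is not declared (Ambari may not write the config)")

    # The placeholder source must stay empty so it never starts consuming.
    for src in sources:
        prefix = f"{agent}.sources.{src}."
        if any(key.startswith(prefix) for key in props):
            problems.append(f"source '{src}' has properties; keep it as an empty placeholder")

    # Every sink must point at a declared channel.
    channels = set(props.get(f"{agent}.channels", "").split())
    for sink in props.get(f"{agent}.sinks", "").split():
        channel = props.get(f"{agent}.sinks.{sink}.channel")
        if channel not in channels:
            problems.append(f"sink '{sink}' references undeclared channel {channel!r}")
    return problems


FINAL_CONFIG = """
test.sources = kafka-source
test.channels = kafka-channel
test.sinks = hdfs-sink
test.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
test.channels.kafka-channel.kafka.topic = test
test.channels.kafka-channel.parseAsFlumeEvent = false
test.sinks.hdfs-sink.channel = kafka-channel
test.sinks.hdfs-sink.type = hdfs
test.sinks.hdfs-sink.hdfs.path = hdfs:///data/test
"""

if __name__ == "__main__":
    print(check_config(FINAL_CONFIG, "test") or "OK")
```

Run against the final config it reports nothing; with the test.sources line removed (as in the question's original config) it reports only the missing sources declaration.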