I have an existing Kafka topic and a flume agent that reads from there and writes to HDFS. I want to reconfigure my flume agent so it will move away from the existing setup; a Kafka Source, file Channel to HDFS Sink, to use a Kafka Channel.
I read in the cloudera documentation that it possible to achieve this by only using a Kafka Channel and HDFS sink (without a flume source).. (unless I have got the wrong end of the stick.) So I tried to create this configuration but it isn't working. It's not even starting the flume process on the box.
# Test
test.channels = kafka-channel
test.sinks = hdfs-sink
test.channels.kafka-channel.type =
org.apache.flume.channel.kafka.KafkaChannel
test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
test.channels.kafka-channel.kafka.topic = test
test.channels.kafka-channel.parseAsFlumeEvent = false
test.sinks.hdfs-sink.channel = kafka-channel
test.sinks.hdfs-sink.type = hdfs
test.sinks.hdfs-sink.hdfs.path = hdfs://localhost:8082/data/test/
I'm using:
ps -ef | grep flume
only returns a process once I added a kafka-source, but this can't be right because doing this creates an infinite loop for any messages published onto the topic.Is it possible to only use a Kafka Channel and HDFS Sink or do I need to use a kafka-source but change some other configurations that will prevent the infinite loop of messages?
Kafka-source
-> kafka-channel
-> HDFS Sink
- This doesn't seem right to me.
After digging around a bit I noticed that Ambari didn't create any flume conf files for the specified agent. Ambari seems to only create/update the flume config if I specify test.sources = kafka-source
. Once I added this into the flume config (via ambari) the config was created on the box and the flume agent started successfully.
The final flume config looked like this:
test.sources=kafka-source
test.channels = kafka-channel
test.sinks = hdfs-sink
test.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel
test.channels.kafka-channel.kafka.bootstrap.servers = localhost:9092
test.channels.kafka-channel.kafka.topic = test
test.channels.kafka-channel.parseAsFlumeEvent = false
test.sinks.hdfs-sink.channel = kafka-channel
test.sinks.hdfs-sink.type = hdfs
test.sinks.hdfs-sink.hdfs.path = hdfs:///data/test
Notice I didn't set any of the properties on the source (this would cause the infinite loop issue i mentioned in my question), it just needs to be mentioned so Ambari creates the flume config and starts the agent.