hadoop, flume, apache-kafka

Using Kafka to import data to Hadoop


First I was deciding what to use to get events into Hadoop, where they will be stored and periodically analyzed (possibly using Oozie to schedule the periodic analysis): Kafka or Flume. I concluded that Kafka is probably the better solution, since we also have a component that does event processing, so this way both the batch and the event processing components get data in the same way.

But now I'm looking for concrete suggestions on how to get the data from the broker into Hadoop.

I found here that Flume can be used in combination with Kafka:

  • Flume - Contains Kafka Source (consumer) and Sink (producer)

I also found, on the same page and in the Kafka documentation, that there is something called Camus:

  • Camus - LinkedIn's Kafka=>HDFS pipeline. This one is used for all data at LinkedIn, and works great.
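
From what I can see, Camus is driven by a properties file and submitted as a regular Hadoop job, roughly like the sketch below (the property names come from the camus.properties example shipped with the project, while the paths, topic, and jar version are placeholders I made up; a message decoder / record writer class suited to the data format would also need to be configured):

    # camus.properties (sketch)
    camus.job.name=Camus Job
    kafka.brokers=localhost:9092
    kafka.whitelist.topics=testkafka
    etl.destination.path=/user/etl/camus/topics
    etl.execution.base.path=/user/etl/camus/exec
    etl.execution.history.path=/user/etl/camus/exec/history

    # run the Camus MapReduce job
    hadoop jar camus-example-<version>-shaded.jar com.linkedin.camus.etl.kafka.CamusJob -P camus.properties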

I'm interested in which of these would be the better (and easier, better-documented) way to do that. Also, are there any examples or tutorials on how to do it?

When should I use these variants over the simpler high-level consumer?

I'm open to suggestions if there is another/better solution than these two.

Thanks


Solution

  • You can use Flume to dump data from Kafka to HDFS. Flume has a Kafka source and an HDFS sink, so it is just a matter of changing a property file. An example is given below.

    Steps:

    1. Create a Kafka topic

      kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic testkafka
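
    You can confirm the topic was created with, for example:

      kafka-topics --describe --zookeeper localhost:2181 --topic testkafka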
      
    2. Write to the topic created above using the Kafka console producer

      kafka-console-producer --broker-list localhost:9092 --topic testkafka
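
    The console producer reads one message per line from stdin, so you can type test events interactively or pipe them in; for example (the event strings here are just placeholders):

      printf 'event-1\nevent-2\nevent-3\n' | kafka-console-producer --broker-list localhost:9092 --topic testkafka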
      
    3. Configure a Flume agent with the following properties

      # Name the components of this agent
      flume1.sources  = kafka-source-1
      flume1.channels = hdfs-channel-1
      flume1.sinks    = hdfs-sink-1

      # Kafka source: consumes the testkafka topic via ZooKeeper
      flume1.sources.kafka-source-1.type = org.apache.flume.source.kafka.KafkaSource
      flume1.sources.kafka-source-1.zookeeperConnect = localhost:2181
      flume1.sources.kafka-source-1.topic = testkafka
      flume1.sources.kafka-source-1.batchSize = 100
      flume1.sources.kafka-source-1.channels = hdfs-channel-1

      # Memory channel buffering events between source and sink
      flume1.channels.hdfs-channel-1.type = memory
      flume1.channels.hdfs-channel-1.capacity = 10000
      flume1.channels.hdfs-channel-1.transactionCapacity = 1000

      # HDFS sink: writes plain-text files under a per-topic, per-day directory
      flume1.sinks.hdfs-sink-1.channel = hdfs-channel-1
      flume1.sinks.hdfs-sink-1.type = hdfs
      flume1.sinks.hdfs-sink-1.hdfs.writeFormat = Text
      flume1.sinks.hdfs-sink-1.hdfs.fileType = DataStream
      flume1.sinks.hdfs-sink-1.hdfs.filePrefix = test-events
      flume1.sinks.hdfs-sink-1.hdfs.useLocalTimeStamp = true
      flume1.sinks.hdfs-sink-1.hdfs.path = /tmp/kafka/%{topic}/%y-%m-%d
      flume1.sinks.hdfs-sink-1.hdfs.rollCount = 100
      flume1.sinks.hdfs-sink-1.hdfs.rollSize = 0
      
      

    Save the above config file as example.conf
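
    Note that the memory channel above is fast but not durable: events still sitting in the channel are lost if the agent dies. If you need durability, Flume's file channel can be swapped in, roughly as below (the checkpoint and data directories are placeholder paths, adjust them to your environment):

      flume1.channels.hdfs-channel-1.type = file
      flume1.channels.hdfs-channel-1.checkpointDir = /var/flume/checkpoint
      flume1.channels.hdfs-channel-1.dataDirs = /var/flume/data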

    4. Run the Flume agent

      flume-ng agent -n flume1 -c conf -f example.conf -Dflume.root.logger=INFO,console
      

    Data will now be dumped to HDFS under the following path:

    /tmp/kafka/%{topic}/%y-%m-%d
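
    You can verify the output with the HDFS CLI, for example (substitute the actual topic and date directory created in your run):

      hdfs dfs -ls /tmp/kafka/testkafka
      hdfs dfs -cat /tmp/kafka/testkafka/*/test-events*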