Search code examples
java, apache-kafka, apache-storm, kafka-consumer-api

Storm read data from Kafka in java


I wrote a Kafka producer that reads data from MySQL and a Kafka consumer that retrieves the data sent by the producer.

It works well. Here is my code: [Kafka producer]

import java.util.Properties;
import java.sql.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
/**
 * Reads every row of the {@code content_log} MySQL table and publishes each row to the
 * {@code lamtest} Kafka topic as a single space-separated string message.
 */
public class ProducerTest {

    /** Kafka topic the rows are published to. */
    private static final String TOPIC = "lamtest";

    /**
     * 1-based result-set column indexes that make up a message, in output order.
     * Columns 16 and 20 are deliberately absent here because the original message
     * did not include them (presumably intentional; confirm against the table schema).
     */
    private static final int[] MESSAGE_COLUMNS = {
        1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 17, 18, 19, 21, 22
    };

    /**
     * Entry point: configures a Kafka producer, streams the table and sends one message per row.
     *
     * @param args unused
     * @throws ClassNotFoundException kept for signature compatibility; failures are reported
     *         on standard output instead of being propagated
     * @throws SQLException kept for signature compatibility; failures are reported on
     *         standard output instead of being propagated
     */
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");

        Producer<String, String> producer = new Producer<>(new ProducerConfig(props));
        try {
            Class.forName("com.mysql.jdbc.Driver");
            // try-with-resources closes the result set, statement and connection in reverse
            // order even when reading a row or sending a message fails part-way through.
            try (Connection con = DriverManager.getConnection(
                         "jdbc:mysql://172.18.67.8:3306/big_data", "root", "root");
                 Statement stmt = con.createStatement();
                 ResultSet rs = stmt.executeQuery("select * from content_log")) {
                while (rs.next()) {
                    producer.send(new KeyedMessage<String, String>(TOPIC, formatRow(rs)));
                }
            }
        } catch (Exception e) {
            // Best-effort tool: report the failure and fall through to the cleanup below.
            System.out.println(e);
        } finally {
            // Release the producer's broker connections whether or not publishing succeeded.
            producer.close();
        }
    }

    /**
     * Joins the configured columns of the current row with single spaces.
     * A SQL NULL column is rendered as the literal text {@code null}, exactly as plain
     * string concatenation would render it.
     *
     * @param rs result set positioned on the row to format
     * @return the space-separated message text for that row
     * @throws SQLException if a column cannot be read
     */
    private static String formatRow(ResultSet rs) throws SQLException {
        StringBuilder message = new StringBuilder();
        for (int i = 0; i < MESSAGE_COLUMNS.length; i++) {
            if (i > 0) {
                message.append(' ');
            }
            message.append(rs.getString(MESSAGE_COLUMNS[i]));
        }
        return message.toString();
    }
}

[Kafka consumer]

import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;

/**
 * Minimal high-level Kafka consumer that prints every message of a single topic
 * to standard output.
 */
public class SimpleHLConsumer {

    private final ConsumerConnector consumer;
    private final String topic;

    /**
     * Creates a consumer connector for the given ZooKeeper ensemble and consumer group.
     *
     * @param zookeeper ZooKeeper connection string, passed as {@code zookeeper.connect}
     * @param groupId   consumer group id, passed as {@code group.id}
     * @param topic     topic whose messages {@link #testConsumer()} will print
     */
    public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");

        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    /**
     * Opens one stream for the configured topic and prints each received message,
     * then shuts the connector down.
     *
     * <p>NOTE(review): no {@code consumer.timeout.ms} is configured, so the stream iterator
     * presumably blocks waiting for new messages and this method does not return during
     * normal operation. Confirm against the Kafka consumer configuration docs.
     */
    public void testConsumer() {
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1); // request a single stream for the topic

        try {
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams =
                    consumer.createMessageStreams(topicCount);
            for (KafkaStream<byte[], byte[]> stream : consumerStreams.get(topic)) {
                ConsumerIterator<byte[], byte[]> it = stream.iterator();
                while (it.hasNext()) {
                    // Decode explicitly as UTF-8 instead of the platform default charset, so
                    // the output does not depend on the JVM's locale settings.
                    // (Assumes the producer encodes as UTF-8; verify the encoder settings.)
                    String message = new String(it.next().message(), StandardCharsets.UTF_8);
                    System.out.println("Message from Single Topic: " + message);
                }
            }
        } finally {
            // Always release the connector, including when consumption fails with an exception.
            consumer.shutdown();
        }
    }

    /**
     * Runs the consumer against a local ZooKeeper for the {@code lamtest} topic.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        String topic = "lamtest";
        SimpleHLConsumer simpleHLConsumer =
                new SimpleHLConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer();
    }
}

Now I want to integrate Kafka with Storm: I want to read the data from Kafka into a Storm spout. Please help me. Thanks.


Solution

  • There is a storm-kafka library that provides a Storm spout for Kafka.

    http://storm.apache.org/releases/1.0.1/storm-kafka.html

    https://mvnrepository.com/artifact/org.apache.storm/storm-kafka/1.0.1