I wrote a Kafka producer that reads data from MySQL and a Kafka consumer that retrieves that data from the producer.
It works well. Here is my code: [Kafka producer]
import java.util.Properties;
import java.sql.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class ProducerTest {
    public static void main(String[] args) throws ClassNotFoundException, SQLException {
        Properties props = new Properties();
        props.put("zk.connect", "localhost:2181");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("metadata.broker.list", "localhost:9092");
        ProducerConfig config = new ProducerConfig(props);
        Producer<String, String> producer = new Producer<String, String>(config);
        try {
            Class.forName("com.mysql.jdbc.Driver");
            Connection con = DriverManager.getConnection(
                    "jdbc:mysql://172.18.67.8:3306/big_data", "root", "root");
            Statement stmt = con.createStatement();
            ResultSet rs = stmt.executeQuery("select * from content_log");
            while (rs.next()) {
                // build one space-separated message per row (columns 16 and 20 are skipped)
                StringBuilder row = new StringBuilder(rs.getString(1));
                for (int i = 2; i <= 22; i++) {
                    if (i == 16 || i == 20) {
                        continue;
                    }
                    row.append(" ").append(rs.getString(i));
                }
                producer.send(new KeyedMessage<String, String>("lamtest", row.toString()));
            }
            rs.close();
            stmt.close();
            con.close();
        } catch (Exception e) {
            System.out.println(e);
        } finally {
            producer.close();
        }
    }
}
[Kafka consumer]
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
public class SimpleHLConsumer {
    private final ConsumerConnector consumer;
    private final String topic;

    public SimpleHLConsumer(String zookeeper, String groupId, String topic) {
        Properties props = new Properties();
        props.put("zookeeper.connect", zookeeper);
        props.put("group.id", groupId);
        props.put("zookeeper.session.timeout.ms", "500");
        props.put("zookeeper.sync.time.ms", "250");
        props.put("auto.commit.interval.ms", "1000");
        consumer = Consumer.createJavaConsumerConnector(new ConsumerConfig(props));
        this.topic = topic;
    }

    public void testConsumer() {
        // ask for a single stream for the topic
        Map<String, Integer> topicCount = new HashMap<>();
        topicCount.put(topic, 1);
        Map<String, List<KafkaStream<byte[], byte[]>>> consumerStreams = consumer.createMessageStreams(topicCount);
        List<KafkaStream<byte[], byte[]>> streams = consumerStreams.get(topic);
        for (final KafkaStream<byte[], byte[]> stream : streams) {
            ConsumerIterator<byte[], byte[]> it = stream.iterator();
            while (it.hasNext()) {
                System.out.println("Message from Single Topic: " + new String(it.next().message()));
            }
        }
        consumer.shutdown();
    }

    public static void main(String[] args) {
        String topic = "lamtest";
        SimpleHLConsumer simpleHLConsumer = new SimpleHLConsumer("localhost:2181", "testgroup", topic);
        simpleHLConsumer.testConsumer();
    }
}
Now I want to integrate Kafka with Storm: I want to read the data from Kafka into a Storm spout. Please help me. Thanks.
There is a storm-kafka library that provides a Storm spout for Kafka:
http://storm.apache.org/releases/1.0.1/storm-kafka.html
https://mvnrepository.com/artifact/org.apache.storm/storm-kafka/1.0.1
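Add org.apache.storm:storm-kafka:1.0.1 to your project (plus your Kafka client jar, since storm-kafka does not pull one in). Below is a minimal sketch of a topology that reads your lamtest topic with a KafkaSpout and prints each message, much like your SimpleHLConsumer does. It assumes the same local ZooKeeper at localhost:2181; the PrinterBolt class and the "kafka-spout"/"print-bolt" component names are just illustrative.

import org.apache.storm.Config;
import org.apache.storm.LocalCluster;
import org.apache.storm.kafka.KafkaSpout;
import org.apache.storm.kafka.SpoutConfig;
import org.apache.storm.kafka.StringScheme;
import org.apache.storm.kafka.ZkHosts;
import org.apache.storm.spout.SchemeAsMultiScheme;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.TopologyBuilder;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.utils.Utils;

public class KafkaStormTopology {

    // illustrative bolt that just prints each message, like your consumer does
    public static class PrinterBolt extends BaseBasicBolt {
        @Override
        public void execute(Tuple tuple, BasicOutputCollector collector) {
            // StringScheme deserializes each Kafka message into a single "str" field
            System.out.println("Message from Kafka spout: " + tuple.getStringByField("str"));
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // sink bolt: emits nothing
        }
    }

    public static void main(String[] args) throws Exception {
        // the spout reads broker metadata from the same ZooKeeper your consumer used
        ZkHosts hosts = new ZkHosts("localhost:2181");
        // topic, ZooKeeper root for offset storage, and an id for this spout's offsets
        SpoutConfig spoutConfig = new SpoutConfig(hosts, "lamtest", "/lamtest", "storm-consumer");
        spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 1);
        builder.setBolt("print-bolt", new PrinterBolt(), 1).shuffleGrouping("kafka-spout");

        // run locally for a minute; on a real cluster use StormSubmitter.submitTopology instead
        LocalCluster cluster = new LocalCluster();
        cluster.submitTopology("kafka-storm-test", new Config(), builder.createTopology());
        Utils.sleep(60000);
        cluster.shutdown();
    }
}

The spout stores its offsets in ZooKeeper under the zkRoot/id path you pass to SpoutConfig, so restarting the topology resumes where it left off instead of re-reading the topic from the beginning.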