Tags: serialization, apache-kafka, deserialization, apache-flink, flink-streaming

Flink: set up a basic Kafka producer and consumer with a custom class in Java


I want to set up a basic producer and consumer with Flink on Kafka, but I am having trouble producing data to an existing consumer from Java.

Working CLI setup

  1. I set up a Kafka broker using the kafka_2.11-2.4.0 zip from https://kafka.apache.org/downloads and start it with the commands

    bin/zookeeper-server-start.sh config/zookeeper.properties

    and bin/kafka-server-start.sh config/server.properties

  2. I create a topic called transactions1 using

    bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic transactions1

    Now I can use a producer and consumer on the command line to see that the topic has been created and works.

  3. To set up the consumer I run

    bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic transactions1 --from-beginning

    Now if any producer sends data to the topic transactions1 I will see it in the consumer console.

    I test that the consumer is working by running

    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic transactions1

    and enter the following lines in the producer CLI, which also show up in the consumer CLI.

{"txnID":1,"amt":100.0,"account":"AC1"}

{"txnID":2,"amt":10.0,"account":"AC2"}

{"txnID":3,"amt":20.0,"account":"AC3"}

Now I want to replicate step 3, i.e. the producer and consumer, in Java code, which is the core problem of this question.

  1. So I set up a Gradle Java 8 project with build.gradle
...
dependencies {
    testCompile group: 'junit', name: 'junit', version: '4.12'
    compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
    compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
    // https://mvnrepository.com/artifact/org.apache.flink/flink-streaming-java
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
    compile group: 'com.google.code.gson', name: 'gson', version: '2.8.5'
    compile group: 'com.twitter', name: 'chill-thrift', version: '0.7.6'
    compile group: 'org.apache.thrift', name: 'libthrift', version: '0.11.0'
    compile group: 'com.twitter', name: 'chill-protobuf', version: '0.7.6'
    compile group: 'com.google.protobuf', name: 'protobuf-java', version: '3.7.0'
}
...
  2. I set up a custom class, Transaction, where you can suggest changes to the serialization logic using Kryo, Protobuf, or TBaseSerializer by extending the relevant Flink classes.
import com.google.gson.Gson;
import org.apache.flink.api.common.functions.MapFunction;

public class Transaction {
    public final int txnID;
    public final float amt;
    public final String account;

    public Transaction(int txnID, float amt, String account) {
        this.txnID = txnID;
        this.amt = amt;
        this.account = account;
    }


    public String toJSONString() {
        Gson gson = new Gson();
        return gson.toJson(this);
    }

    public static Transaction fromJSONString(String some) {
        Gson gson = new Gson();
        return gson.fromJson(some, Transaction.class);
    }

    public static MapFunction<String, String> mapTransactions() {
        MapFunction<String, String> map = new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                if (value != null && value.trim().length() > 0) {
                    try {
                        return fromJSONString(value).toJSONString();
                    } catch (Exception e) {
                        return "";
                    }
                }
                return "";
            }
        };
        return map;
    }

    @Override
    public String toString() {
        return "Transaction{" +
                "txnID=" + txnID +
                ", amt=" + amt +
                ", account='" + account + '\'' +
                '}';
    }
}
  3. Now it is time to use Flink to produce and consume streams on the topic transactions1.
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

import javax.annotation.Nullable;

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer.Semantic;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class SetupSpike {
    public static void main(String[] args) throws Exception {
        System.out.println("begin");
        List<Transaction> txns = new ArrayList<Transaction>(){{
            add(new Transaction(1, 100, "AC1"));
            add(new Transaction(2, 10, "AC2"));
            add(new Transaction(3, 20, "AC3"));
        }};
        // This list txns needs to be serialized in Flink as Transaction.class->String->ByteArray 
        //via producer and then to the topic in Kafka broker 
        //and deserialized as ByteArray->String->Transaction.class from the Consumer in Flink reading Kafka broker.
        
        String topic = "transactions1";
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("zookeeper.connect", "localhost:2181");
        properties.setProperty("group.id", topic);

        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        //env.getConfig().addDefaultKryoSerializer(Transaction.class, TBaseSerializer.class);

        // working Consumer logic below which needs edit if you change serialization
        FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
        myConsumer.setStartFromEarliest();     // start from the earliest record possible
        DataStream<String> stream = env.addSource(myConsumer).map(Transaction.mapTransactions());
       
        //working Producer logic below which works if you are sinking a pre-existing DataStream
        //but needs editing to work with Java List<Transaction> datatype.
        System.out.println("sinking expanded stream");
        MapFunction<String, String> etl = new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                if (value != null && value.trim().length() > 0) {
                    try {
                        return Transaction.fromJSONString(value).toJSONString();
                    } catch (Exception e) {
                        return "";
                    }
                }
                return "";
            }
        };
        FlinkKafkaProducer<String> myProducer = new FlinkKafkaProducer<String>(topic,
                new KafkaSerializationSchema<String>() {
                    @Override
                    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                        try {
                            System.out.println(element);
                            return new ProducerRecord<byte[], byte[]>(topic, etl.map(element).getBytes(StandardCharsets.UTF_8));
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        return null;
                    }
                }, properties, Semantic.EXACTLY_ONCE);
//        stream.timeWindowAll(Time.minutes(1));
        stream.addSink(myProducer);
        JobExecutionResult execute = env.execute();

    }
}

As you can see, I am not able to do this with the List of txns provided. The above is working code I could put together from the Flink documentation to redirect topic stream data, while sending the data manually via the CLI producer. The problem is writing the Kafka producer code in Java that sends the data to the topic, which is further compounded by issues like:

  1. Adding timestamps and watermarks
  2. keyBy operations
  3. GroupBy/WindowBy operations
  4. Adding custom ETL logic before sinking
  5. Serialization/deserialization logic in Flink

Can someone who has worked with Flink please help me produce the txns list to the transactions1 topic in Flink and then verify that it works with the consumer? Any help with adding timestamps or doing some processing before sinking would also be greatly appreciated. You can find the source code at https://github.com/devssh/kafkaFlinkSpike; the intent is to generate Flink boilerplate that adds details for "AC1" from an in-memory store and joins them with the Transaction events coming in in real time, so that the expanded output is sent to the user.


Solution

  • Several points, in no particular order:

    It would be better not to mix Flink version 1.9.2 together with version 1.9.0 as you've done here:

    compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.0'
    compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.0'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.12', version: '1.9.2'
    
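    The Scala suffixes are mixed as well (_2.11 and _2.12). As a sketch only, an aligned dependency block standardising on Flink 1.9.2 and the _2.11 artifacts might look like:

    compile group: 'org.apache.flink', name: 'flink-connector-kafka_2.11', version: '1.9.2'
    compile group: 'org.apache.flink', name: 'flink-core', version: '1.9.2'
    compile group: 'org.apache.flink', name: 'flink-streaming-java_2.11', version: '1.9.2'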

    For tutorials on how to work with timestamps, watermarks, keyBy, windows, etc., see the online training materials from Ververica.
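
    As a rough sketch of the pattern those materials cover: given a DataStream<Transaction> transactions (for example the one built from the collection in the next point), and assuming Transaction were extended with a hypothetical long eventTime field holding epoch milliseconds, timestamps, watermarks, keyBy and a window could be wired up along these lines:

    // Needs org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor,
    // org.apache.flink.streaming.api.windowing.time.Time and org.apache.flink.api.java.functions.KeySelector.
    DataStream<Transaction> withTimestamps = transactions
            .assignTimestampsAndWatermarks(
                    new BoundedOutOfOrdernessTimestampExtractor<Transaction>(Time.seconds(5)) {
                        @Override
                        public long extractTimestamp(Transaction txn) {
                            return txn.eventTime; // hypothetical event-time field, not in the class above
                        }
                    });

    DataStream<Transaction> perAccountTotals = withTimestamps
            .keyBy(new KeySelector<Transaction, String>() {
                @Override
                public String getKey(Transaction txn) {
                    return txn.account;
                }
            })
            .timeWindow(Time.minutes(1))   // one-minute event-time windows per account
            .reduce((a, b) -> new Transaction(a.txnID, a.amt + b.amt, a.account)); // sum the amounts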

    To use List<Transaction> txns as an input stream, you can do this (docs):

    DataStream<Transaction> transactions = env.fromCollection(txns);
    
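    A minimal sketch of how that stream could then be sunk to the transactions1 topic as JSON, reusing the toJSONString() helper on the Transaction class from the question and building on the env and transactions variables above (assumptions: AT_LEAST_ONCE semantics to avoid the extra transaction-timeout configuration EXACTLY_ONCE needs, plus the same Kafka connector imports as in SetupSpike and java.nio.charset.StandardCharsets):

    Properties producerProps = new Properties();
    producerProps.setProperty("bootstrap.servers", "localhost:9092");

    FlinkKafkaProducer<Transaction> producer = new FlinkKafkaProducer<>(
            "transactions1",
            new KafkaSerializationSchema<Transaction>() {
                @Override
                public ProducerRecord<byte[], byte[]> serialize(Transaction txn, @Nullable Long timestamp) {
                    // Transaction -> JSON string -> UTF-8 bytes, written as the record value
                    return new ProducerRecord<>("transactions1",
                            txn.toJSONString().getBytes(StandardCharsets.UTF_8));
                }
            },
            producerProps,
            FlinkKafkaProducer.Semantic.AT_LEAST_ONCE);

    transactions.addSink(producer);
    env.execute("produce-transactions");

    With the job running, the console consumer from step 3 should print the three JSON lines, which verifies the round trip.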

    For an example of how to handle serialization/deserialization when working with Flink and Kafka, see the Flink Operations Playground; in particular, look at ClickEventDeserializationSchema and ClickEventStatisticsSerializationSchema, which are used in ClickEventCount.java and defined here. (Note: this playground has not yet been updated for Flink 1.10.)
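
    A minimal sketch of what such a schema might look like for the Transaction type from the question, following the same pattern (assumption: Gson for parsing, matching the question, rather than whatever the playground itself uses):

    import java.io.IOException;
    import java.nio.charset.StandardCharsets;

    import com.google.gson.Gson;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;

    public class TransactionDeserializationSchema implements DeserializationSchema<Transaction> {

        @Override
        public Transaction deserialize(byte[] message) throws IOException {
            // byte[] -> UTF-8 JSON string -> Transaction
            return new Gson().fromJson(new String(message, StandardCharsets.UTF_8), Transaction.class);
        }

        @Override
        public boolean isEndOfStream(Transaction nextElement) {
            return false; // the Kafka topic is unbounded
        }

        @Override
        public TypeInformation<Transaction> getProducedType() {
            return TypeInformation.of(Transaction.class);
        }
    }

    Passing this to the consumer, e.g. new FlinkKafkaConsumer<>("transactions1", new TransactionDeserializationSchema(), properties), yields a DataStream<Transaction> directly, so the String round trip in SetupSpike is no longer needed.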