Flink user-defined sink connector cannot serialize data into JSON format


I am working on a user-defined Flink MQTT connector.

https://github.com/yinjilong/StoneForests-flink-mqtt-connector

However, I run into a serialization problem when I try to write messages in JSON format.

In the class MqttSinkFunction<T>, which extends RichSinkFunction<T>, I override the invoke method as follows:


    @Override
    public void invoke(T event, Context context) {
        log.debug("sink invoke, message is {}", event);

        // Serialize once; skip publishing if the event cannot be serialized.
        final byte[] payload;
        try {
            payload = this.serializer.serialize(event);
        } catch (Exception e) {
            // The trailing exception argument is logged with its stack trace.
            log.error("Cannot serialize event {}", event, e);
            return;
        }

        try {
            MqttMessage message = new MqttMessage(payload);
            message.setQos(this.qos);
            // sinkTopics may hold several comma-separated topics.
            for (String topic : this.topics.split(",")) {
                log.debug("send message:[{}] to topic:[{}].", message, topic);
                this.client.publish(topic, message);
            }
        } catch (Exception e) {
            log.error("Cannot sink MQTT event {} at {}", event, hostUrl, e);
        }
    }


I use the following SQL to write a message:

$ sql-client.sh

CREATE TABLE sink (
  id INT,
  name STRING
) WITH (
  'connector' = 'mqtt',
  'hostUrl' = 'tcp://localhost:1883',
  'username' = '',
  'password' = '',
  'sinkTopics' = 'test/mytopic',
  'format' = 'json'
);

INSERT INTO sink (id,name) VALUES(1,'Jeen');

It seems that serializing the event (RowData) fails. The event is logged as +I(1,Jeen).

I thought that, with the JSON format, the internal serializer would infer the JSON schema from the SQL table schema automatically, but it fails and throws an exception.

What can I try next?


Solution

  • Serializing the event (RowData) fails with a NullPointerException, because org.apache.flink.formats.json.JsonRowDataSerializationSchema#mapper is null in your com.nakata.flink.connectors.mqtt.table.MqttSinkFunction#serializer instance. The schema's open method was never called on that instance.
    You can fix it by adding the following line to the com.nakata.flink.connectors.mqtt.table.MqttSinkFunction#open function:

    this.serializer.open(null);
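
To illustrate why the missing open call produces the NPE, here is a minimal, Flink-free sketch of the lifecycle. All class names in it (SerializerLifecycleSketch, SketchJsonSchema, SketchSink) are hypothetical stand-ins, not Flink or connector classes, and the hand-built JSON string only stands in for a real mapper. It assumes, as the answer above implies, that the schema creates its mapper in open rather than in its constructor.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

/** Simplified stand-in for the sink/serializer lifecycle; none of these types are Flink classes. */
public class SerializerLifecycleSketch {

    /** Hypothetical schema whose mapper exists only after open() has run. */
    static class SketchJsonSchema {
        private transient Function<String[], String> mapper; // null until open() is called

        void open() {
            // Stand-in for creating the real JSON mapper.
            mapper = f -> "{\"id\":" + f[0] + ",\"name\":\"" + f[1] + "\"}";
        }

        byte[] serialize(String[] row) {
            // Throws NullPointerException if open() was never called.
            return mapper.apply(row).getBytes(StandardCharsets.UTF_8);
        }
    }

    /** Hypothetical sink that may or may not forward its own open() to the schema. */
    static class SketchSink {
        private final SketchJsonSchema serializer = new SketchJsonSchema();
        private final boolean openSerializer;

        SketchSink(boolean openSerializer) {
            this.openSerializer = openSerializer;
        }

        void open() {
            if (openSerializer) {
                serializer.open(); // the call that was missing in the question
            }
        }

        String invoke(String[] row) {
            return new String(serializer.serialize(row), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) {
        String[] row = {"1", "Jeen"};

        SketchSink broken = new SketchSink(false);
        broken.open();
        try {
            broken.invoke(row);
        } catch (NullPointerException e) {
            System.out.println("without serializer.open(): NullPointerException");
        }

        SketchSink fixed = new SketchSink(true);
        fixed.open();
        System.out.println(fixed.invoke(row)); // prints {"id":1,"name":"Jeen"}
    }
}
```

In the real connector the equivalent of SketchSink.open is MqttSinkFunction#open, and the forwarded call is the this.serializer.open(null) line shown above. Passing null is only safe if the schema's open method never reads its context argument, so treat that as an assumption to check against the Flink version in use.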