apache-flink, flink-streaming, flink-cep

Apache Flink: how do I sink a Java ObjectNode as a JSON string?


This source reads JSON strings from RabbitMQ and deserializes them into Java ObjectNode instances:

    final DataStream<ObjectNode> inputStream = env
        .addSource(new RMQSource<ObjectNode>(
            connectionConfig,                   // config for the RabbitMQ connection
            "start",                            // name of the RabbitMQ queue to consume
            true,                               // use correlation ids; can be false if only at-least-once is required
            new JSONDeserializationSchema()))   // deserialization schema to turn messages into Java objects
        .setParallelism(1);                     // non-parallel source is only required for exactly-once

How do I convert each ObjectNode back into a JSON string when writing to the sink?

    stream.addSink(new RMQSink<ObjectNode>(
        connectionConfig,
        "stop",
        new JSONSerializationSchema()
    ));

JSONSerializationSchema doesn't exist, but I need something like it.


Solution

  • Use a custom SerializationSchema like this:

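    stream.addSink(new RMQSink<ObjectNode>(
        connectionConfig,
        "stop",                                 // name of the RabbitMQ queue to publish to
        new SerializationSchema<ObjectNode>() {
            @Override
            public byte[] serialize(ObjectNode element) {
                // toString() gives the node's JSON text; encode it explicitly as UTF-8
                // rather than with the platform default charset
                // (needs: import java.nio.charset.StandardCharsets;)
                return element.toString().getBytes(StandardCharsets.UTF_8);
            }
        }
    ));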
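
    The serialize() method above is just toString() followed by a byte encoding, so you can check that part without a Flink job. Here is a minimal sketch using only the JDK. JsonBytesSketch and its two methods are hypothetical names of mine, not Flink or Jackson API, and a plain String stands in for the ObjectNode. It assumes whatever reads the "stop" queue decodes the payload as UTF-8.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not part of Flink or Jackson.
final class JsonBytesSketch {

    // Same logic as the serialize() body: take the text form, encode as UTF-8.
    static byte[] toUtf8Bytes(Object element) {
        return element.toString().getBytes(StandardCharsets.UTF_8);
    }

    // What a consumer would do to get the JSON text back (assumes UTF-8).
    static String fromUtf8Bytes(byte[] payload) {
        return new String(payload, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        // A String stands in for the ObjectNode; \u00eb is a non-ASCII character.
        String json = "{\"name\":\"Zo\u00eb\",\"count\":2}";
        byte[] payload = toUtf8Bytes(json);

        // 24 characters, one of which takes two bytes in UTF-8.
        System.out.println(payload.length);
        // The round trip should give back exactly the same text.
        System.out.println(fromUtf8Bytes(payload).equals(json));
    }
}
```

    Passing the charset explicitly means the bytes don't depend on the default encoding of the machine running the task, which matters as soon as the JSON contains non-ASCII characters.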