java, apache-flink, avro, confluent-platform, confluent-schema-registry

Confluent JDBC connector And Flink consumer


We are trying to use the SQL Server JDBC connector with the KafkaAvroSerializer, and we also provide a custom ProducerInterceptor to encrypt the data before it is sent to Kafka.

On the consumer side, we want to use the Flink connector to decrypt the data and then apply the appropriate deserializer.
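
For context, the interceptor we have in mind looks roughly like the sketch below. The class name and the encrypt helper are placeholders, and the sketch assumes the interceptor runs inside the Kafka Connect worker, whose producer sends keys and values that are already serialized to byte[]:

    import java.util.Map;

    import org.apache.kafka.clients.producer.ProducerInterceptor;
    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.apache.kafka.clients.producer.RecordMetadata;

    // Illustrative sketch only: encrypts the already-serialized value bytes
    // before the record leaves the Connect worker's producer.
    public class EncryptingProducerInterceptor implements ProducerInterceptor<byte[], byte[]> {

        @Override
        public ProducerRecord<byte[], byte[]> onSend(ProducerRecord<byte[], byte[]> record) {
            // encrypt(...) stands in for whatever cipher we end up using.
            byte[] encryptedValue = encrypt(record.value());
            return new ProducerRecord<>(record.topic(), record.partition(),
                    record.timestamp(), record.key(), encryptedValue, record.headers());
        }

        @Override
        public void onAcknowledgement(RecordMetadata metadata, Exception exception) {
            // Nothing to do once the broker acknowledges the record.
        }

        @Override
        public void close() {
        }

        @Override
        public void configure(Map<String, ?> configs) {
            // Key material / cipher settings could be read from the configs here.
        }

        private byte[] encrypt(byte[] plaintext) {
            // Placeholder; the real implementation applies an actual cipher.
            return plaintext;
        }
    }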

We have a couple of questions about how to achieve this:

1) If we provide a custom ConsumerInterceptor to decrypt the data, should it be passed in through the Properties object when we create the DataStream in Flink?

    Properties properties = new Properties();
    ...
    properties.setProperty("consumer.interceptor.classes", "OurCustomDecryptConsumerInterceptor");
    ...

    DataStream<String> stream = env.addSource(new FlinkKafkaConsumer011<>("sqlserver-foobar", ???, properties));

Is the above configuration correct, or do I need to set some other property in order to pass the ConsumerInterceptor to Flink?

2) Another question is about the Deserializer in Flink. I looked for examples on the web and found a few code snippets like the following:

public class ConfluentAvroDeserializationSchema implements DeserializationSchema<Type ??> { 

    private final String schemaRegistryUrl; 
    private final int identityMapCapacity; 
    private KafkaAvroDecoder kafkaAvroDecoder; 

    public ConfluentAvroDeserializationSchema(String schemaRegistyUrl) { 
        this(schemaRegistyUrl, 1000); 
    }

So if we are passing data from the JDBC connector to Kafka without any modification (apart from encrypting it), what type should we use in place of Type ?? during deserialization? We will be decrypting the data before deserialization.

Thanks in advance,


Solution

  • Just adding the end result so that it can help anybody who is looking for the same thing:

    import java.io.IOException;

    import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
    import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
    import io.confluent.kafka.serializers.KafkaAvroDecoder;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.api.common.serialization.DeserializationSchema;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.typeutils.TypeExtractor;

    public class ConfluentAvroDeserializationSchema
            implements DeserializationSchema<GenericRecord> {

        private final String schemaRegistryUrl;
        private final int identityMapCapacity;

        // The decoder is not serializable, so it is created lazily on the task managers.
        private transient KafkaAvroDecoder kafkaAvroDecoder;

        public ConfluentAvroDeserializationSchema(String schemaRegistryUrl) {
            this(schemaRegistryUrl, 1000);
        }

        public ConfluentAvroDeserializationSchema(String schemaRegistryUrl,
                                                  int identityMapCapacity) {
            this.schemaRegistryUrl = schemaRegistryUrl;
            this.identityMapCapacity = identityMapCapacity;
        }

        @Override
        public GenericRecord deserialize(byte[] bytes) throws IOException {
            if (kafkaAvroDecoder == null) {
                SchemaRegistryClient schemaRegistry = new CachedSchemaRegistryClient(
                        this.schemaRegistryUrl, this.identityMapCapacity);
                this.kafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry);
            }
            // The decoder looks up the writer schema in the Schema Registry and
            // returns the decoded Avro record.
            return (GenericRecord) this.kafkaAvroDecoder.fromBytes(bytes);
        }

        @Override
        public boolean isEndOfStream(GenericRecord nextElement) {
            return false;
        }

        @Override
        public TypeInformation<GenericRecord> getProducedType() {
            return TypeExtractor.getForClass(GenericRecord.class);
        }
    }
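
For completeness, here is a minimal sketch of how this schema and the decrypting interceptor from question 1 can be wired together. The bootstrap servers, registry URL, and interceptor class name are placeholders; note that Flink hands the Properties straight to the underlying KafkaConsumer, so the plain consumer key interceptor.classes is used:

    import java.util.Properties;

    import org.apache.avro.generic.GenericRecord;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011;

    public class DecryptingAvroJob {

        public static void main(String[] args) throws Exception {
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "localhost:9092");
            properties.setProperty("group.id", "flink-decrypt-consumer");
            // Standard Kafka consumer setting; use the fully qualified class name.
            properties.setProperty("interceptor.classes",
                    "com.example.OurCustomDecryptConsumerInterceptor");

            StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            DataStream<GenericRecord> stream = env.addSource(
                    new FlinkKafkaConsumer011<>(
                            "sqlserver-foobar",
                            new ConfluentAvroDeserializationSchema("http://localhost:8081"),
                            properties));

            stream.print();

            env.execute("Decrypt and deserialize Avro from Kafka");
        }
    }

The interceptor runs inside the KafkaConsumer and sees the raw byte[] value, so decryption happens before Flink applies the DeserializationSchema.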