Tags: apache-kafka, aws-glue, avro, aws-msk, aws-msk-connect

MSK Java producer/consumer with both key and value in AVRO and using glue schema registry


I am trying to use the MSK Connect MySQL CDC connector with both key and value as Avro schemas, using the AWS Glue Schema Registry (GSR).

When I did this with the Confluent Schema Registry, the schema names for the key and value were something like serverName.schemaName.tableName_key and serverName.schemaName.tableName_value.

But when I use the GSR, both the key and value schemas come out as serverName.schemaName.tableName, so they try to overwrite each other and registration fails.

The workaround I figured out was to use two different registries for key and value, and that works. I was also able to read the topics using a JDBC sink connector.

However, I was not able to figure out how to write a simple Java producer/consumer for a topic where both key and value are Avro, using two different registries for the key and value schemas.

I looked at the code in the GSR GitHub repository, but there is only one registry name and one schema name in the AWSSchemaRegistryConstants class, so I'm not sure how to pass two different registries and schemas. I'd appreciate any examples. https://github.com/awslabs/aws-glue-schema-registry/blob/master/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java

    /**
     * Registry Name.
     */
    public static final String REGISTRY_NAME = "registry.name";
    /**
     * Schema name.
     */
    public static final String SCHEMA_NAME = "schemaName";

Solution

  • @OneCricketeer, this works great. Thanks again for helping me out here.

    For reference https://github.com/awslabs/aws-glue-schema-registry/issues/234, this is what I did.

    In the common project create a custom class

    import com.amazonaws.services.schemaregistry.common.AWSSchemaNamingStrategy;

    // AWSSchemaNamingStrategy is an interface, so implement it rather than extend it.
    public class MySchemaNamingStrategy implements AWSSchemaNamingStrategy {
        @Override
        public String getSchemaName(String transportName, Object data, boolean isKey) {
            // Suffix the transport (topic) name so key and value schemas get distinct names.
            return transportName + (isKey ? "-key" : "-value");
        }

        @Override
        public String getSchemaName(String transportName) {
            return transportName + "-value";
        }
    }
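To sanity-check the suffixing logic, here is a dependency-free sketch (the topic name is just an example) showing the schema names the strategy produces:

```java
// Standalone demo of the naming logic in MySchemaNamingStrategy above,
// with no Glue Schema Registry dependency (the topic name is made up).
public class NamingDemo {
    static String schemaName(String transportName, boolean isKey) {
        return transportName + (isKey ? "-key" : "-value");
    }

    public static void main(String[] args) {
        String topic = "serverName.schemaName.tableName";
        System.out.println(schemaName(topic, true));   // serverName.schemaName.tableName-key
        System.out.println(schemaName(topic, false));  // serverName.schemaName.tableName-value
    }
}
```

Because the key and value schema names now differ, both can live in a single registry, which is what makes the two-registry workaround unnecessary.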
    

    In MSK Connect I set the following parameters:

    key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
    key.converter.schemaNameGenerationClass=com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy
    value.converter.schemaNameGenerationClass=com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy
    key.converter.registryName=MyRegistry
    value.converter.registryName=MyRegistry
    key.converter.region=us-east-1
    value.converter.region=us-east-1
    key.converter.schemaAutoRegistrationEnabled=true
    value.converter.schemaAutoRegistrationEnabled=true
    key.converter.avroRecordType=GENERIC_RECORD
    value.converter.avroRecordType=GENERIC_RECORD
    

    This will create topics for each table with the appropriate key and value schemas.

    Then the Java consumer will look similar to:

    Properties props = new Properties();
    props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());

    props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
    props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
    props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
    props.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS, "com.amazonaws.services.schemaregistry.common.ConfluentSchemaNamingStrategy");

    // try-with-resources so the consumer is closed on exit
    try (KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<>(props)) {
        consumer.subscribe(Collections.singletonList(topicName));

        while (true) {
            final ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofMillis(1000));
            System.out.println("Received messages: count = " + records.count());
            for (final ConsumerRecord<GenericRecord, GenericRecord> record : records) {
                final String key = (record.key() == null ? "NULL_KEY" : record.key().toString());
                final String value = (record.value() == null ? "NULL_VALUE" : record.value().toString());
                System.out.println("Received message: key = " + key);
                System.out.println("Received message: value = " + value);
            }
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
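The question also asked for a producer. Since the custom naming strategy handles the key/value suffixing, a single registry is enough on the producer side as well. Below is a minimal, hedged sketch of the producer configuration: the property keys are the string values shown for AWSSchemaRegistryConstants in this thread (e.g. REGISTRY_NAME = "registry.name", and the keys visible in the MSK Connect config above), while the serializer's fully qualified class name is an assumption based on the aws-glue-schema-registry library layout.

```java
import java.util.Properties;

// Sketch of producer-side configuration. The serializer class name
// com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer
// is assumed from the aws-glue-schema-registry library; the string property keys
// mirror the AWSSchemaRegistryConstants values used elsewhere in this answer.
public class AvroProducerConfig {
    static Properties producerProps(String bootstrapServers, String registryName, String region) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer",
                "com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer");
        props.put("value.serializer",
                "com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer");
        props.put("region", region);                      // AWSSchemaRegistryConstants.AWS_REGION
        props.put("registry.name", registryName);         // AWSSchemaRegistryConstants.REGISTRY_NAME
        props.put("schemaAutoRegistrationEnabled", "true");
        props.put("avroRecordType", "GENERIC_RECORD");
        // The custom strategy from above, so key and value register as topic-key / topic-value.
        props.put("schemaNameGenerationClass",
                "com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy");
        return props;
    }

    public static void main(String[] args) {
        Properties props = producerProps("localhost:9092", "MyRegistry", "us-east-1");
        System.out.println(props.getProperty("registry.name")); // MyRegistry
    }
}
```

With these properties you would construct a KafkaProducer<GenericRecord, GenericRecord> and send GenericRecord keys and values; kafka-clients and the Glue Schema Registry serde jar need to be on the classpath for the serializer class to resolve.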