I am trying to use the MSK connect Mysql CDC connector with both key and value as AVRO schemas using Glue Schema registry (GSR).
When I was doing this using confluent schema registry, the schema name for both key and value will be something like serverName.schemaName.tableName_key and serverName.schemaName.tableName_value.
But when I use the GSR both key and value schemas are coming out as serverName.schemaName.tableName and hence try to overwrite each schema and it fails.
So workaround I figured out was to use two different registries for key and value and it works. I was also able to read the topics using JDBC sink connector.
However I was not able to figure out how to write a simple java producer/consumer to write/read the topic that key and value both are AVRO and use two different registries for key and value schema.
I looked at the code in GSR github, but there are only one registry name and one schema name in the AWSSchemaRegistryConstants class, so not sure how to pass two different registries and schemas. I appreciate any examples. https://github.com/awslabs/aws-glue-schema-registry/blob/master/common/src/main/java/com/amazonaws/services/schemaregistry/utils/AWSSchemaRegistryConstants.java
/**
* Registry Name.
*/
public static final String REGISTRY_NAME = "registry.name";
/**
* Schema name.
*/
public static final String SCHEMA_NAME = "schemaName";
@OneCricketeer, this works great.. Thanks again for helping me out here.
For reference https://github.com/awslabs/aws-glue-schema-registry/issues/234, this is what I did.
In the common project create a custom class
import com.amazonaws.services.schemaregistry.common.AWSSchemaNamingStrategy
class MySchemaNamingStrategy extends AWSSchemaNamingStrategy {
@Override
public String getSchemaName(String transportName, String, data, boolean isKey) {
return transportName + (isKey ? "-key" : "-value");
}
@Override
public String getSchemaName(String transportName) {
return transportName + "-value";
}
}
On the MSK Connect I set the following parameters
key.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
key.converter.schemaNameGenerationClass=com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy
value.converter.schemaNameGenerationClass=com.amazonaws.services.schemaregistry.common.MySchemaNamingStrategy
key.converter.registryName=MyRegistry
value.converter.registryName=MyRegistry
key.converter.region=us-east-1
value.converter.region=us-east-1
key.converter.schemaAutoRegistrationEnabled=true
value.converter.schemaAutoRegistrationEnabled=true
key.converter.avroRecordType=GENERIC_RECORD
value.converter.avroRecordType=GENERIC_RECORD
This will create topics for each table with appropriate schemas
then the java consumer will look similar to
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaDeserializer.class.getName());
props.put(AWSSchemaRegistryConstants.AWS_REGION, regionName);
props.put(AWSSchemaRegistryConstants.AVRO_RECORD_TYPE, AvroRecordType.GENERIC_RECORD.getName());
props.put(AWSSchemaRegistryConstants.REGISTRY_NAME, registryName);
props.put(AWSSchemaRegistryConstants.SCHEMA_NAMING_GENERATION_CLASS , "com.amazonaws.services.schemaregistry.common.ConfluentSchemaNamingStrategy");
try {
final KafkaConsumer<GenericRecord, GenericRecord> consumer = new KafkaConsumer<GenericRecord, GenericRecord>(props);
consumer.subscribe(Collections.singletonList(topicName));
while (true) {
final ConsumerRecords<GenericRecord, GenericRecord> records = consumer.poll(Duration.ofMillis(1000) );
System.out.println("Received messages : count = " + records.count());
for (final ConsumerRecord<GenericRecord, GenericRecord> record : records) {
final String key = (record.key() == null ? "NULL_KEY" : record.key().toString());
final String value = (record.value() == null ? "NULL_VALUE" : record.value().toString());
System.out.println("Received message: key = " + key );
System.out.println("Received message: value = " + value);
}
}
} catch (Exception e) {
e.printStackTrace();
}