I have created a Schema Registry using the AWS CLI, but I am not able to access it from my Spring Boot application. It fails with the error: "Failed to get schemaVersionId by schema definition for schema name = hydra-testschema"
Creating the schema using the CLI:
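For reference, a schema is typically registered in the Glue Schema Registry with a command along these lines (the registry name "sandbox" and schema name "hydra-testschema" are taken from the producer code below; the compatibility mode is an assumption):

```shell
# Hypothetical sketch of the registration call, not the exact command used.
aws glue create-schema \
    --registry-id RegistryName=sandbox \
    --schema-name hydra-testschema \
    --data-format AVRO \
    --compatibility BACKWARD \
    --schema-definition file://Customer.avsc
```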
Then I am using a Spring Boot application to test the schema. Here are the schema structure and sample code.
Customer.avsc
{
  "type": "record",
  "namespace": "ABC_Organization",
  "name": "Employee",
  "fields": [
    { "name": "Name", "type": "string" },
    { "name": "Age", "type": "int" },
    {
      "name": "address",
      "type": {
        "type": "record",
        "name": "addressRecord",
        "fields": [
          { "name": "street", "type": "string" },
          { "name": "zipcode", "type": "int" }
        ]
      }
    }
  ]
}
SampleProducer.java
import java.io.File;
import java.io.IOException;
import java.util.Properties;
import java.util.Random;

import org.apache.avro.Schema;
import org.apache.avro.Schema.Parser;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistryKafkaSerializer;
import com.amazonaws.services.schemaregistry.utils.AWSSchemaRegistryConstants;

import software.amazon.awssdk.auth.credentials.SystemPropertiesCredentialsProvider;

public class SampleMskProducer {

    private static final Properties properties = new Properties();
    private static final Logger LOGGER = LoggerFactory.getLogger(org.apache.kafka.clients.producer.Producer.class.getName());

    public static void main(String[] args) throws Exception {
        String username = "BrokerUserName";
        String password = "Password";
        String jaasTemplate = "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";";
        String jaasCfg = String.format(jaasTemplate, username, password);
        System.setProperty("software.amazon.awssdk.http.service.impl", "software.amazon.awssdk.http.urlconnection.UrlConnectionSdkHttpService");

        // Setting Kafka properties
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "Broker");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, GlueSchemaRegistryKafkaSerializer.class.getName());
        properties.put(AWSSchemaRegistryConstants.AWS_REGION, "usa-east-1");
        properties.put(AWSSchemaRegistryConstants.REGISTRY_NAME, "sandbox");
        properties.put(AWSSchemaRegistryConstants.SCHEMA_NAME, "hydra-testschemaa");
        properties.put(AWSSchemaRegistryConstants.DATA_FORMAT, "AVRO");
        properties.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
        properties.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-512");
        properties.put(SaslConfigs.SASL_JAAS_CONFIG, jaasCfg);

        SystemPropertiesCredentialsProvider systemPropertiesCredentialsProvider = new SystemPropertiesCredentialsProvider();

        // Passing the secrets
        System.setProperty("aws.accessKeyId", "A");
        System.setProperty("aws.secretAccessKey", "wC");

        // Declaring and parsing the schema fields for the generic record builder
        Schema schema_customer = null;
        try {
            schema_customer = new Parser().parse(new File("Customer.avsc"));
        } catch (IOException e) {
            e.printStackTrace();
        }

        GenericRecord customer = new GenericData.Record(schema_customer);
        GenericRecord addressRecord = new GenericData.Record(schema_customer.getField("address").schema());
        LOGGER.info("Generic records phase completed...");

        Random rand = new Random();
        int zipcodeValue = 9999;
        int ageMaxValue = 100;

        // Initializing the producer client, building records, and publishing the messages to the Kafka broker
        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(properties)) {
            final ProducerRecord<String, GenericRecord> record = new ProducerRecord<>("hydra.proxy.updates", customer);
            LOGGER.info("Starting to send records...");
            for (int i = 0; i < 10000; i++) {
                addressRecord.put("street", "city-" + i);
                addressRecord.put("zipcode", rand.nextInt(zipcodeValue));
                customer.put("Name", "name-" + i);
                customer.put("Age", rand.nextInt(ageMaxValue));
                customer.put("address", addressRecord);
                producer.send(record, new ProducerCallback());
            }
            producer.flush();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    // Method for retrieving the secret from Secrets Manager omitted.

    // Callback class for the producer client, used for logging.
    private static class ProducerCallback implements Callback {
        @Override
        public void onCompletion(RecordMetadata recordMetaData, Exception e) {
            if (e == null) {
                LOGGER.info("Received new metadata. \t" +
                        "Topic: " + recordMetaData.topic() + "\t" +
                        "Partition: " + recordMetaData.partition() + "\t" +
                        "Offset: " + recordMetaData.offset() + "\t" +
                        "Timestamp: " + recordMetaData.timestamp());
            } else {
                LOGGER.info("There's been an error on the producer side");
                e.printStackTrace();
            }
        }
    }
}
I am getting the following error after executing:
Can anyone please check?
The AWS_REGION value in your configuration should be us-east-1 instead of usa-east-1 - that's why the UnknownHostException occurs in your screenshot. AWS does not have a region named usa-east-1, hence the host you want to reach is unknown.
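To see why the typo surfaces as an UnknownHostException: the client derives the regional Glue endpoint from the region string, so an invalid region produces a hostname that does not resolve. A minimal sketch (the class and helper below are hypothetical, for illustration only):

```java
// Illustrates how an invalid region string turns into an unresolvable host.
public class RegionEndpointDemo {

    // Builds the regional Glue endpoint host from a region string.
    static String glueEndpoint(String region) {
        return "glue." + region + ".amazonaws.com";
    }

    public static void main(String[] args) {
        // "usa-east-1" yields glue.usa-east-1.amazonaws.com, which does not
        // exist in DNS, so connecting throws UnknownHostException.
        System.out.println(glueEndpoint("usa-east-1"));
        // "us-east-1" yields glue.us-east-1.amazonaws.com, a real endpoint.
        System.out.println(glueEndpoint("us-east-1"));
    }
}
```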