I generate an Avro schema from a POJO with Jackson and use it to serialize the object. That part works well.
However, I cannot register this schema in the Schema Registry using the CachedSchemaRegistryClient.
The test setup uses Testcontainers; see here:
SchemaRegistryClient schemaRegistryClient( final String url ) {
    return new CachedSchemaRegistryClient( url, 100 );
}

public byte[] serialize( final Optional<Object> root ) {
    if ( root.isEmpty() ) {
        return new byte[0];
    }
    final Object rootObject = root.get();
    final String subjectName = rootObject.getClass().getSimpleName();
    try {
        // Jackson Avro Schema
        final AvroSchema schema = getAvroSchema( rootObject );
        final io.confluent.kafka.schemaregistry.avro.AvroSchema avroSchema =
                new io.confluent.kafka.schemaregistry.avro.AvroSchema( schema.getAvroSchema(), 1 );
        schemaRegistryClient.register( subjectName, avroSchema, false );
        return avroMapper.writer( schema ).writeValueAsBytes( rootObject );
    } catch ( final IOException | RuntimeException e ) {
        throw new SerializationException( "Error serializing Avro message", e );
    } catch ( final RestClientException e ) {
        throw toKafkaException( e );
    }
}
The error is:
java.net.SocketException: Unexpected end of file from server
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:903)
at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:726)
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:900)
at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:726)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1688)
at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1589)
at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:561)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:549)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:290)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:397)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:370)
Does anyone have experience with this?
EDIT
The client is version 7.3.3 and the container image is confluentinc/cp-schema-registry:7.3.3.
The URL is the interesting part. I reused the idea from KafkaContainer, but maybe I need a different port?
First try:
public String getClientUrl() {
    return String.format( "http://%s:%s", getHost(), getMappedPort( SCHEMA_REGISTRY_PORT ) );
}
Error:
Caused by: java.net.SocketException: Unexpected end of file from server
Second try:
public String getClientUrl() {
    return String.format( "http://%s:%s", getHost(), getExposedPorts().get( 0 ) );
}
Error:
Caused by: java.net.ConnectException: Connection refused
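The "Connection refused" in the second try is consistent with how Testcontainers publishes ports: getExposedPorts() lists the container-internal ports, while getMappedPort( ... ) returns the random host port bound to one of them. Below is a minimal JDK-only sketch of building the client URL from the host and the mapped port. The class name RegistryUrlSketch and the port 49153 are hypothetical; 8081 is assumed to be the value of SCHEMA_REGISTRY_PORT, since that is the Schema Registry default.

```java
import java.net.URI;

class RegistryUrlSketch {

    /** Container-internal port (assumed default); not reachable from the host under this number. */
    static final int SCHEMA_REGISTRY_PORT = 8081;

    /**
     * Builds the URL a test running on the host should use.
     * mappedPort stands in for getMappedPort( SCHEMA_REGISTRY_PORT ).
     */
    static String clientUrl( final String host, final int mappedPort ) {
        // URI.create rejects malformed input early instead of failing at request time
        return URI.create( String.format( "http://%s:%d", host, mappedPort ) ).toString();
    }

    public static void main( final String[] args ) {
        // 49153 is a made-up example of a host port chosen by Docker
        System.out.println( clientUrl( "localhost", 49153 ) );
    }
}
```

This matches the shape of the first try, which suggests the URL itself was not the cause of the "Unexpected end of file" error.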
The Avro schema looks like this:
{
  "type": "record",
  "name": "TestDto",
  "namespace": "test.model",
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        {
          "type": "long",
          "java-class": "java.lang.Long"
        }
      ]
    },
    {
      "name": "labels",
      "type": [
        "null",
        {
          "type": "array",
          "items": "string",
          "java-class": "java.util.Set"
        }
      ]
    }
  ]
}
I'm a little puzzled as to why all the Java types are in there, but I guess that's another topic.
My next try was to register the Confluent example User Avro schema instead:
final io.confluent.kafka.schemaregistry.avro.AvroSchema schema1 = new io.confluent.kafka.schemaregistry.avro.AvroSchema( new Schema.Parser().parse( "{\n"
+ " \"namespace\": \"io.confluent.developer\",\n"
+ " \"type\": \"record\",\n"
+ " \"name\": \"User\",\n"
+ " \"fields\": [\n"
+ " {\n"
+ " \"name\": \"name\",\n"
+ " \"type\": \"string\",\n"
+ " \"avro.java.string\": \"String\"\n"
+ " },\n"
+ " {\n"
+ " \"name\": \"age\",\n"
+ " \"type\": \"int\"\n"
+ " }\n"
+ " ]\n"
+ "}"));
schemaRegistryClient.register( subjectName, schema1, false );
Registering a JSON schema does not work right now either, so this looks like a general issue with the Schema Registry.
I tried with:
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
Caused by: java.net.SocketException: Unexpected end of file from server
at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:903)
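Since both Avro and JSON Schema registration fail the same way, one way to separate a connectivity problem from a schema problem is to call the registry's REST API directly, without any schema involved. The following is a rough JDK-only sketch that sends GET /subjects to the registry. The class and method names are hypothetical, and the 5-second timeout is an arbitrary choice.

```java
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;

class RegistryProbeSketch {

    /** Builds a GET request for the registry's subject listing. */
    static HttpRequest subjectsRequest( final String baseUrl ) {
        return HttpRequest.newBuilder( URI.create( baseUrl + "/subjects" ) )
                .timeout( Duration.ofSeconds( 5 ) )
                .GET()
                .build();
    }

    /** Returns the HTTP status code, or -1 if the request fails at the connection level. */
    static int probe( final String baseUrl ) {
        try {
            final HttpResponse<String> response = HttpClient.newHttpClient()
                    .send( subjectsRequest( baseUrl ), HttpResponse.BodyHandlers.ofString() );
            return response.statusCode();
        } catch ( final IOException e ) {
            // connection refused, reset, or closed before a response arrived
            return -1;
        } catch ( final InterruptedException e ) {
            Thread.currentThread().interrupt();
            return -1;
        }
    }
}
```

If probe( getClientUrl() ) also fails at the connection level, the problem is most likely in the container setup rather than in the schema or the serializer.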
The environment variable for the Schema Registry Testcontainer was wrong. I changed it from
withEnv( "SCHEMA_REGISTRY_HOST_NAME", "localhost" );
to
withEnv( "SCHEMA_REGISTRY_HOST_NAME", "cp-schema-registry" );
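For reference, here is a minimal sketch of collecting the container environment in one place, so that the host name from the fix above is set together with the other settings. Only SCHEMA_REGISTRY_HOST_NAME comes from my setup as shown. The listener address, the Kafka bootstrap address passed in by the caller, and the class name are assumptions for illustration and need to match the actual network setup.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RegistryEnvSketch {

    /**
     * Collects environment variables for a cp-schema-registry container.
     * hostName is the value that fixed the issue ("cp-schema-registry");
     * kafkaBootstrap is an assumed address of the Kafka broker on the shared network.
     */
    static Map<String, String> registryEnv( final String hostName, final String kafkaBootstrap ) {
        final Map<String, String> env = new LinkedHashMap<>();
        env.put( "SCHEMA_REGISTRY_HOST_NAME", hostName );
        // Assumption: listen on all interfaces on the default port 8081
        env.put( "SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:8081" );
        env.put( "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", kafkaBootstrap );
        return env;
    }

    public static void main( final String[] args ) {
        // "PLAINTEXT://kafka:9092" is a hypothetical broker alias
        registryEnv( "cp-schema-registry", "PLAINTEXT://kafka:9092" )
                .forEach( ( key, value ) -> System.out.println( key + "=" + value ) );
    }
}
```

The resulting map could then be applied entry by entry with withEnv( key, value ) on the container.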