Tags: java, apache-kafka, avro, confluent-schema-registry, jackson-dataformat-avro

Error registering a Jackson Avro schema with the Confluent Cloud Schema Registry client


I generate an Avro schema from a POJO using Jackson and serialize it. So far this works quite well.

However, I cannot register this schema with the Schema Registry using the CachedSchemaRegistryClient.

The test setup uses Testcontainers; see here:

SchemaRegistryClient schemaRegistryClient( final String url ) {
   return new CachedSchemaRegistryClient( url, 100 );
}

public byte[] serialize( final Optional<Object> root ) {
   if ( root.isEmpty() ) {
      return new byte[0];
   }
   final Object rootObject = root.get();
   final String subjectName = rootObject.getClass().getSimpleName();

   try {
      // Generate the Avro schema from the POJO via Jackson
      final AvroSchema schema = getAvroSchema( rootObject );
      final io.confluent.kafka.schemaregistry.avro.AvroSchema avroSchema =
            new io.confluent.kafka.schemaregistry.avro.AvroSchema( schema.getAvroSchema(), 1 );
      schemaRegistryClient.register( subjectName, avroSchema, false );
      return avroMapper.writer( schema ).writeValueAsBytes( rootObject );
   } catch ( final IOException | RuntimeException e ) {
      throw new SerializationException( "Error serializing Avro message", e );
   } catch ( final RestClientException e ) {
      throw toKafkaException( e );
   }
}
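The `getAvroSchema` helper isn't shown in the snippet. Assuming it uses Jackson's `AvroMapper` with an `AvroSchemaGenerator` (the usual jackson-dataformat-avro approach), it might look roughly like this; the class name `AvroSchemaSupport` is invented for illustration:

```java
import java.io.IOException;

import com.fasterxml.jackson.dataformat.avro.AvroMapper;
import com.fasterxml.jackson.dataformat.avro.AvroSchema;
import com.fasterxml.jackson.dataformat.avro.schema.AvroSchemaGenerator;

public class AvroSchemaSupport {

   private final AvroMapper avroMapper = new AvroMapper();

   // Sketch of the getAvroSchema helper used in the question: let Jackson
   // introspect the POJO's class and build an Avro schema from it.
   // JsonMappingException is an IOException, so it reaches the caller's catch.
   public AvroSchema getAvroSchema( final Object rootObject ) throws IOException {
      final AvroSchemaGenerator generator = new AvroSchemaGenerator();
      avroMapper.acceptJsonFormatVisitor( rootObject.getClass(), generator );
      return generator.getGeneratedSchema();
   }
}
```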

Error is:

java.net.SocketException: Unexpected end of file from server
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:903)
    at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:726)
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:900)
    at java.base/sun.net.www.http.HttpClient.parseHTTP(HttpClient.java:726)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream0(HttpURLConnection.java:1688)
    at java.base/sun.net.www.protocol.http.HttpURLConnection.getInputStream(HttpURLConnection.java:1589)
    at java.base/java.net.HttpURLConnection.getResponseCode(HttpURLConnection.java:529)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:294)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:384)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:561)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:549)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:290)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:397)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:370)

Does anyone have experience with this?

EDIT

The client is 7.3.3 and the container is confluentinc/cp-schema-registry:7.3.3.

The interesting part is the URL. I reused the idea from KafkaContainer, but maybe I need a different port?

First try:

   public String getClientUrl() {
      return String.format( "http://%s:%s", getHost(), getMappedPort( SCHEMA_REGISTRY_PORT ) );
   }

Error:

Caused by: java.net.SocketException: Unexpected end of file from server

Second try:

   public String getClientUrl() {
      return String.format( "http://%s:%s", getHost(), getExposedPorts().get( 0 ) );
   }

Error:

Caused by: java.net.ConnectException: Connection refused

Avro Schema looks like this:

{
  "type": "record",
  "name": "TestDto",
  "namespace": "test.model",
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        {
          "type": "long",
          "java-class": "java.lang.Long"
        }
      ]
    },
    {
      "name": "labels",
      "type": [
        "null",
        {
          "type": "array",
          "items": "string",
          "java-class": "java.util.Set"
        }
      ]
    }
  ]
}

I'm a little puzzled as to why all the Java types are in there, but I guess that's another topic.
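For reference, a POJO that would produce the schema above might look like the sketch below (names and types taken from the schema; the accessors are assumed). The `java-class` and `java.util.Set` entries appear to be Jackson embedding the concrete Java types into the generated schema so it can map back to the same classes on deserialization:

```java
import java.util.Set;

// Sketch of a POJO matching the generated schema above
// (namespace test.model): a nullable Long id and a nullable Set<String>.
public class TestDto {

   private Long id;
   private Set<String> labels;

   public Long getId() { return id; }

   public void setId( final Long id ) { this.id = id; }

   public Set<String> getLabels() { return labels; }

   public void setLabels( final Set<String> labels ) { this.labels = labels; }
}
```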

The next try was to take the Confluent User Avro example:

final io.confluent.kafka.schemaregistry.avro.AvroSchema schema1 =
      new io.confluent.kafka.schemaregistry.avro.AvroSchema( new Schema.Parser().parse( "{\n"
            + "  \"namespace\": \"io.confluent.developer\",\n"
            + "  \"type\": \"record\",\n"
            + "  \"name\": \"User\",\n"
            + "  \"fields\": [\n"
            + "    {\n"
            + "      \"name\": \"name\",\n"
            + "      \"type\": \"string\",\n"
            + "      \"avro.java.string\": \"String\"\n"
            + "    },\n"
            + "    {\n"
            + "      \"name\": \"age\",\n"
            + "      \"type\": \"int\"\n"
            + "    }\n"
            + "  ]\n"
            + "}" ) );
schemaRegistryClient.register( subjectName, schema1, false );

Registering a JSON schema is also not working right now, so this looks like a general issue with the Schema Registry.

Tried with:

spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer

Caused by: java.net.SocketException: Unexpected end of file from server
    at java.base/sun.net.www.http.HttpClient.parseHTTPHeader(HttpClient.java:903)
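As an aside, the Confluent JSON Schema serializer also needs the registry URL in the producer config, which Spring Boot forwards via pass-through properties. A sketch; the URL is illustrative and would come from the Testcontainer's `getClientUrl()` in the test:

```properties
spring.kafka.producer.value-serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
# schema.registry.url is required by the Confluent serializers; Spring Boot
# passes keys under "properties" straight through to the producer config.
spring.kafka.producer.properties.schema.registry.url=http://localhost:8081
```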

Solution

  • The env for the Schema Registry Testcontainer was wrong; changed it from

    withEnv( "SCHEMA_REGISTRY_HOST_NAME", "localhost" );
    

    to

    withEnv( "SCHEMA_REGISTRY_HOST_NAME", "cp-schema-registry" );
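Putting the fix together: `SCHEMA_REGISTRY_HOST_NAME` is the hostname the registry advertises, so it should match the container's network alias rather than `localhost`. A sketch of a matching container, where the class name, the alias `cp-schema-registry`, and the Kafka bootstrap wiring are illustrative assumptions:

```java
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.Network;

// Sketch of a Schema Registry Testcontainer consistent with the fix above.
public class SchemaRegistryContainer extends GenericContainer<SchemaRegistryContainer> {

   private static final int SCHEMA_REGISTRY_PORT = 8081;

   public SchemaRegistryContainer( final Network network, final String kafkaBootstrap ) {
      super( "confluentinc/cp-schema-registry:7.3.3" );
      withNetwork( network );
      // The advertised host name must match the alias other containers use,
      // not "localhost".
      withNetworkAliases( "cp-schema-registry" );
      withEnv( "SCHEMA_REGISTRY_HOST_NAME", "cp-schema-registry" );
      withEnv( "SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS", kafkaBootstrap );
      withEnv( "SCHEMA_REGISTRY_LISTENERS", "http://0.0.0.0:" + SCHEMA_REGISTRY_PORT );
      withExposedPorts( SCHEMA_REGISTRY_PORT );
   }

   // The host-side client must use the mapped port (the first try in the
   // question). getExposedPorts().get( 0 ) is the container-internal port,
   // which is why the second try ended in "Connection refused".
   public String getClientUrl() {
      return clientUrl( getHost(), getMappedPort( SCHEMA_REGISTRY_PORT ) );
   }

   static String clientUrl( final String host, final int mappedPort ) {
      return String.format( "http://%s:%s", host, mappedPort );
   }
}
```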