java apache-kafka apache-nifi avro hortonworks-dataflow

Hortonworks Schema Registry + Nifi + Java: Deserialize Nifi Record


I am trying to deserialize some Kafka messages that were serialized by Nifi using Hortonworks Schema Registry.

  • Processor used on the Nifi side as Record Writer: AvroRecordSetWriter
  • Schema write strategy: HWX Content-Encoded Schema Reference

I am able to deserialize these messages in another Nifi Kafka consumer. However, I am trying to deserialize them from my Flink application using Kafka code.

I have the following inside the Kafka deserializer handler of my Flink application:

final String SCHEMA_REGISTRY_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.CLASSLOADER_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_SIZE.name();
final String SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY = SchemaRegistryClient.Configuration.SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS.name();
final String SCHEMA_REGISTRY_URL_KEY = SchemaRegistryClient.Configuration.SCHEMA_REGISTRY_URL.name();

Properties schemaRegistryProperties = new Properties();
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_SIZE_KEY, 10L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_CACHE_EXPIRY_INTERVAL_SECS_KEY, 5000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_SIZE_KEY, 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_SCHEMA_VERSION_CACHE_EXPIRY_INTERVAL_SECS_KEY, 60 * 60 * 1000L);
schemaRegistryProperties.put(SCHEMA_REGISTRY_URL_KEY, "http://schema_registry_server:7788/api/v1");
return (Map<String, Object>) HWXSchemaRegistry.getInstance(schemaRegistryProperties).deserialize(message);

And here is the HWXSchemaRegistry code that deserializes the message:

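import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.util.Enumeration;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import com.hortonworks.registries.schemaregistry.avro.AvroSchemaProvider;
import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
import com.hortonworks.registries.schemaregistry.errors.SchemaNotFoundException;
import com.hortonworks.registries.schemaregistry.serdes.avro.AvroSnapshotDeserializer;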

public class HWXSchemaRegistry {

    private SchemaRegistryClient client;
    private Map<String,Object> config;
    private AvroSnapshotDeserializer deserializer;
    private static HWXSchemaRegistry hwxSRInstance = null;

    public static HWXSchemaRegistry getInstance(Properties schemaRegistryConfig) {
        if(hwxSRInstance == null)
            hwxSRInstance = new HWXSchemaRegistry(schemaRegistryConfig);
        return hwxSRInstance;
    }

    public Object deserialize(byte[] message) throws IOException {
        return this.deserializer.deserialize(new ByteArrayInputStream(message), null);
    }

    private static Map<String,Object> properties2Map(Properties config) {
        Enumeration<Object> keys = config.keys();
        Map<String, Object> configMap = new HashMap<String,Object>();
        while (keys.hasMoreElements()) {
            Object key = (Object) keys.nextElement();
            configMap.put(key.toString(), config.get(key));
        }
        return configMap;
    }

    private HWXSchemaRegistry(Properties schemaRegistryConfig) {
        _log.debug("Init SchemaRegistry Client");
        this.config = HWXSchemaRegistry.properties2Map(schemaRegistryConfig);
        this.client = new SchemaRegistryClient(this.config);

        this.deserializer = this.client.getDefaultDeserializer(AvroSchemaProvider.TYPE);
        this.deserializer.init(this.config);
    }
}

But I am getting an HTTP 404 error (schema not found). I think this is due to incompatible "protocols" between the Nifi configuration and the HWX Schema Registry client implementation, so the schema identifier bytes that the client looks for do not exist on the server, or something like that.

Can someone help with this?

Thank you.

Caused by: javax.ws.rs.NotFoundException: HTTP 404 Not Found
    at org.glassfish.jersey.client.JerseyInvocation.convertToException(JerseyInvocation.java:1069)
    at org.glassfish.jersey.client.JerseyInvocation.translate(JerseyInvocation.java:866)
    at org.glassfish.jersey.client.JerseyInvocation.lambda$invoke$1(JerseyInvocation.java:750)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:292)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:274)
    at org.glassfish.jersey.internal.Errors.process(Errors.java:205)
    at org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:390)
    at org.glassfish.jersey.client.JerseyInvocation.invoke(JerseyInvocation.java:748)
    at org.glassfish.jersey.client.JerseyInvocation$Builder.method(JerseyInvocation.java:404)
    at org.glassfish.jersey.client.JerseyInvocation$Builder.get(JerseyInvocation.java:300)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$14.run(SchemaRegistryClient.java:1054)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient$14.run(SchemaRegistryClient.java:1051)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:360)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getEntities(SchemaRegistryClient.java:1051)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getAllVersions(SchemaRegistryClient.java:872)
    at com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient.getAllVersions(SchemaRegistryClient.java:676)
    at HWXSchemaRegistry.<init>(HWXSchemaRegistry.java:56)
    at HWXSchemaRegistry.getInstance(HWXSchemaRegistry.java:26)
    at SchemaService.deserialize(SchemaService.java:70)
    at SchemaService.deserialize(SchemaService.java:26)
    at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.deserialize(KafkaDeserializationSchemaWrapper.java:45)
    at org.apache.flink.streaming.connectors.kafka.internal.KafkaFetcher.runFetchLoop(KafkaFetcher.java:140)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.run(FlinkKafkaConsumerBase.java:712)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:93)
    at org.apache.flink.streaming.api.operators.StreamSource.run(StreamSource.java:57)
    at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.run(SourceStreamTask.java:97)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:302)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
    at java.lang.Thread.run(Thread.java:745)


Solution

  • I found a workaround, since I wasn't able to get the client deserializer working: I read the first bytes of the byte array, use them to query the schema registry for the Avro schema, and then deserialize the rest of the byte array with that schema.

    • The first byte (index 0) is the protocol version (I figured out this is a Nifi-specific byte, since I didn't need it).
    • The next 8 bytes are the schema ID.
    • The next 4 bytes are the schema version.
    • The remaining bytes are the message itself:

      import java.io.IOException;
      import java.nio.ByteBuffer;
      import java.util.Arrays;

      import com.hortonworks.registries.schemaregistry.SchemaMetadataInfo;
      import com.hortonworks.registries.schemaregistry.SchemaVersionInfo;
      import com.hortonworks.registries.schemaregistry.SchemaVersionKey;
      import com.hortonworks.registries.schemaregistry.client.SchemaRegistryClient;
      
      try (SchemaRegistryClient client = new SchemaRegistryClient(this.schemaRegistryConfig)) {
          // Byte 0 is the protocol version; bytes 1-8 hold the schema ID
          // and bytes 9-12 the schema version (ByteBuffer reads big-endian by default).
          long schemaId = ByteBuffer.wrap(message, 1, 8).getLong();
          int schemaVersion = ByteBuffer.wrap(message, 9, 4).getInt();
      
          // Resolve the schema name from its ID, then fetch the text of that exact version.
          SchemaMetadataInfo schemaInfo = client.getSchemaMetadataInfo(schemaId);
          String schemaName = schemaInfo.getSchemaMetadata().getName();
      
          SchemaVersionInfo schemaVersionInfo = client.getSchemaVersionInfo(
                  new SchemaVersionKey(schemaName, schemaVersion));
      
          String avroSchema = schemaVersionInfo.getSchemaText();
          // A new variable is needed here: redeclaring "message" does not compile.
          byte[] payload = Arrays.copyOfRange(message, 13, message.length);
          // Deserialize [...]
      }
      catch (Exception e)
      {
          // Keep the original exception as the cause instead of only its message.
          throw new IOException(e);
      }
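
The byte layout described above can be sketched with the JDK alone. This is only an illustration, not code from Nifi or the registry client: the class name HwxEncodedMessage and its fields are hypothetical, and it assumes the 1 + 8 + 4 byte big-endian header listed in this answer.

```java
import java.nio.ByteBuffer;
import java.util.Arrays;

/** Hypothetical holder for the header fields and payload of one encoded message. */
final class HwxEncodedMessage {
    // 1 byte protocol version + 8 bytes schema ID + 4 bytes schema version (assumed layout)
    static final int HEADER_LENGTH = 1 + Long.BYTES + Integer.BYTES;

    final int protocolVersion;
    final long schemaId;
    final int schemaVersion;
    final byte[] payload;

    private HwxEncodedMessage(int protocolVersion, long schemaId, int schemaVersion, byte[] payload) {
        this.protocolVersion = protocolVersion;
        this.schemaId = schemaId;
        this.schemaVersion = schemaVersion;
        this.payload = payload;
    }

    /** Splits the raw Kafka value into header fields and the Avro payload. */
    static HwxEncodedMessage parse(byte[] message) {
        if (message == null || message.length < HEADER_LENGTH) {
            throw new IllegalArgumentException(
                    "Message is shorter than the " + HEADER_LENGTH + "-byte header");
        }
        ByteBuffer buffer = ByteBuffer.wrap(message); // big-endian by default
        int protocolVersion = buffer.get() & 0xFF;    // byte 0
        long schemaId = buffer.getLong();             // bytes 1-8
        int schemaVersion = buffer.getInt();          // bytes 9-12
        byte[] payload = Arrays.copyOfRange(message, HEADER_LENGTH, message.length);
        return new HwxEncodedMessage(protocolVersion, schemaId, schemaVersion, payload);
    }
}
```

Calling HwxEncodedMessage.parse(message) before touching the registry also makes it easy to log the schema ID and version when a lookup fails.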
      

    I also thought that I might have to remove the first byte before calling hwxSRInstance.deserializer.deserialize in the code from my question, since this byte seems to be a Nifi-specific byte used for communication between Nifi processors, but that didn't work either.

    The next step is to build a cache of schema texts to avoid calling the schema registry API repeatedly for the same schema.
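
One way such a cache could look, as a rough sketch rather than a finished implementation: the class SchemaTextCache and its loader parameter are hypothetical names, the loader stands in for the two registry calls shown earlier, and the sketch assumes that the text of a registered schema version does not change, so entries are never evicted.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.BiFunction;

/** Hypothetical cache of schema texts keyed by schema ID and version. */
final class SchemaTextCache {
    private final ConcurrentMap<String, String> cache = new ConcurrentHashMap<>();
    // Stand-in for the registry lookup: (schemaId, schemaVersion) -> schema text
    private final BiFunction<Long, Integer, String> loader;

    SchemaTextCache(BiFunction<Long, Integer, String> loader) {
        this.loader = loader;
    }

    /** Returns the cached schema text, calling the loader only on the first request for a key. */
    String get(long schemaId, int schemaVersion) {
        String key = schemaId + ":" + schemaVersion;
        return cache.computeIfAbsent(key, k -> loader.apply(schemaId, schemaVersion));
    }
}
```

In the Flink job, the loader would wrap the getSchemaMetadataInfo and getSchemaVersionInfo calls, so that only the first message for a given schema ID and version reaches the registry.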

    New info: I am extending my answer to include the Avro deserialization part, since it took me some troubleshooting and I had to inspect the Nifi Avro Reader source code to figure it out (I was getting a "not valid Avro data" exception when using the basic Avro deserialization code):

    import java.io.IOException;
    import java.io.InputStream;

    import org.apache.avro.Schema;
    import org.apache.avro.file.SeekableByteArrayInput;
    import org.apache.avro.generic.GenericDatumReader;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.avro.io.BinaryDecoder;
    import org.apache.avro.io.DatumReader;
    import org.apache.avro.io.DecoderFactory;
    
    private static GenericRecord deserializeMessage(byte[] message, String schemaText) throws IOException {
    
        Schema schema = new Schema.Parser().parse(schemaText);
        DatumReader<GenericRecord> datumReader = new GenericDatumReader<>(schema);
    
        // Read the payload as a single binary-encoded datum (no container-file header).
        // try-with-resources closes the stream even if decoding fails.
        try (InputStream in = new SeekableByteArrayInput(message)) {
            BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(in, null);
            return datumReader.read(null, decoder);
        }
    }
    

    If you want to convert the GenericRecord to a map, note that string values are not String objects: Avro returns them as org.apache.avro.util.Utf8, so they have to be converted explicitly:

    private static Map<String, Object> avroGenericRecordToMap(GenericRecord record)
    {
        Map<String, Object> map = new HashMap<>();
    
        // Strings are mapped to the Utf8 class (a CharSequence), so every string-typed value
        // is converted with toString(). Field names are already java.lang.String.
        record.getSchema().getFields().forEach(field -> {
            Object value = record.get(field.name());
            map.put(field.name(), value instanceof CharSequence ? value.toString() : value);
        });
    
        return map;
    }
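
Top-level fields are not the only place where Utf8 shows up: the keys and values of Avro maps and the elements of Avro arrays can be Utf8 as well. The following sketch, which uses only the JDK, shows one possible way to normalize such nested values. The class name AvroValueNormalizer is hypothetical, and it relies on Utf8 implementing CharSequence; nested GenericRecord values and other Avro types are returned unchanged and would need their own handling.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper that turns CharSequence values (such as Avro's Utf8) into Strings. */
final class AvroValueNormalizer {

    /** Recursively converts CharSequences inside maps and lists; other values pass through. */
    static Object normalize(Object value) {
        if (value instanceof CharSequence) {
            return value.toString();
        }
        if (value instanceof Map) {
            Map<String, Object> converted = new HashMap<>();
            for (Map.Entry<?, ?> entry : ((Map<?, ?>) value).entrySet()) {
                // Avro map keys are strings, so String.valueOf is safe for them
                converted.put(String.valueOf(entry.getKey()), normalize(entry.getValue()));
            }
            return converted;
        }
        if (value instanceof List) {
            List<Object> converted = new ArrayList<>();
            for (Object element : (List<?>) value) {
                converted.add(normalize(element));
            }
            return converted;
        }
        return value; // numbers, booleans, null, records, etc. are left as they are
    }
}
```

Inside avroGenericRecordToMap, each field value could be passed through AvroValueNormalizer.normalize before it is put into the map.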