java apache-kafka protocol-buffers ksqldb

How do I use protobuf deserialization in rows from a KSQLDB query?


I'm trying to standardize deserialization in my KSQLDB app in Java, but I'm struggling to understand how to handle the Row type returned by the KSQLDB Client. Example (try/catch blocks removed):

    import io.confluent.ksql.api.client.BatchedQueryResult;
    import io.confluent.ksql.api.client.Client;
    import io.confluent.ksql.api.client.Row;
    import java.util.List;

    Client ksqldbClient = kafkaService.getKSQLDBClient();
    // Pull query that looks up a single row by primary key
    String queryString = String.format("SELECT * FROM %s WHERE %s = '%s';", tableName, primaryKeyName, id);
    BatchedQueryResult query = ksqldbClient.executeQuery(queryString);
    // Blocks until the query has completed
    List<Row> rows = query.get();

My KSQLDB tables are configured to use protobuf serialization, but the Row type seems to hold JSON. The only way I've found to get at its data is:

    for (Row row : rows) {
        String json = row.asObject().toJsonString();
        // Deserialize json string
        ...
    }

Does the KSQLDB client just handle the protobuf deserialization on its own? Is there a way to get just the protobuf bytes, so I can pass them to the Protobuf deserializer I have already defined and don't need to also write a JSON deserializer?


Solution

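  • row.asObject() returns a KsqlObject, which is already deserialized. As far as I can tell, the server decodes the protobuf data and the client receives the result as JSON, so the client never sees the protobuf bytes. A KsqlObject works much like a JDBC ResultSet: you call typed get methods on it (getString, getInteger, and so on) for the columns in the row.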

    If you want to map rows to a specific domain class generated from Protobuf, there doesn't seem to be a direct way. If you need that feature, you'd likely be better off using Kafka Streams directly rather than ksqlDB.
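
    If you stay with the client, one way to avoid a JSON deserializer is to build the object from the typed getters yourself. The following is only a rough sketch under assumptions: Order is a hypothetical stand-in for your generated Protobuf class, and ID and QUANTITY are made-up column names. The mapper takes the getters as functions so it can be run here without a ksqlDB server; with a real Row you would pass row::getString and row::getInteger.

```java
import java.util.Map;
import java.util.function.Function;

public class RowMapping {

    /** Hypothetical stand-in for a class generated from a .proto file. */
    public static final class Order {
        public final String id;
        public final int quantity;

        public Order(String id, int quantity) {
            this.id = id;
            this.quantity = quantity;
        }
    }

    /**
     * Builds an Order from typed column getters.
     * With the ksqlDB client: toOrder(row::getString, row::getInteger).
     */
    public static Order toOrder(Function<String, String> getString,
                                Function<String, Integer> getInteger) {
        // "ID" and "QUANTITY" are assumed column names, not taken from the question
        return new Order(getString.apply("ID"), getInteger.apply("QUANTITY"));
    }

    public static void main(String[] args) {
        // Map-backed getters stand in for a Row so the sketch runs on its own
        Map<String, Object> columns = Map.of("ID", "order-1", "QUANTITY", 3);
        Order order = toOrder(
            name -> (String) columns.get(name),
            name -> (Integer) columns.get(name));
        System.out.println(order.id + " x" + order.quantity); // prints "order-1 x3"
    }
}
```

    With a generated Protobuf class you would presumably call its builder inside toOrder instead of the constructor used here.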