I'm trying to standardize deserialization in my KSQLDB app in Java, but I'm struggling to understand how to handle the Row type returned by the KSQLDB client. Example (try/catch blocks removed):
import io.confluent.ksql.api.client.BatchedQueryResult;
import io.confluent.ksql.api.client.Client;
import io.confluent.ksql.api.client.Row;
import java.util.List;

Client ksqldbClient = kafkaService.getKSQLDBClient();
String queryString = String.format("SELECT * FROM %s WHERE %s = '%s';", tableName, primaryKeyName, id);
BatchedQueryResult query = ksqldbClient.executeQuery(queryString);
List<Row> rows = query.get();
My KSQLDB tables are configured to use protobuf serialization, but it seems like the Row type is JSON? I can only manage to get its data via:
for (Row row : rows) {
    String json = row.asObject().toJsonString();
    // Deserialize the JSON string
    ...
}
Does the KSQLDB client just handle the protobuf deserialization on its own? Is there a way to get the raw protobuf bytes so I can pass them to the protobuf deserializer I have already defined, rather than also writing a JSON deserializer?
row.asObject()
returns a KsqlObject, which is already deserialized. It works similarly to a JDBC ResultSet in that you call the various get methods on it for the types within the row.
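For example, you can read typed values directly without going through JSON (a minimal sketch; the column names "ID" and "COUNT" are assumptions, so substitute your table's actual columns and types):

```java
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.Row;

for (Row row : rows) {
    KsqlObject obj = row.asObject();
    String id = obj.getString("ID");   // a VARCHAR column (hypothetical name)
    Long count = obj.getLong("COUNT"); // a BIGINT column (hypothetical name)
    // Row also exposes getters directly, by column name or 1-based index:
    String idAgain = row.getString("ID");
}
```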
If you want to map to a specific domain object generated from Protobuf, there doesn't seem to be a direct way; you'd likely be better off using Kafka Streams directly rather than KSQL if you need that feature.
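The closest workaround is to hand-map each row into the generated class via its builder. A hedged sketch, assuming a hypothetical protoc-generated class `User` with `id` and `name` fields and matching column names in the table:

```java
import io.confluent.ksql.api.client.KsqlObject;
import io.confluent.ksql.api.client.Row;

// "User", its builder setters, and the "ID"/"NAME" columns are all
// assumptions standing in for whatever protoc generated from your schema.
static User toProto(Row row) {
    KsqlObject obj = row.asObject();
    return User.newBuilder()
            .setId(obj.getString("ID"))
            .setName(obj.getString("NAME"))
            .build();
}
```

This avoids writing a separate JSON deserializer, at the cost of keeping the mapping in sync with the table schema by hand.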