database, time-series, questdb

Kafka Connect and QuestDB with Protobuf


I have a table in QuestDB with this structure:

CREATE TABLE IF NOT EXISTS 'transactions' (
  timestamp TIMESTAMP,
  merchant SYMBOL capacity 5000 CACHE,
  category SYMBOL capacity 256 CACHE,
  amt DOUBLE,
  gender SYMBOL capacity 256 CACHE,
  city SYMBOL capacity 2000 CACHE,
  state SYMBOL capacity 256 CACHE,
  first VARCHAR,
  last VARCHAR,
  street VARCHAR,
  job VARCHAR,
  trans_num VARCHAR,
  cc_num LONG,
  zip LONG,
  city_pop LONG,
  dob LONG,
  lat DOUBLE,
  lon DOUBLE,
  merch_lat DOUBLE,
  merch_long DOUBLE
) timestamp (timestamp) PARTITION BY DAY WAL DEDUP UPSERT KEYS(timestamp, trans_num);

I can ingest data directly using ILP, but I want to put Kafka in front of the database so I can restart the server for upgrades or maintenance without disrupting ingestion. The data will be sent in Protobuf format.

The QuestDB docs and repository mention Avro, but there is no mention of Protobuf. Does anyone know if it is supported and how I would configure it?


Solution

  • It turns out that in Kafka Connect, connectors are oblivious to (de-)serialization: the Kafka Connect framework transcodes messages into a common internal format, regardless of the serde used.

    This means converter configurations can be reused across connectors, and we can use Protobuf in the same way we can use Avro. By the time the QuestDB connector gets the message, it is already in a known format.

    This is how you would register the QuestDB connector with the Protobuf converter for a schema matching the table above:

    curl -s -X PUT -H "Content-Type: application/json" http://localhost:8083/connectors/questdb-transactions/config -d '{
        "connector.class": "io.questdb.kafka.QuestDBSinkConnector",
        "tasks.max": "5",
        "topics": "transactions",
        "client.conf.string": "http::addr=${QUESTDB_HTTP_ENDPOINT};auto_flush_interval=500;retry_timeout=100000;",
        "name": "questdb-transactions",
        "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "value.converter.schema.registry.url": "http://${SCHEMA_REGISTRY_HTTP_ENDPOINT}",
        "value.converter.schemas.enable": "true",
        "include.key": "false",
        "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
        "key.converter.schema.registry.url": "http://${SCHEMA_REGISTRY_HTTP_ENDPOINT}",
        "table": "transactions",
        "symbols": "merchant, category, gender, city, state",
        "timestamp.field.name": "timestamp"
    }'
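
    Once the connector is registered, a quick sanity check is to ask the Kafka Connect REST API for its status. This is a minimal check, assuming the same host, port, and connector name as in the call above:

    curl -s http://localhost:8083/connectors/questdb-transactions/status

    The response lists the connector state and the state of each of its tasks, so you can confirm the tasks are RUNNING before producing data.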
    

    Of course, values like the maximum number of tasks, the topic and connector names, and any URLs should be tailored to the specific use case, but that is the general way of working with Protobuf.

    Note that we are adding parameters such as auto_flush_interval, which controls how often the connector sends data into QuestDB, and retry_timeout, which keeps retrying for up to 100 seconds after an error so that ingestion can recover from a server restart without manual intervention.
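
    To confirm that rows are actually landing in the table, QuestDB's own REST API can be queried as well. This is a minimal sketch, assuming QuestDB's HTTP endpoint is reachable on the default port 9000:

    curl -G http://localhost:9000/exec --data-urlencode "query=SELECT count() FROM transactions;"

    The count should keep growing as the connector flushes batches roughly every 500 ms, which is the auto_flush_interval configured above.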