regex, apache-kafka, apache-kafka-connect

Filter topic list in BigQuery sink connector


I'm using Kafka Connect to write data from Kafka to BigQuery. I have multiple topics consumed by a single sink connector, which uses a regex to select the topics it consumes and to automatically pick up new topics added with the same prefix (~340 topics, ~30 consumers).
I want to edit the sink connector so that it ignores one specific topic that the regex currently catches.
The regex field in the connector configuration is, for example:

"topics.regex": "sourcename_sourcetype_dataset_(.*)"

What I'm looking for is an "except" mechanism, preferably without altering the regex itself. Altering it is an acceptable solution, though; if that is the way to go, how can I edit it so that it does not include a table named, for example, products_history?


Solution

  • In my experience, regex doesn't work well for "all but one thing".

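    You can try the Filter transform with a TopicNameMatches predicate. Filter drops every record it is applied to, and the predicate restricts it to the one topic you want to skip, so all other topics (via topics or topics.regex) flow through unaltered. Leave negate at its default of false: with negate set to true, the Filter would instead be applied to every record that does not match the predicate, dropping everything except products_history. Note that the connector still subscribes to and consumes the excluded topic; its records are just discarded before they reach the sink.

    "transforms": "HistoryFilter",
    "transforms.HistoryFilter.type": "org.apache.kafka.connect.transforms.Filter",
    "transforms.HistoryFilter.predicate": "IsProductHistory",
    "transforms.HistoryFilter.negate": "false",
    
    "predicates": "IsProductHistory",
    "predicates.IsProductHistory.type": "org.apache.kafka.connect.transforms.predicates.TopicNameMatches",
    "predicates.IsProductHistory.pattern": "sourcename_sourcetype_dataset_products_history"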
    

    https://docs.confluent.io/platform/current/connect/transforms/filter-ak.html
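
If altering the regex itself turns out to be acceptable, a negative lookahead is one way to express "everything with this prefix except one topic". The sketch below is not from the original answer: the class name, the helper method, and the sample topic names other than products_history are hypothetical, and it assumes the pattern is evaluated as a Java regex that must match the whole topic name.

```java
import java.util.List;
import java.util.regex.Pattern;

public class TopicRegexDemo {
    // Hypothetical replacement value for topics.regex: same prefix as before,
    // but the negative lookahead (?!products_history$) rejects the one topic
    // whose suffix is exactly "products_history".
    static final Pattern TOPICS_REGEX =
        Pattern.compile("sourcename_sourcetype_dataset_(?!products_history$).*");

    // Assumption: the whole topic name has to match, hence matches() not find().
    static boolean isConsumed(String topic) {
        return TOPICS_REGEX.matcher(topic).matches();
    }

    public static void main(String[] args) {
        // Sample topic names are made up for illustration.
        List<String> topics = List.of(
            "sourcename_sourcetype_dataset_products",
            "sourcename_sourcetype_dataset_products_history",
            "sourcename_sourcetype_dataset_products_history_v2",
            "othersource_sourcetype_dataset_products");
        for (String topic : topics) {
            System.out.println(topic + " -> " + isConsumed(topic));
        }
    }
}
```

With this pattern the connector would not subscribe to the excluded topic at all, whereas the Filter approach still reads it and discards the records. The trailing $ inside the lookahead keeps topics such as products_history_v2 included; drop it if every topic starting with products_history should be excluded.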