Search code examples
phpapache-kafka

PHP rdkafka. How to consume topic matching regular expression?


I can consume messages from Kafka, only if I specify topics with their strict name, like 'some_topic', but I wnat to consume topics which match a particular regular expression:

<?php

$conf = new RdKafka\Conf();
//$conf->set('log_level', 1);
//$conf->set('debug', 'all');
$rk = new RdKafka\Consumer($conf);
$rk->addBrokers("localhost:9092");

$topic = $rk->newTopic("^postgres.public.table_[0-9]+_");

$topic->consumeStart(0, RD_KAFKA_OFFSET_BEGINNING);

while (true) {
    $msg = $topic->consume(0, 1000);
    if (null === $msg || $msg->err === RD_KAFKA_RESP_ERR__PARTITION_EOF) {
        continue;
    } elseif ($msg->err) {
        echo $msg->errstr(), "\n";
        break;
    } else {
       echo $msg->payload;
        echo "\n\n";
    }
}

As you can see the topic name is ^postgres.public.table_[0-9]+_. However, it does not work. I get these error messages in the console:

%3|1579347443.452|ERROR|rdkafka#consumer-1| [thrd:app]: rdkafka#consumer-1: ^postgres.public.object_[0-9]+_ [0]: topic does not exist (Broker: Invalid topic)

What is wrong with that and how can I fix it?


Solution

  • newTopic() is used to create a new instance of an RdKafka\Topic object. It is not used to join a topic.

    Use the subscribe() method to join a topic based on a pattern, which would be a regex in this case. So you use something like:

    $rk->subscribe('^postgres.public.table_[0-9]+_');