
Using a regular expression to consume multiple topics in Flink


I know that Flink can consume multiple topics by subscribing with a regular expression.

I have topic names such as:

sclee-10343434
sclee-10342432
sclee-34234
sclee-3343423432424
....

When I set the pattern to sclee-[\\d+], as shown below, I got an exception.

Is this regular expression correct for my case? And does Flink really support this?

val source = new FlinkKafkaConsumer[T](
  java.util.regex.Pattern.compile("sclee-[\\d+]"),
  deserializer,
  consumerProps
)

The error is below.

Caused by: java.lang.RuntimeException: Unable to retrieve any partitions with KafkaTopicsDescriptor: Topic Regex Pattern (dev-plexer-10507689[\d+])
    at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.discoverPartitions(AbstractPartitionDiscoverer.java:153)
    at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:553)
    at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
    at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$0(StreamTask.java:473)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$SynchronizedStreamTaskActionExecutor.runThrowing(StreamTaskActionExecutor.java:92)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:469)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:522)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:750)

Solution

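  • I think the problem here is that the regex does not match the topics provided. In sclee-[\\d+], the square brackets define a character class, so the pattern matches sclee- followed by exactly one character that is either a digit or a + sign.

    In this case you most probably need sclee-[\\d]+ (or, equivalently, sclee-\\d+), which matches sclee- followed by one or more digits.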
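The difference between the two patterns can be checked without a Kafka cluster. The sketch below is only an illustration: the object name TopicPatternCheck and the helper matchingTopics are made up for this example, and it assumes the connector requires the pattern to match the whole topic name (Matcher.matches()), which is worth confirming against the Flink version in use.

```scala
import java.util.regex.Pattern

// Hypothetical helper for this example; not part of Flink or Kafka.
object TopicPatternCheck {
  // Topic names taken from the question.
  val topics: Seq[String] = Seq(
    "sclee-10343434",
    "sclee-10342432",
    "sclee-34234",
    "sclee-3343423432424"
  )

  // Pattern from the question: "sclee-" plus ONE character that is a digit or '+'.
  val original: Pattern = Pattern.compile("sclee-[\\d+]")

  // Suggested fix: "sclee-" plus one or more digits.
  val corrected: Pattern = Pattern.compile("sclee-\\d+")

  // Assumption: the whole topic name must match, so matches() is used rather than find().
  def matchingTopics(pattern: Pattern): Seq[String] =
    topics.filter(topic => pattern.matcher(topic).matches())

  def main(args: Array[String]): Unit = {
    println(s"original:  ${matchingTopics(original)}")  // no topic matches
    println(s"corrected: ${matchingTopics(corrected)}") // all four topics match
  }
}
```

If full-match semantics do apply, the original pattern selects none of the listed topics, which is consistent with the "Unable to retrieve any partitions" error, while the corrected pattern selects all of them.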