apache-kafka, apache-flink, pyflink

pyflink 1.16.1 - access issue to secured Kafka cluster


I'm trying to produce to a secured Kafka cluster using PyFlink.

I tried to use the default JSON producer example provided by the Flink project.

My configuration looks like this:

# imports used by this snippet (PyFlink 1.16 module paths)
from pyflink.common import Types
from pyflink.datastream.connectors.kafka import FlinkKafkaProducer
from pyflink.datastream.formats.json import JsonRowSerializationSchema

USERNAME = 'username'
PASSWORD = 'password'

def write_to_kafka(env):
    type_info = Types.ROW([Types.INT(), Types.STRING()])
    ds = env.from_collection(
        [(1, 'hi'), (2, 'hello'), (3, 'hi'), (4, 'hello'), (5, 'hi'), (6, 'hello'), (6, 'hello')],
        type_info=type_info)

    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()
    kafka_producer = FlinkKafkaProducer(
        topic='flink-test',
        serialization_schema=serialization_schema,
        producer_config={
            'bootstrap.servers': 'bootstrapxxxxxxxxx:443',
            'properties.security.protocol': 'SASL_SSL',
            'properties.sasl.mechanism': 'PLAIN',
            'properties.ssl.truststore.location': '/home/xxxx/flink/truststore.jks',
            'properties.ssl.truststore.password': 'password',
            'properties.group.id': 'flink-cg',
            'properties.sasl.jaas.config': f'org.apache.kafka.common.security.plain.PlainLoginModule required username="{USERNAME}" password="{PASSWORD}";'
        }
    )

    # write the stream to Kafka
    ds.add_sink(kafka_producer)

I keep getting the following error:

        at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:78)
        at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:477)
        at jdk.internal.reflect.GeneratedMethodAccessor18.invoke(Unknown Source)
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.base/java.lang.reflect.Method.invoke(Method.java:566)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$handleRpcInvocation$1(AkkaRpcActor.java:309)
        at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:83)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:307)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:222)
        at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:84)
        at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:168)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)
        at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)
        at scala.PartialFunction.applyOrElse(PartialFunction.scala:123)
        at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)
        at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)
        at akka.actor.Actor.aroundReceive(Actor.scala:537)
        at akka.actor.Actor.aroundReceive$(Actor.scala:535)
        at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)
        at akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)
        at akka.actor.ActorCell.invoke(ActorCell.scala:548)
        at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)
        at akka.dispatch.Mailbox.run(Mailbox.scala:231)
        at akka.dispatch.Mailbox.exec(Mailbox.scala:243)
        ... 5 more
Caused by: org.apache.flink.kafka.shaded.org.apache.kafka.common.errors.TimeoutException: Topic flink-test not present in metadata after 60000 ms.

My understanding is that this error doesn't necessarily mean the topic doesn't exist. It might have something to do with the access configuration.

I tried different configurations and went through the documentation, but I still haven't been able to find the right settings.

https://nightlies.apache.org/flink/flink-docs-release-1.15/api/python/reference/pyflink.datastream/api/pyflink.datastream.connectors.FlinkKafkaConsumer.html#pyflink.datastream.connectors.FlinkKafkaConsumer
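
One way to check whether this is an access/TLS problem rather than a genuinely missing topic is to try the same credentials with a plain Kafka client outside of Flink. Below is a minimal sketch using confluent-kafka; this is not from the original post, and the PEM CA file path is an assumption (librdkafka does not read JKS truststores):

from confluent_kafka import Producer

# same credentials as in the Flink job; ssl.ca.location must point to a PEM
# CA certificate (assumed path), since librdkafka cannot use a JKS truststore
p = Producer({
    'bootstrap.servers': 'bootstrapxxxxxxxxx:443',
    'security.protocol': 'SASL_SSL',
    'sasl.mechanisms': 'PLAIN',
    'sasl.username': 'username',
    'sasl.password': 'password',
    'ssl.ca.location': '/home/xxxx/flink/ca.pem',
})

# list_topics() forces a metadata request: authentication or TLS problems
# surface here, and the result shows whether 'flink-test' really exists
md = p.list_topics(timeout=10)
print('flink-test' in md.topics)

p.produce('flink-test', value=b'{"f0": 1, "f1": "hi"}')
p.flush(10)

If this succeeds, the broker, credentials, and topic are fine, and the problem is in how the properties reach the Flink connector.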


Solution

  • The issue was that I was using FlinkKafkaProducer, which according to the documentation is deprecated and was removed with Flink 1.15, while I'm running Flink 1.16.1.

    "FlinkKafkaProducer is deprecated and will be removed with Flink 1.15, please use KafkaSink instead." The link is here

    After switching to the new API (from pyflink.datastream.connectors.kafka import KafkaSink), I was able to produce to Kafka without issues by following the code snippets in the documentation, along the lines of the sketch below.
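
    A minimal sketch of what that can look like, reusing the settings from the question. The connector jar path, the job name, and the exact module for JsonRowSerializationSchema are my assumptions, not something stated in the original post:

    from pyflink.common import Types
    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.datastream.connectors.kafka import KafkaSink, KafkaRecordSerializationSchema
    from pyflink.datastream.formats.json import JsonRowSerializationSchema

    USERNAME = 'username'
    PASSWORD = 'password'

    env = StreamExecutionEnvironment.get_execution_environment()
    # the Kafka connector jar must be on the classpath; this path is a placeholder
    env.add_jars("file:///home/xxxx/flink/flink-sql-connector-kafka-1.16.1.jar")

    type_info = Types.ROW([Types.INT(), Types.STRING()])
    ds = env.from_collection([(1, 'hi'), (2, 'hello')], type_info=type_info)

    serialization_schema = JsonRowSerializationSchema.Builder() \
        .with_type_info(type_info) \
        .build()

    # with KafkaSink the Kafka client properties are passed directly,
    # without the 'properties.' prefix used by the Table/SQL connector
    sink = KafkaSink.builder() \
        .set_bootstrap_servers('bootstrapxxxxxxxxx:443') \
        .set_record_serializer(
            KafkaRecordSerializationSchema.builder()
            .set_topic('flink-test')
            .set_value_serialization_schema(serialization_schema)
            .build()) \
        .set_property('security.protocol', 'SASL_SSL') \
        .set_property('sasl.mechanism', 'PLAIN') \
        .set_property('ssl.truststore.location', '/home/xxxx/flink/truststore.jks') \
        .set_property('ssl.truststore.password', 'password') \
        .set_property('sasl.jaas.config',
                      'org.apache.kafka.common.security.plain.PlainLoginModule required '
                      f'username="{USERNAME}" password="{PASSWORD}";') \
        .build()

    ds.sink_to(sink)
    env.execute('kafka_json_producer')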