Tags: apache-kafka, apache-flink, flink-streaming, flink-sql, pyflink

PyFlink 1.14 table connectors - Kafka authentication


I've only seen PyFlink Table API examples of Kafka connections that do not include authentication details in the connection setup (doc ref), namely this source table connection:

source_ddl = """
        CREATE TABLE source_table(
            a VARCHAR,
            b INT
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'source_topic',
          'properties.bootstrap.servers' = 'kafka:9092',
          'properties.group.id' = 'test_3',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
        """

However, I need to connect to Kafka sources with authentication enabled. On the interpretation that every 'properties.XXX' option is passed through as Kafka client configuration, I altered the examples as follows and tested:

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import EnvironmentSettings, StreamTableEnvironment

KAFKA_SERVERS = 'localhost:9092'
KAFKA_USERNAME = "user"
KAFKA_PASSWORD = "XXX"
KAFKA_SOURCE_TOPIC = 'source'
KAFKA_SINK_TOPIC = 'dest'


def log_processing():

    env = StreamExecutionEnvironment.get_execution_environment()
    env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")
    env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.11-1.14.0.jar")
    env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")
    settings = EnvironmentSettings.new_instance()\
                      .in_streaming_mode()\
                      .use_blink_planner()\
                      .build()

    t_env = StreamTableEnvironment.create(stream_execution_environment=env, environment_settings=settings)
    
    source_ddl = f"""
            CREATE TABLE source_table(
                Cylinders INT,
                Displacement INT,
                Horsepower INT,
                Weight INT,
                Acceleration INT,
                Model_Year INT,
                USA INT,
                Europe INT,
                Japan INT
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SOURCE_TOPIC}',
              'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
              'properties.group.id' = 'testgroup12',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
            """

    sink_ddl = f"""
            CREATE TABLE sink_table(
                Cylinders INT,
                Displacement INT,
                Horsepower INT,
                Weight INT,
                Acceleration INT,
                Model_Year INT,
                USA INT,
                Europe INT,
                Japan INT
            ) WITH (
              'connector' = 'kafka',
              'topic' = '{KAFKA_SINK_TOPIC}',
              'properties.bootstrap.servers' = '{KAFKA_SERVERS}',
              'properties.group.id' = 'testgroup12',
              'properties.sasl.mechanism' = 'PLAIN',
              'properties.security.protocol' = 'SASL_PLAINTEXT',
              'properties.sasl.jaas.config' = 'org.apache.kafka.common.security.plain.PlainLoginModule required username=\"{KAFKA_USERNAME}\" password=\"{KAFKA_PASSWORD}\";',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json'
            )
            """

    t_env.execute_sql(source_ddl)
    t_env.execute_sql(sink_ddl)

    # execute_insert() submits the job and wait() blocks until it finishes,
    # so no separate TableEnvironment.execute() call is needed here
    t_env.sql_query("SELECT * FROM source_table").execute_insert("sink_table").wait()


if __name__ == '__main__':
    log_processing()
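
To make the 'properties.*' pass-through explicit (my own illustration based on the Kafka connector's documented behaviour, not part of the original post): each option is forwarded to the underlying Kafka client with the 'properties.' prefix stripped, so the DDL above corresponds to this plain Kafka client configuration (reusing the constants defined in the script):

    # Kafka client config equivalent to the 'properties.*' table options above;
    # the 'properties.' prefix is stripped and the values pass through unchanged
    kafka_client_config = {
        "bootstrap.servers": KAFKA_SERVERS,
        "group.id": "testgroup12",
        "sasl.mechanism": "PLAIN",
        "security.protocol": "SASL_PLAINTEXT",
        "sasl.jaas.config": (
            'org.apache.kafka.common.security.plain.PlainLoginModule required '
            f'username="{KAFKA_USERNAME}" password="{KAFKA_PASSWORD}";'
        ),
    }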

When submitting this job from the CLI, there is no response or indication that a job is instantiated with a respective job ID:

(screenshot: CLI output)
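
For reference, a typical PyFlink submission command looks like the following (the exact command and script filename are my assumptions; they are not preserved from the screenshot):

    ./bin/flink run --python log_processing.py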

Correspondingly, no job is created when viewing the Flink UI.

If I'm configuring the connection incorrectly, can someone please correct me, or point me to a relevant source of documentation? (I've googled quite a bit...)


Solution

  • Found the problem, as suggested by @DavidAnderson: the code from my question works as is; it just required updating the dependency jars. If you are using Scala 2.12 and Flink 1.14, the following dependencies are applicable (with the jar files downloaded and available on your JobManager in the respective directory):

    env.add_jars("file:///opt/flink/lib_py/kafka-clients-2.4.1.jar")

    env.add_jars("file:///opt/flink/lib_py/flink-connector-kafka_2.12-1.14.0.jar")

    env.add_jars("file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")

    A useful site to reference, which I found later on, is https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka_2.12/1.14.0
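
    As a side note (my own addition, not part of the original fix), the connector jar can also be registered through the table environment's 'pipeline.jars' option instead of add_jars(). A minimal sketch, assuming the same jar path as above; the flink-sql-connector-kafka fat jar bundles the Kafka client classes, so it may be sufficient on its own:

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import EnvironmentSettings, StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    settings = EnvironmentSettings.new_instance().in_streaming_mode().build()
    t_env = StreamTableEnvironment.create(stream_execution_environment=env,
                                          environment_settings=settings)

    # 'pipeline.jars' takes a semicolon-separated list of jar URLs and is an
    # alternative to StreamExecutionEnvironment.add_jars()
    t_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///opt/flink/lib_py/flink-sql-connector-kafka_2.12-1.14.0.jar")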