
Can't connect PyFlink source to AWS Kinesis

I'm using PyFlink and trying to use AWS Kinesis as a source for the Table API, following the Kinesis connector instructions.

Using that connector, the code fails with the following error even though I have set my region:

Caused by: java.lang.IllegalArgumentException: For FlinkKinesisConsumer AWS region ('aws.region') and/or AWS endpoint ('aws.endpoint') must be set in the config.

My code is as follows:

from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor)

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", "file:///home/ubuntu/connectors/flink-sql-connector-kinesis-1.15.0.jar")
t_env.get_config().set("aws.region", "eu-west-2")

source_ddl = """
        CREATE TABLE source_table(
            a VARCHAR,
            b INT
        ) WITH (
          'connector' = 'kinesis',
          'stream' = 'ais_raw',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json'
        )
"""
t_env.execute_sql(source_ddl)

t_env.sql_query("SELECT a FROM source_table")

Any help would be most appreciated.


  • The option aws.region must be set in the connector options (the WITH clause of the table definition), not in the TableEnvironment configuration. That is, you need to define the source as follows:

    source_ddl = """
            CREATE TABLE source_table(
                a VARCHAR,
                b INT
            ) WITH (
              'connector' = 'kinesis',
              'stream' = 'ais_raw',
              'scan.startup.mode' = 'latest-offset',
              'format' = 'json',
              'aws.region' = 'eu-west-2'
            )
    """
    t_env.execute_sql(source_ddl)
    See the Kinesis connector documentation for more details.
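If you generate the DDL from Python, keeping every connector option in one dict makes it harder to put aws.region in the wrong place. The sketch below uses a hypothetical helper, build_kinesis_ddl (not part of PyFlink), that renders the CREATE TABLE statement and, as an assumed client-side pre-check, refuses to build it unless 'aws.region' or 'aws.endpoint' is among the table options, mirroring the error message above. It needs only the standard library. The option set is trimmed to the ones this answer relies on.

```python
from typing import Dict, List, Tuple


def build_kinesis_ddl(table: str,
                      columns: List[Tuple[str, str]],
                      options: Dict[str, str]) -> str:
    """Hypothetical helper: render a CREATE TABLE statement whose WITH clause
    carries every connector option, including the AWS region."""
    # Assumed pre-check mirroring the connector's complaint: the region and/or
    # endpoint must live in the table's own options, not in t_env's config.
    if "aws.region" not in options and "aws.endpoint" not in options:
        raise ValueError(
            "'aws.region' and/or 'aws.endpoint' must be set in the connector options")
    cols = ",\n".join(f"    {name} {sql_type}" for name, sql_type in columns)
    opts = ",\n".join(f"    '{key}' = '{value}'" for key, value in options.items())
    return f"CREATE TABLE {table} (\n{cols}\n) WITH (\n{opts}\n)"


ddl = build_kinesis_ddl(
    "source_table",
    [("a", "VARCHAR"), ("b", "INT")],
    {
        "connector": "kinesis",
        "stream": "ais_raw",
        "aws.region": "eu-west-2",  # region goes in the WITH clause
        "format": "json",
    },
)
print(ddl)
# With PyFlink installed, register the table with: t_env.execute_sql(ddl)
```

Dropping "aws.region" from the dict makes the helper raise before anything reaches Flink, which surfaces the misconfiguration earlier than the Java stack trace does.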