
Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath

I am trying to run a Python job using Apache Flink 1.15.2, as per the repo.

The repo lists 4 steps. I am stuck on the first one: 1) Local development using PyFlink.

I have completed all the prerequisites, such as creating a conda virtual environment and installing the required Flink version (Apache Flink 1.15.2).

Following the steps of the

I have also downloaded the .jar file (the Kinesis connector) and saved it in the lib subdirectory of the current directory.

  1. There is an input stream generator script, which runs fine:
import datetime
import json
import random
import boto3

STREAM_NAME = "ExampleInputStream"
STREAM_REGION = "eu-west-2"

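def get_data():
    return {
        # ISO-8601 timestamp, matching the event_time column of the Flink tables
        'event_time': datetime.datetime.now().isoformat(),
        'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
        'price': round(random.random() * 100, 2)}

def generate(stream_name, kinesis_client):
    while True:
        data = get_data()
        print(data)
        # send one JSON record per iteration; the partition key value is a placeholder
        kinesis_client.put_record(
            StreamName=stream_name,
            Data=json.dumps(data),
            PartitionKey="partitionkey")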

if __name__ == '__main__':
    generate(STREAM_NAME, boto3.client('kinesis', region_name=STREAM_REGION))

  2. The issue occurs while running the output stream code:
# -*- coding: utf-8 -*-
"""
This module:
    1. Creates a table environment
    2. Creates a source table from a Kinesis Data Stream
    3. Creates a sink table writing to a Kinesis Data Stream
    4. Inserts the source table data into the sink table
"""

from pyflink.table import EnvironmentSettings, TableEnvironment
import os
import json

# 1. Creates a Table Environment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

statement_set = table_env.create_statement_set()
# print(os.environ.get("IS_LOCAL"))
APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # on kda

is_local = (
    True if os.environ.get("IS_LOCAL") else False
)  # set this env var in your local environment

if is_local:
    # only for local, overwrite variable to properties and pass in your jars delimited by a semicolon (;)
    APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json"  # local

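    CURRENT_DIR = os.path.dirname(os.path.realpath(__file__))
    # register the connector jar with the local Flink runtime
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///" + CURRENT_DIR + "/lib/flink-sql-connector-kinesis-1.15.2.jar",
        # "file:///" + "C:/lib/flink-sql-connector-kinesis-1.15.2.jar",
    )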

def get_application_properties():
    if os.path.isfile(APPLICATION_PROPERTIES_FILE_PATH):
        with open(APPLICATION_PROPERTIES_FILE_PATH, "r") as file:
            contents = file.read()
            properties = json.loads(contents)
            return properties
    else:
        print('A file at "{}" was not found'.format(APPLICATION_PROPERTIES_FILE_PATH))

def property_map(props, property_group_id):
    for prop in props:
        if prop["PropertyGroupId"] == property_group_id:
            return prop["PropertyMap"]

def create_source_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                '' = '{3}',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )

def create_sink_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              PARTITIONED BY (ticker)
              WITH (
                'connector' = 'kinesis',
                'stream' = '{1}',
                'aws.region' = '{2}',
                'sink.partitioner-field-delimiter' = ';',
                'sink.batch.max-size' = '100',
                'format' = 'json',
                'json.timestamp-format.standard' = 'ISO-8601'
              ) """.format(
        table_name, stream_name, region
    )

def create_print_table(table_name, stream_name, region, stream_initpos):
    return """ CREATE TABLE {0} (
                ticker VARCHAR(6),
                price DOUBLE,
                event_time TIMESTAMP(3),
                WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
              )
              WITH (
                'connector' = 'print'
              ) """.format(
        table_name, stream_name, region, stream_initpos
    )

def main():
    # Application Property Keys
    input_property_group_key = "consumer.config.0"
    producer_property_group_key = "producer.config.0"

    input_stream_key = ""
    input_region_key = "aws.region"
    input_starting_position_key = ""

    output_stream_key = ""
    output_region_key = "aws.region"

    # tables
    input_table_name = "input_table"
    output_table_name = "output_table"

    # get application properties
    props = get_application_properties()

    input_property_map = property_map(props, input_property_group_key)
    output_property_map = property_map(props, producer_property_group_key)

    input_stream = input_property_map[input_stream_key]
    input_region = input_property_map[input_region_key]
    stream_initpos = input_property_map[input_starting_position_key]

    output_stream = output_property_map[output_stream_key]
    output_region = output_property_map[output_region_key]

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_source_table(input_table_name, input_stream, input_region, stream_initpos)
    )

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(
        create_print_table(output_table_name, output_stream, output_region, stream_initpos)
    )

    # 4. Inserts the source table data into the sink table
    table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
                                         .format(output_table_name, input_table_name))

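    if is_local:
        # block until the job finishes so the local run stays alive
        table_result.wait()
    else:
        # get job status through TableResult
        print(table_result.get_job_client().get_job_status())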

if __name__ == "__main__":
    main()

I run into this error: Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

See the full stack trace:

  File ".\", line 181, in <module>
  File ".\", line 170, in main
    table_result = table_env.execute_sql("INSERT INTO {0} SELECT * FROM {1}"
  File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\pyflink\table\", line 828, in execute_sql  
    return TableResult(self._j_tenv.executeSql(stmt))
  File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\py4j\", line 1321, in __call__
    return_value = get_return_value(
  File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\pyflink\util\", line 146, in deco
    return f(*a, **kw)
  File "C:\Users\a856434\AppData\Local\anaconda3\envs\my-aws-flink-env\lib\site-packages\py4j\", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o1.executeSql.
: org.apache.flink.table.api.ValidationException: Unable to create a source for reading table 'default_catalog.default_database.input_table'.

Table options are:

'stream'='ExampleInputStream '
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.createDynamicTableSource(
        at org.apache.flink.table.planner.plan.schema.CatalogSourceTable.toRel(
        at org.apache.calcite.sql2rel.SqlToRelConverter.toRel(
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertIdentifier(
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertFrom(
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelectImpl(
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertSelect(
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQueryRecursive(
        at org.apache.calcite.sql2rel.SqlToRelConverter.convertQuery(
        at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.rel(FlinkPlannerImpl.scala:189)
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.toQueryOperation(
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlQuery(
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNodeOrFail(
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertSqlInsert(
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convertValidatedSqlNode(
        at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(
        at org.apache.flink.table.planner.delegation.ParserImpl.parse(
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeSql(
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(
        at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(
        at java.base/java.lang.reflect.Method.invoke(
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(
        at java.base/
Caused by: org.apache.flink.table.api.ValidationException: Cannot discover a connector using option: 'connector'='kinesis'
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(
        at org.apache.flink.table.factories.FactoryUtil.discoverTableFactory(
        at org.apache.flink.table.factories.FactoryUtil.createDynamicTableSource(
        ... 34 more
Caused by: org.apache.flink.table.api.ValidationException: Could not find any factory for identifier 'kinesis' that implements 'org.apache.flink.table.factories.DynamicTableFactory' in the classpath.

Available factory identifiers are:

        at org.apache.flink.table.factories.FactoryUtil.discoverFactory(
        at org.apache.flink.table.factories.FactoryUtil.enrichNoMatchingConnectorError(
        ... 36 more
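
One thing that can be checked independently of Flink is whether the jar file itself registers any table factories. Flink discovers factories through Java's service loader, which reads the file META-INF/services/org.apache.flink.table.factories.Factory inside each jar. The following is only a minimal sketch: the helper name list_table_factories is made up for illustration, and it inspects the jar on disk, not what Flink actually loaded.

```python
import zipfile

# Service file that Flink's table factory discovery reads (Java SPI).
FACTORY_SERVICE_FILE = "META-INF/services/org.apache.flink.table.factories.Factory"


def list_table_factories(jar_path):
    """Return the factory class names a jar registers, or [] if it registers none.

    Hypothetical helper: it only reads the jar file, it does not talk to Flink.
    """
    with zipfile.ZipFile(jar_path) as jar:
        if FACTORY_SERVICE_FILE not in jar.namelist():
            return []
        text = jar.read(FACTORY_SERVICE_FILE).decode("utf-8")
    # Drop blank lines and '#' comments, which service files may contain.
    return [
        line.strip()
        for line in text.splitlines()
        if line.strip() and not line.strip().startswith("#")
    ]
```

If this returns class names for the connector jar but Flink still reports that it cannot find the 'kinesis' factory, the jar is probably fine and the more likely cause is that it never reached the classpath.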

Here is the application_properties.json file:

[
  {
    "PropertyGroupId": "",
    "PropertyMap": {
      "python": "GettingStarted/",
      "jarfile": "GettingStarted/lib/flink-sql-connector-kinesis-1.15.2.jar"
    }
  },
  {
    "PropertyGroupId": "consumer.config.0",
    "PropertyMap": {
      "": "ExampleInputStream ",
      "": "LATEST",
      "aws.region": "eu-west-2"
    }
  },
  {
    "PropertyGroupId": "producer.config.0",
    "PropertyMap": {
      "": "ExampleOutputStream",
      "shard.count": "1",
      "aws.region": "eu-west-2"
    }
  }
]
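
As a side note on reading this file: property_map in the script returns None when a group is missing, so a typo in a group ID or key only surfaces later as a TypeError or a bare KeyError. A minimal sketch of a stricter lookup follows. The name find_property and the inline sample data are illustrative assumptions; only the group ID and the "aws.region" key come from the file above.

```python
def find_property(props, group_id, key):
    """Return the value stored under `key` in the property group `group_id`.

    Hypothetical helper: raises a descriptive KeyError instead of returning None.
    """
    for group in props:
        if group.get("PropertyGroupId") == group_id:
            property_map = group.get("PropertyMap", {})
            if key not in property_map:
                raise KeyError("key {!r} missing in group {!r}".format(key, group_id))
            return property_map[key]
    raise KeyError("property group {!r} not found".format(group_id))


# Small inline sample shaped like the properties file.
sample = [
    {"PropertyGroupId": "consumer.config.0", "PropertyMap": {"aws.region": "eu-west-2"}}
]
print(find_property(sample, "consumer.config.0", "aws.region"))
```

Values are returned unmodified, so whitespace inside a value (for example the trailing space in "ExampleInputStream ") is passed through to the table options as-is.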

I was expecting the output stream to show some results, like those in the output stream image.

I rechecked the path of the jar file, and it exists at the path specified. I even tested with a sample Python program to check a) that the directory exists and b) that the .jar file is accessible in that directory (dir_structure_files).

But I am still running into the connector issue. I rechecked the PyFlink documentation for the error and for how .jar dependencies need to be specified, and the program specifies them exactly that way.
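
For reference, the jar setting in question is a semicolon-delimited list of file URIs. The sketch below shows one way to build that value with pathlib instead of string concatenation, failing early if a jar is missing. The helper name build_pipeline_jars is an assumption for illustration, not something from the repo.

```python
from pathlib import Path


def build_pipeline_jars(jar_paths):
    """Turn local jar paths into a semicolon-delimited list of file URIs.

    Hypothetical helper: raises FileNotFoundError if any jar does not exist.
    """
    uris = []
    for jar_path in jar_paths:
        path = Path(jar_path).resolve()
        if not path.is_file():
            raise FileNotFoundError("jar not found: {}".format(path))
        # as_uri() yields file:///C:/... on Windows and file:///... on POSIX
        uris.append(path.as_uri())
    return ";".join(uris)


# Usage inside the PyFlink script (not executed here, it needs pyflink):
# table_env.get_config().get_configuration().set_string(
#     "pipeline.jars",
#     build_pipeline_jars(["lib/flink-sql-connector-kinesis-1.15.2.jar"]),
# )
```

Failing fast here separates "the jar path is wrong" from "the jar setting was never applied", which produce the same Flink error.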

I am not using a fat jar, as there is just one jar file and it should work as per the steps in the repo.

Any help would be much appreciated. I am using version 1.15.2 because eventually we want to write Beam applications (currently not supported beyond 1.15.2).


  • Issue resolved. There was no problem in the code. It's just that PyCharm, for some reason, doesn't resolve the "is_local" variable correctly, so the script can't look in the correct place for the .jar file. I set IS_LOCAL=true in the Windows cmd, ran the code from there (after activating conda), and am getting the desired output.
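
A quick way to spot this kind of problem is to print what the process actually sees for IS_LOCAL before creating any tables. This is a minimal sketch that mirrors the script's check; the function names is_local_run and describe_run_mode are made up for illustration.

```python
import os


def is_local_run(environ=None):
    """Mirror the script's check: any non-empty IS_LOCAL value counts as local."""
    environ = os.environ if environ is None else environ
    return bool(environ.get("IS_LOCAL"))


def describe_run_mode(environ=None):
    """Return a one-line summary of how the script will treat this run."""
    environ = os.environ if environ is None else environ
    if is_local_run(environ):
        mode = "local: the script sets the jar path"
    else:
        mode = "not local: the script never sets the jar path"
    return "IS_LOCAL={!r} ({})".format(environ.get("IS_LOCAL"), mode)


print(describe_run_mode())
```

If this prints IS_LOCAL=None when launched from the IDE, the variable is not reaching the process, and the connector jar is never registered. Note that with this style of check any non-empty value, including "false", counts as local.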