Tags: apache-flink, flink-streaming, pyflink

PyFlink: an error occurred when using kafka_source_ddl


Question:

  • I want to use PyFlink to read Kafka messages.
  • Running the command ./bin/flink run -m 10.0.24.13:8081 -py /usr/local/project/cdn_flink/cdn_demo.py shows this error:
 File "/usr/local/flink-1.14.2/opt/python/pyflink.zip/pyflink/table/table.py", line 1108, in execute_insert
  File "/usr/local/flink-1.14.2/opt/python/py4j-0.10.8.1-src.zip/py4j/java_gateway.py", line 1286, in __call__
  File "/usr/local/flink-1.14.2/opt/python/pyflink.zip/pyflink/util/exceptions.py", line 158, in deco
pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: findAndCreateTableSink failed.
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:88)
        at org.apache.flink.table.factories.TableFactoryUtil.lambda$findAndCreateTableSink$0(TableFactoryUtil.java:116)
        at java.util.Optional.orElseGet(Optional.java:267)
        at org.apache.flink.table.factories.TableFactoryUtil.findAndCreateTableSink(TableFactoryUtil.java:116)
        at org.apache.flink.table.planner.delegation.PlannerBase.getTableSink(PlannerBase.scala:379)
        at org.apache.flink.table.planner.delegation.PlannerBase.translateToRel(PlannerBase.scala:222)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:182)
        at org.apache.flink.table.planner.delegation.PlannerBase$$anonfun$1.apply(PlannerBase.scala:182)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
        at scala.collection.Iterator$class.foreach(Iterator.scala:891)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
        at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
        at scala.collection.AbstractTraversable.map(Traversable.scala:104)
        at org.apache.flink.table.planner.delegation.PlannerBase.translate(PlannerBase.scala:182)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.translate(TableEnvironmentImpl.java:1665)
        at org.apache.flink.table.api.internal.TableEnvironmentImpl.executeInternal(TableEnvironmentImpl.java:752)
        at org.apache.flink.table.api.internal.TableImpl.executeInsert(TableImpl.java:574)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.flink.api.python.shaded.py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at org.apache.flink.api.python.shaded.py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at org.apache.flink.api.python.shaded.py4j.Gateway.invoke(Gateway.java:282)
        at org.apache.flink.api.python.shaded.py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at org.apache.flink.api.python.shaded.py4j.commands.CallCommand.execute(CallCommand.java:79)
        at org.apache.flink.api.python.shaded.py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.flink.table.api.TableException: Could not load service provider for table factories.
        at org.apache.flink.table.factories.TableFactoryService.discoverFactories(TableFactoryService.java:212)

My environment

  • flink-1.14.2

  • kafka_2.12-3.0.0

  • zookeeper-3.7.0

  • apache-flink 1.14.2

  • python 3.6.8

  • all processes are running:

    • 7600 StandaloneSessionClusterEntrypoint
    • 13316 TaskManagerRunner # flink
    • 23878 QuorumPeerMain # zk
    • 15705 ConsoleProducer
    • 29721 Jps
    • 31454 Kafka

My code

  • cdn_connector_ddl.py
# -*- coding: utf-8 -*-
kafka_source_ddl = """
CREATE TABLE cdn_access_log (
 uuid VARCHAR,
 client_ip VARCHAR,
 request_time BIGINT,
 response_size BIGINT
) WITH (
 'connector' = 'kafka',
 'topic' = 'cdn_access_log',
 'properties.bootstrap.servers' = '10.0.24.13:9091',
 'scan.startup.mode' = 'earliest-offset',
 'format' = 'csv',
 'csv.field-delimiter' = ','
)
"""
mysql_sink_ddl = """
CREATE TABLE cdn_access_statistic (
 province VARCHAR,
 access_count BIGINT,
 total_download BIGINT,
 download_speed DOUBLE
 ) WITH (
 'connector.type' = 'jdbc',
 'connector.url' = 'jdbc:mysql://localhost:3306/hive?autoReconnect=true&failOverReadOnly=false&useUnicode=true&characterEncoding=utf-8&useSSL=false&serverTimezone=GMT%2B8',
 'connector.table' = 'cdn_access_statistic',
 'connector.username' = 'hive',
 'connector.password' = 'hive1234',
 'connector.write.flush.interval' = '1s'
)
"""
  • cdn_demo.py

# -*- coding: utf-8 -*-
import os
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment, EnvironmentSettings, TableConfig
from cdn_connector_ddl import kafka_source_ddl, mysql_sink_ddl


def start():
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)
    # add jar
    _path = "file:////usr/local/lib64/python3.6/site-packages/pyflink/lib"
    env.add_jars(os.path.join(_path, 'mysql-connector-java-8.0.27.jar'))
    env.add_jars(os.path.join(_path, 'flink-sql-connector-kafka_2.12-1.14.2.jar'))
    env.add_jars(os.path.join(_path, 'kafka-clients-3.0.0.jar'))
    env.add_jars(os.path.join(_path, 'flink-csv-1.14.2-sql-jar.jar'))
    env.add_jars(os.path.join(_path, 'flink-connector-kafka_2.12-1.14.2.jar'))
    # t_env = StreamTableEnvironment.create(env, TableConfig())
    t_env = StreamTableEnvironment.create(
        env,
        environment_settings=EnvironmentSettings.new_instance().use_blink_planner().build())
    # set source table
    t_env.execute_sql(kafka_source_ddl)
    t_env.execute_sql(mysql_sink_ddl)
   
    t_env.from_path("cdn_access_log") \
        .select("uuid, "
                "client_ip as province, "
                "response_size, request_time") \
        .group_by("province") \
        .select("province, count(uuid) as access_count, "
                "sum(response_size) as total_download, "
                "sum(response_size) * 1.0 / sum(request_time) as download_speed") \
        .execute_insert("cdn_access_statistic")

    t_env.execute("cdn_access_log")

if __name__ == '__main__':
    start()
  • I don't know how to solve this. Maybe I should use an older Flink version? Please help me, thanks.

Solution

  • The error shows that Flink can't find a suitable table sink:

    pyflink.util.exceptions.TableException: org.apache.flink.table.api.TableException: findAndCreateTableSink failed.
    

    Two ideas for reference:

    1. Check whether flink-connector-jdbc.jar is loaded. I see you only loaded mysql-connector-java-8.0.27.jar, which is the MySQL JDBC driver, not the Flink JDBC connector.
    2. Check the JDBC connector options: don't use the legacy connector.xxx keys. See https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/table/jdbc/ for the current option names. A sketch of both fixes follows below.
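    For reference, a minimal sketch of what both fixes could look like together. The jar filename flink-connector-jdbc_2.12-1.14.2.jar and the trimmed JDBC URL are assumptions based on your Flink 1.14.2 / Scala 2.12 setup; verify them against the linked docs before use.

    # Sketch only: reuses _path and env from cdn_demo.py.
    # Load the Flink JDBC connector jar in addition to the MySQL driver
    # (assumed filename for Flink 1.14.2 built against Scala 2.12).
    env.add_jars(os.path.join(_path, 'flink-connector-jdbc_2.12-1.14.2.jar'))

    # Sink DDL rewritten with the current option keys from the Flink 1.14 JDBC connector docs;
    # the URL query parameters are trimmed here and may need adjusting for your MySQL setup.
    mysql_sink_ddl = """
    CREATE TABLE cdn_access_statistic (
     province VARCHAR,
     access_count BIGINT,
     total_download BIGINT,
     download_speed DOUBLE
    ) WITH (
     'connector' = 'jdbc',
     'url' = 'jdbc:mysql://localhost:3306/hive?useSSL=false&serverTimezone=GMT%2B8',
     'table-name' = 'cdn_access_statistic',
     'username' = 'hive',
     'password' = 'hive1234',
     'sink.buffer-flush.interval' = '1s'
    )
    """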