Tags: python, apache-flink, flink-streaming, flink-sql, pyflink

I'm getting an error when running the PyFlink code below. The code calculates the average of each ch[x] field read from a Kafka source using Apache Flink (PyFlink), and I think I have imported all of the necessary libraries.

from numpy import average
from pyflink.table import TableEnvironment, EnvironmentSettings
from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
from pyflink.table import StreamTableEnvironment, DataTypes, EnvironmentSettings
from pyflink.table.udf import udf
from pyflink.table import *

def create_input():
    return """
        CREATE TABLE input(
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `t` TIMESTAMP_LTZ(3),
        ) WITH (
          'connector' = 'kafka',
          'topic' = 'energymeter.raw',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'scan.startup.mode' = 'earliest-offset',
          'format' = 'json',
        )
    """
@udf(result_type=DataTypes.BIGINT())
def average_power(x):
  return x*12*2
@udf(result_type=DataTypes.BIGINT())
def energy_consumption(x):
  return x/500
def create_output():
    return """
        CREATE TABLE output (
          `gw_id` VARCHAR,
          `ch1` BIGINT,
          `ch2` BIGINT,
          `ch3` BIGINT,
          `ch4` BIGINT,
          `ch5` BIGINT,
          `ch6` BIGINT,
          `ch1_mod` BIGINT,
          `ch2_mod` BIGINT,
          `ch3_mod` BIGINT,
          `ch4_mod` BIGINT,
          `ch5_mod` BIGINT,
          `ch6_mod` BIGINT,
          `t` TIMESTAMP_LTZ(3)
        ) WITH (
          'connector' = 'kafka'
          'topic' = 'energymeter.processed',
          'properties.bootstrap.servers' = '192.168.0.34:9092',
          'format' = 'json'
        )
    """
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)
table_env.execute_sql(create_input())
table_env.execute_sql(average_power())
table_env.execute_sql(energy_consumption())
table_env.execute_sql(create_output())
table_env.execute_sql("INSERT INTO output SELECT gw_id, t, ch1, average_power(ch1), ch2, average_power(ch2), ch3, average_power(ch3), ch4, average_power(ch4), ch5, average_power(ch5), ch6, average_power(ch6) FROM input").wait()

This is the error. I have added the SQL Kafka connector JAR (flink-sql-connector-kafka_2.11-1.14.4.jar), but nothing seems to work:


Caused by: org.apache.calcite.sql.parser.SqlParseException: Encountered ")" at line 11, column 9.
Was expecting one of:
    "CONSTRAINT" ...
    "PRIMARY" ...
    "UNIQUE" ...
    "WATERMARK" ...
    <BRACKET_QUOTED_IDENTIFIER> ...
    <QUOTED_IDENTIFIER> ...
    <BACK_QUOTED_IDENTIFIER> ...
    <HYPHENATED_IDENTIFIER> ...
    <IDENTIFIER> ...
    <UNICODE_QUOTED_IDENTIFIER> ...
    
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.convertException(FlinkSqlParserImpl.java:472)
        at org.apache.flink.sql.parser.impl.FlinkSqlParserImpl.normalizeException(FlinkSqlParserImpl.java:235)
        at org.apache.calcite.sql.parser.SqlParser.handleException(SqlParser.java:140)
        at org.apache.calcite.sql.parser.SqlParser.parseStmtList(SqlParser.java:195)
        at org.apache.flink.table.planner.parse.CalciteParser.parseSqlList(CalciteParser.java:77)
        ... 13 more

Solution

  • There are many problems with your program, e.g.

    • Extra trailing commas after `t` TIMESTAMP_LTZ(3), and after 'format' = 'json', in the input DDL, and a missing comma after 'connector' = 'kafka' in the output DDL; these are what the SqlParseException is complaining about
    • Python UDFs should be registered with create_temporary_function instead of being passed to execute_sql
    • The order of the fields in the SELECT clause is not consistent with the schema of the output sink table
    • energy_consumption declares a BIGINT result type, but x / 500 returns a float in Python 3; use integer division (x // 500) instead

    I have modified the program as follows:

    from pyflink.table import TableEnvironment, EnvironmentSettings, DataTypes
    from pyflink.table.udf import udf
    
    def create_input():
        return """
            CREATE TABLE input(
              `gw_id` VARCHAR,
              `ch1` BIGINT,
              `ch2` BIGINT,
              `ch3` BIGINT,
              `ch4` BIGINT,
              `ch5` BIGINT,
              `ch6` BIGINT,
              `t` TIMESTAMP_LTZ(3)
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'energymeter.raw',
              'properties.bootstrap.servers' = '192.168.0.34:9092',
              'scan.startup.mode' = 'earliest-offset',
              'format' = 'json'
            )
        """
    @udf(result_type=DataTypes.BIGINT())
    def average_power(x):
        return x * 12 * 2

    @udf(result_type=DataTypes.BIGINT())
    def energy_consumption(x):
        # Integer division keeps the result a Python int, matching the
        # declared BIGINT result type; x / 500 would return a float
        return x // 500
    
    def create_output():
        return """
            CREATE TABLE output (
              `gw_id` VARCHAR,
              `ch1` BIGINT,
              `ch2` BIGINT,
              `ch3` BIGINT,
              `ch4` BIGINT,
              `ch5` BIGINT,
              `ch6` BIGINT,
              `ch1_mod` BIGINT,
              `ch2_mod` BIGINT,
              `ch3_mod` BIGINT,
              `ch4_mod` BIGINT,
              `ch5_mod` BIGINT,
              `ch6_mod` BIGINT,
              `t` TIMESTAMP_LTZ(3)
            ) WITH (
              'connector' = 'kafka',
              'topic' = 'energymeter.processed',
              'properties.bootstrap.servers' = '192.168.0.34:9092',
              'format' = 'json'
            )
        """
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    table_env.execute_sql(create_input())
    # Register the Python UDFs via create_temporary_function rather than execute_sql
    table_env.create_temporary_function("average_power", average_power)
    table_env.create_temporary_function("energy_consumption", energy_consumption)
    table_env.execute_sql(create_output())
    # The SELECT field order now matches the output schema:
    # gw_id, ch1..ch6, ch1_mod..ch6_mod, t
    table_env.execute_sql("INSERT INTO output SELECT gw_id, ch1, ch2, ch3, ch4, ch5, ch6, average_power(ch1), average_power(ch2), average_power(ch3), average_power(ch4), average_power(ch5), average_power(ch6), t FROM input").wait()
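
    On the connector JAR: the SqlParseException above is a pure SQL syntax error, so it is raised before the Kafka connector is ever looked up. If, after fixing the DDL, you instead see an error like "Could not find any factory for identifier 'kafka'", one way to make flink-sql-connector-kafka_2.11-1.14.4.jar visible to the PyFlink job is the pipeline.jars option. A minimal sketch, assuming the file path below is a placeholder you adjust to where the JAR actually lives:

    # Point the job at the connector JAR; must run before the Kafka
    # tables are created so the connector classes are on the classpath
    table_env.get_config().get_configuration().set_string(
        "pipeline.jars",
        "file:///path/to/flink-sql-connector-kafka_2.11-1.14.4.jar",
    )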