
Flink - Can't convert Table to DataStream

I've managed to use the PyFlink Table API to connect to Kinesis and process a stream of data. I'm now trying to convert this table to a DataStream, as I need more low-level processing.

I've tried following the example here, but I'm getting the error:

AttributeError: 'TableEnvironment' object has no attribute 'to_data_stream'

I know it seems obvious, but comparing my code to the example, I can't see what I'm missing.

The code I've used is:

from pyflink.table import (EnvironmentSettings, TableEnvironment, StreamTableEnvironment, TableDescriptor, Schema,
                           DataTypes, FormatDescriptor, AggregateFunction)
from pyflink.datastream import StreamExecutionEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
t_env.get_config().set("parallelism.default", "1")
t_env.get_config().set("pipeline.jars", "file:///home/ubuntu/connectors/flink-sql-connector-kinesis-1.15.0.jar")

# Create the stream execution environment
env = StreamExecutionEnvironment.get_execution_environment()

# Create sources
source_ddl = """
        CREATE TABLE source(
            decoded_timestamp TIMESTAMP(3),
            lon DOUBLE,
            lat DOUBLE,
            WATERMARK FOR decoded_timestamp AS decoded_timestamp
        ) WITH (
          'connector' = 'kinesis',
          'stream' = 'flink_formatted',
          'scan.startup.mode' = 'latest-offset',
          'format' = 'json',
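          'aws.region' = 'eu-west-1'
        )
"""
t_env.execute_sql(source_ddl)

# Read the registered source table
tab = t_env.from_path('source')

# Convert the table to a DataStream (this line raises the AttributeError)
ds = t_env.to_data_stream(tab)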



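  • to_data_stream is only defined on StreamTableEnvironment. TableEnvironment.create(...) returns a plain TableEnvironment, which is why the attribute is missing. Try reorganizing the initialization like this: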

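    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment
    env = StreamExecutionEnvironment.get_execution_environment()
    # Build a StreamTableEnvironment (not a TableEnvironment) so that to_data_stream() exists
    t_env = StreamTableEnvironment.create(stream_execution_environment=env)
    t_env.get_config().set("pipeline.jars", "file:///home/ubuntu/connectors/flink-sql-connector-kinesis-1.15.0.jar")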