I want to translate the following Java code to PyFlink and then run it in pyflink-shell.sh.
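import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MapDemo {
    private static int index = 1;

    public static void main(String[] args) throws Exception {
        // 1. Get the execution environment.
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Define the source: listen for socket messages on port 9000.
        DataStream<String> textStream = env.socketTextStream("localhost", 9000, "\n");
        // 3. Map operation: prefix each line with a running index.
        DataStream<String> result = textStream.map(s -> (index++) + ". You entered: " + s);
        // 4. Print sink.
        result.print();
        // 5. Start execution.
        env.execute();
    }
}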
But I cannot find socketTextStream in b_env, bt_env, s_env, or st_env. So where is socketTextStream in the PyFlink API?
As of Flink 1.12, out-of-the-box PyFlink appears to support only the connectors defined in pyflink/datastream/connectors.py. See https://github.com/apache/flink/blob/master/flink-python/pyflink/datastream/connectors.py.
Because socketTextStream cannot support exactly-once semantics, its use is generally discouraged, and it wasn't included in PyFlink.
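If the goal is only to try out the map step, one possible workaround is to replace the socket source with a bounded in-memory collection. The following is a minimal sketch under that assumption, not an equivalent of socketTextStream: the names number_line and run_job are hypothetical, the English output string is chosen for illustration, and the counter lives in whichever Python worker runs the function, so the numbering is only sequential with parallelism 1. The PyFlink imports are done inside run_job so the helper can be tried without a Flink installation.

```python
import itertools

# Running index, starting at 1 like the Java static field.
# Assumption: a single Python worker (parallelism 1); each worker
# would otherwise keep its own independent counter.
_counter = itertools.count(1)


def number_line(s: str) -> str:
    """Prefix the input line with the next index."""
    return f"{next(_counter)}. You entered: {s}"


def run_job(lines):
    """Build and run a small DataStream job over a fixed list of strings.

    Hypothetical helper: the list stands in for the socket source,
    which PyFlink 1.12 does not expose.
    """
    # Imported lazily so number_line can be used without PyFlink installed.
    from pyflink.common.typeinfo import Types
    from pyflink.datastream import StreamExecutionEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Source: a bounded collection instead of socketTextStream.
    text_stream = env.from_collection(lines, type_info=Types.STRING())

    # Map, then print sink, mirroring steps 3 and 4 of the Java code.
    text_stream.map(number_line, output_type=Types.STRING()).print()

    env.execute("map_demo")
```

With PyFlink installed, calling run_job(["hello", "world"]) should submit the job and print the numbered lines; for real streaming input, one of the connectors listed in connectors.py (for example Kafka) would take the place of from_collection.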