apache-flink, flink-streaming, flink-cep

Read a string DataStream in Flink from a socket without using a netcat server


I have a stream generator client that generates multiple streams, merges them, and sends the result to a socket. I want my Flink program to listen on that socket as the server. Since the server has to be up first so that it can accept client connections, I tried the following code:

    public static void main(String[] args) throws Exception {

        // set up the StreamExecutionEnvironment
        StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

        environment.setParallelism(1);

        DataStream<String> stream1 = environment.socketTextStream("localhost", 9000);
        stream1.print();

        // start the execution
        environment.execute(" Started the execution ");

    } // main

The code for the stream generator, which acts as the client, is given below:

    DataStream<Event> stream1 = envrionment
            .addSource(new EventGenerator(2, 60, 1, 1, 100, 200))
            .name("stream 1")
            .setParallelism(parallelism_for_stream_rr);

    DataStream<Event> stream2 = envrionment
            .addSource(new EventGenerator(3, 60, 1, 2, 10, 20))
            .name("stream 2")
            .setParallelism(parallelism_for_stream_rr);

    DataStream<Event> stream3 = envrionment
            .addSource(new EventGenerator(5, 60, 1, 3, 30, 40))
            .name("stream 3")
            .setParallelism(parallelism_for_stream_rr);

    DataStream<Event> merged = stream1.union(stream2, stream3);

    merged.print();

    // sending data to Mobile Cep via socket
    merged.map(new MapFunction<Event, String>() {

        @Override
        public String map(Event event) throws Exception {
            String tuple = event.toString();
            return tuple + "\n";
        }
    }).writeToSocket("localhost", 9000, new SimpleStringSchema());

Issue #1: The client code works only when I start a Netcat server, but the Netcat server doesn't forward the data streams. If the Netcat server is not up, the client code says it can't make a connection.

Issue #2: The Flink program doesn't execute if the Netcat server is not up:

Caused by: java.net.ConnectException: Connection refused (Connection refused)
at java.net.PlainSocketImpl.socketConnect(Native Method)
at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)

I know that one possible solution is to generate the streams within the Flink program, but I want to receive the streams via a socket.

Thanks in advance.


Solution

  • Neither Flink's socket source nor its socket sink starts a TCP server and waits for incoming connections. Both are clients that connect to an already running TCP server, which is why you have to start netcat before launching the jobs. If you want to write to and read from a socket, you have to write a TCP server that buffers the incoming data and forwards it once a client connects.
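
As a rough illustration of the kind of TCP server described above, here is a minimal sketch in plain Java. It is not part of Flink. The class name `BufferingRelay`, the use of two separate ports (9000 for the producer, 9001 for the consumer) and the unbounded in-memory queue are all assumptions made for this example. Two ports are used so that the server can tell the writing job from the reading job without any handshake.

```java
import java.io.BufferedReader;
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical relay, not part of Flink: buffers lines from producers
// and forwards them to whichever consumer is currently connected.
class BufferingRelay implements AutoCloseable {
    private final ServerSocket ingest; // a writeToSocket job would connect here
    private final ServerSocket serve;  // a socketTextStream job would connect here
    private final BlockingQueue<String> buffer = new LinkedBlockingQueue<>();

    BufferingRelay(int ingestPort, int servePort) throws IOException {
        ingest = new ServerSocket(ingestPort);
        serve = new ServerSocket(servePort);
    }

    int ingestPort() { return ingest.getLocalPort(); }
    int servePort() { return serve.getLocalPort(); }

    // Starts one acceptor thread per server socket.
    void start() {
        daemon(this::acceptProducers).start();
        daemon(this::acceptConsumers).start();
    }

    private static Thread daemon(Runnable task) {
        Thread t = new Thread(task);
        t.setDaemon(true);
        return t;
    }

    private void acceptProducers() {
        while (!ingest.isClosed()) {
            try {
                Socket producer = ingest.accept();
                daemon(() -> readLines(producer)).start();
            } catch (IOException e) {
                // accept failed (for example, the relay was closed); the loop re-checks isClosed()
            }
        }
    }

    private void readLines(Socket producer) {
        try (Socket s = producer;
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(s.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                buffer.add(line); // unbounded queue: grows until a consumer drains it
            }
        } catch (IOException e) {
            // producer went away; lines already buffered are kept
        }
    }

    // Serves one consumer at a time; buffered lines wait until one connects.
    private void acceptConsumers() {
        while (!serve.isClosed()) {
            try (Socket consumer = serve.accept();
                 BufferedWriter out = new BufferedWriter(
                         new OutputStreamWriter(consumer.getOutputStream(), StandardCharsets.UTF_8))) {
                while (true) {
                    out.write(buffer.take());
                    out.write('\n');
                    out.flush();
                }
            } catch (IOException e) {
                // consumer disconnected; a line in flight may be lost in this sketch
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    @Override
    public void close() throws IOException {
        ingest.close();
        serve.close();
    }

    public static void main(String[] args) throws Exception {
        BufferingRelay relay = new BufferingRelay(9000, 9001); // assumed ports
        relay.start();
        Thread.currentThread().join(); // block forever; the workers are daemon threads
    }
}
```

With this sketch running, the generator job would keep `writeToSocket("localhost", 9000, ...)`, while the reading job would use `socketTextStream("localhost", 9001)` instead of port 9000. Either job can then be started first, as long as the relay is already up. A production version would likely need a bounded buffer and a policy for lines that fail to reach a disconnected consumer.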