How to programmatically write text to Flink socket?


My goal is to send the string "SUCCESS" to a socket and have Apache Flink's DataStream pick up that string and update a local text file with the word "SUCCESS". I have already set up a "Consumer DataStream" that monitors text sent from the terminal to port 9999 on my localhost. The working Consumer DataStream code is below:

package p1;

import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.core.fs.FileSystem.WriteMode;

public class TestClientStreaming {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> textStream = env.socketTextStream("localhost", 9999);

        SingleOutputStreamOperator<String> filteredStream = textStream.filter(new FilterFunction<String>() {
            public boolean filter(String value) throws Exception {
                return value.equals("SUCCESS");
            }
        });

        filteredStream.writeAsText(<FILE-PATH>, WriteMode.OVERWRITE);

        env.execute("Reading Flink Stream");
    }
}

This works for me: when I run "nc -l 9999" in my terminal, the "Consumer Text File" found at FILE-PATH only gets updated if I type "SUCCESS" in the terminal and press the Enter key. Great so far!

Now, I'd like to avoid using the terminal but still write text to this socket so my "Consumer DataStream" can pick it up. This can be any text, since my "Consumer DataStream" filters the incoming text and keeps only what it wants, i.e. "SUCCESS".

The only two ways I know to write to a socket are using the terminal (as discussed earlier) or using a Producer DataStream that continually reads from a "Producer Text File" and calls DataStream.writeToSocket() to write the text it reads to the socket. My "Consumer DataStream" would then pick this up and behave as intended.

Is there any other way to write text to a socket so that a DataStream can pick it up? I tried using the socket library to write "SUCCESS" to localhost:9999, but to no avail.

This image might help with visualizing what I'm looking to solve, and what I've already solved: https://ibb.co/41GzNWM

Thank you for your time!


Solution

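  • Flink's socketTextStream connects to the given host and port as a client, so something must already be listening there. That is what nc -l does, and it is likely why writing to localhost:9999 from a plain client socket did not work. You need to create an equivalent of nc -l, i.e. a socket server, that accepts the connection from Flink and writes the text to it.

    Sample code: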

    import java.io.IOException;
    import java.io.PrintWriter;
    import java.net.ServerSocket;
    import java.net.Socket;

    public class SocketWriter {

        public static void main(String[] args) {
            int port = 9999;
            boolean stop = false;

            // try-with-resources closes the server socket even if binding
            // or accept() fails, so no null check is needed in a finally block.
            try (ServerSocket serverSocket = new ServerSocket(port)) {
                while (!stop) {
                    // Blocks until a client (here, the Flink job) connects.
                    try (Socket clientSocket = serverSocket.accept();
                         PrintWriter out =
                                 new PrintWriter(clientSocket.getOutputStream(), true)) {
                        out.println("Hello Amazing");
                        // do whatever logic you want, e.g. out.println("SUCCESS");
                    }
                }
            }
            catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
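
To check the server side without starting a Flink job, the same idea can be exercised end to end in one process. This is only a sketch under stated assumptions: the class name SocketRoundTrip and the helpers serveOnce, readAll, and roundTrip are made up for illustration, a plain client socket stands in for Flink's socketTextStream, and an ephemeral port is used instead of 9999 so the example does not clash with anything already listening. Lines are terminated with "\n" explicitly, on the assumption that the consumer splits records on newlines.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStreamWriter;
import java.io.PrintWriter;
import java.io.UncheckedIOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper class, not part of Flink or of the original answer.
public class SocketRoundTrip {

    // Accepts exactly one client and sends it each line, newline-terminated.
    static void serveOnce(ServerSocket server, List<String> lines) throws IOException {
        try (Socket client = server.accept();
             PrintWriter out = new PrintWriter(
                     new OutputStreamWriter(client.getOutputStream(), StandardCharsets.UTF_8))) {
            for (String line : lines) {
                out.print(line + "\n");
            }
            out.flush();
        }
    }

    // Stand-in for the consumer: connects as a client and reads lines until EOF.
    static List<String> readAll(String host, int port) throws IOException {
        List<String> received = new ArrayList<>();
        try (Socket socket = new Socket(host, port);
             BufferedReader in = new BufferedReader(
                     new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))) {
            String line;
            while ((line = in.readLine()) != null) {
                received.add(line);
            }
        }
        return received;
    }

    // Starts the server on an ephemeral port, connects a client, returns what it read.
    static List<String> roundTrip(List<String> lines) throws Exception {
        try (ServerSocket server = new ServerSocket(0)) {
            Thread writer = new Thread(() -> {
                try {
                    serveOnce(server, lines);
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
            writer.start();
            List<String> received = readAll("127.0.0.1", server.getLocalPort());
            writer.join();
            return received;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(roundTrip(Arrays.asList("noise", "SUCCESS")));
    }
}
```

Running main prints the two lines the client received, in the order the server wrote them. To feed the actual Flink job, the server would instead bind to port 9999 and be started before the job, since the job is the side that connects.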