java, sockets, cassandra, apache-flink

Using a Cassandra database query as the source for a Flink program


I have a Cassandra database whose data has to reach my Flink program through a socket, like a stream, for stream processing. So I wrote a simple client program that reads the data from Cassandra and sends it to the socket, and I wrote the Flink program as the server side. In fact, my client program is simple and does not use any Flink instructions; it just sends a Cassandra row in string format to the socket, and the server is supposed to receive the row. First I run the Flink program to listen for the client, and then I run the client program. The client received this from the server (because the server sends the DataStream object itself and the client cannot receive it correctly):

Hi Client org.apache.flink.streaming.api.datastream.DataStreamSource@68c72235

After that, both programs stay running without sending or receiving any data, and there is no error.

The Flink program is as follows:

public class WordCount_in_cassandra {

    private static int myport = 9999;
    private static String hostname = "localhost";
    // static ServerSocket variable
    private static ServerSocket server;
    private static int count_row = 0;

    public static void main(String[] args) throws Exception {
        // Checking input parameters
        final ParameterTool params = ParameterTool.fromArgs(args);
        // set up the execution environment
        final StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();

        // create the socket server object
        server = new ServerSocket(myport);
        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        while (true) {
            System.out.println("Waiting for client request");
            // creating socket and waiting for client connection
            Socket socket = server.accept();
            DataStream<String> stream = env.socketTextStream(hostname, myport);

            stream.print();

            // write object to Socket
            ObjectOutputStream oos = new ObjectOutputStream(socket.getOutputStream());
            oos.writeObject("Hi Client " + stream.toString());
            oos.close();
            socket.close();

            // parse the data, group it, window it, and aggregate the counts
            DataStream<Tuple2<String, Long>> counts = stream
                    .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                            // normalize and split the line
                            String[] words = value.toLowerCase().split("\\W+");

                            // emit the pairs
                            for (String word : words) {
                                if (!word.isEmpty()) {
                                    out.collect(new Tuple2<String, Long>(word, 1L));
                                }
                            }
                        }
                    })
                    .keyBy(0)
                    .timeWindow(Time.seconds(5))
                    .sum(1);

            // emit result
            if (params.has("output")) {
                counts.writeAsText(params.get("output"));
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();
            }

            // terminate the server if client sends exit request
            if (stream.equals("exit")) {
                System.out.println("row_count : " + count_row);
                break;
            }

            // execute program
            env.execute("Streaming WordCount");
        } // while true

        System.out.println("Shutting down Socket server!!");
        server.close();
    } // main
}

The client program is like this:

public class client_code {

    private static Cluster cluster = Cluster.builder()
            .addContactPoint("127.0.0.1").withPort(9042).build();
    private static Session session = cluster.connect("mar1");

    public static void main(String[] args) throws UnknownHostException,
            IOException, ClassNotFoundException, InterruptedException {
        String serverIP = "localhost";
        int port = 9999;
        Socket socket = null;
        ObjectOutputStream oos = null;
        ObjectInputStream ois = null;

        ResultSet result = session.execute("select * from tlbtest15");
        for (Row row : result) {
            // establish socket connection to server
            socket = new Socket(serverIP, port);
            // write to socket using ObjectOutputStream
            oos = new ObjectOutputStream(socket.getOutputStream());
            System.out.println("Sending request to Socket Server");

            if (row == result) oos.writeObject("exit");
            else oos.writeObject("" + row + "");
            // read the server response message
            ois = new ObjectInputStream(socket.getInputStream());
            String message = (String) ois.readObject();
            System.out.println("Message: " + message);
            // close resources
            ois.close();
            oos.close();
            Thread.sleep(100);
        }

        cluster.close();
    }
}

Would you please tell me how I can solve my problem?

Any help would be appreciated.


Solution

  • There are several problems with the way you've tried to construct the Flink application. A few comments:

    • The Flink DataStream API is used to describe a dataflow graph that is sent to a cluster for execution when env.execute() is called. It doesn't make sense to wrap this in a while(true) loop.
    • socketTextStream sets up a client connection. Your server doesn't appear to do anything useful.
    • stream.equals("exit") -- stream is a DataStream, not a String. If you want to do something special when a stream element has a specific value, that needs to be done differently, by using one of the stream operations that does event-at-a-time processing. As for shutting down the Flink job, streaming jobs are normally designed to either run indefinitely, or to run until a finite input source reaches its end, at which point they shutdown on their own.

    You can simplify things considerably. I would start over, and begin by replacing your client with a command line like this:

    cqlsh -e "SELECT * from tlbtest15;" | nc -lk 9999
    

    nc (netcat) will act as a server in this case, allowing Flink to be a client. This will make things easier, as that's how env.socketTextStream is meant to be used.

    Then you'll be able to process the results with a normal Flink application. The socketTextStream will produce a stream containing the query's results as lines of text, one for each row.
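
    For illustration, here is a minimal sketch of such an application (the class name is made up, and it assumes nc is listening on localhost:9999 as above). It keeps the word-count logic from your code, but drops the server socket and the while(true) loop, so there is exactly one dataflow and one env.execute() call:

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class CassandraSocketWordCount {

        public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();

            // Flink connects as a client to the nc server started above
            DataStream<String> lines = env.socketTextStream("localhost", 9999);

            DataStream<Tuple2<String, Long>> counts = lines
                    .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
                            // normalize and split each line of query output
                            for (String word : value.toLowerCase().split("\\W+")) {
                                if (!word.isEmpty()) {
                                    out.collect(new Tuple2<>(word, 1L));
                                }
                            }
                        }
                    })
                    .keyBy(0)
                    .timeWindow(Time.seconds(5))
                    .sum(1);

            counts.print();

            // a single execute() submits the whole dataflow graph
            env.execute("Cassandra rows over socket");
        }
    }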