apache-kafka connection streaming apache-flink flink-streaming

How do we connect an AMPS (Crank Up The AMPS) server to Apache Flink for real-time streaming?


We want to subscribe to real-time data from an AMPS (Crank Up The AMPS) server and use it as a source in Apache Flink. Any idea how to connect the two, the way it can be done with Kafka?

AMPS server: http://www.crankuptheamps.com/amps/


Solution

  • Currently, Apache Flink does not provide an out-of-the-box connector for AMPS; it is absent from the list of bundled connectors in the Flink documentation. However, Flink does provide an extensible Source/Sink interface that you can use to plug in any custom source or sink.

    You can create your own AMPS source connector by extending RichSourceFunction and passing an instance to the addSource method, as described in the Flink documentation. Refer to the Java client API provided by Crank Up The AMPS for connecting to the server and subscribing to a topic.

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import com.crankuptheamps.client.Client;
    import com.crankuptheamps.client.Message;
    
    public class AMPSSource extends RichSourceFunction<String> {
    
    
        private static final long serialVersionUID = -8708182052610791593L;
        private final String name, topic, connectionString;
        // Created in open() on the task manager, so it is excluded from serialization
        private transient Client client;
    
        public AMPSSource(String name, String connectionString, String topic) {
            this.name = name;
            this.topic = topic;
            this.connectionString = connectionString;
        }
    
        @Override
        public void open(Configuration parameters) throws Exception {
            // We create a Client, then connect() and logon()
            client = new Client(this.name);
            client.connect(this.connectionString);
            client.logon();
        }
    
        @Override
        public void run(SourceContext<String> sourceContext) throws Exception {
            // Iterate over the MessageStream returned by subscribe() and emit
            // each message payload downstream
            for (Message message : client.subscribe(this.topic)) {
                sourceContext.collect(message.getData());
            }
        }
    
        @Override
        public void close() throws Exception {
            try {
                cancel();
            } finally {
                super.close();
            }
        }
    
        @Override
        public void cancel() {
            // open() may have failed before the client was created
            if (client != null) {
                client.close();
            }
        }
    
    }
    

    This can be used as a source in your stream processor as follows:

    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    public class StreamProcessor {
    
        public static void main(String[] args) throws Exception {
    
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStream<String> ampsStream = env
                    .addSource(new AMPSSource("flink-consumer", "tcp://127.0.0.1:9007/amps/json", "test-topic"));
    
            ampsStream.print();
            env.execute();
        }
    }
    

    Note: A RichSourceFunction implementation always runs with a parallelism of 1. To enable parallel execution, the user-defined source should implement org.apache.flink.streaming.api.functions.source.ParallelSourceFunction or extend org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction.
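
    One thing to watch for when switching to a parallel source: every parallel instance runs its own copy of run(), so if all of them subscribe to the same AMPS topic, each message would likely be emitted once per instance. A common way around this is to give each instance a disjoint share of the work. The following is only a minimal sketch of that idea in plain Java, assuming the job reads from several topics. TopicAssignment, topicsForSubtask and the topic names are hypothetical and not part of Flink or the AMPS client. In Flink versions that expose them, the subtask index and parallelism could be read in open() from getRuntimeContext().getIndexOfThisSubtask() and getRuntimeContext().getNumberOfParallelSubtasks().

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Hypothetical helper (not part of Flink or the AMPS client): decides which
 * topics a given parallel source instance should subscribe to.
 */
public class TopicAssignment {

    /**
     * Round-robin assignment: the topic at position i goes to subtask (i % parallelism).
     * Every topic is assigned to exactly one subtask, so no message is read twice.
     */
    public static List<String> topicsForSubtask(List<String> topics, int subtaskIndex, int parallelism) {
        if (parallelism <= 0 || subtaskIndex < 0 || subtaskIndex >= parallelism) {
            throw new IllegalArgumentException(
                    "subtaskIndex must be in [0, parallelism) and parallelism must be positive");
        }
        List<String> assigned = new ArrayList<>();
        // Start at this subtask's index and step by the parallelism
        for (int i = subtaskIndex; i < topics.size(); i += parallelism) {
            assigned.add(topics.get(i));
        }
        return assigned;
    }

    public static void main(String[] args) {
        // Illustrative topic names only
        List<String> topics = Arrays.asList("orders", "trades", "quotes", "positions", "alerts");
        int parallelism = 2;
        for (int subtask = 0; subtask < parallelism; subtask++) {
            System.out.println("subtask " + subtask + " -> " + topicsForSubtask(topics, subtask, parallelism));
        }
        // prints:
        // subtask 0 -> [orders, quotes, alerts]
        // subtask 1 -> [trades, positions]
    }
}
```

    Inside a parallel version of AMPSSource, each instance would then subscribe only to the topics returned for its own subtask index. With a single topic, only one instance receives work under this scheme, so the split would have to be made some other way, for example with a different subscription filter per instance.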