java, apache-flink, flink-streaming, flink-cep

Write data from a custom source to Flink continuously


This is the first time I am working with Apache Flink (1.3.1), and I have a question. More specifically, I am using the flink-core, flink-cep and flink-streaming libraries. My application is an Akka ActorSystem that consumes messages from RabbitMQ, and various actors handle these messages. In some actors I want to instantiate a Flink StreamExecutionEnvironment and process the incoming messages. For that I wrote a custom source class that extends RichSourceFunction. Everything works fine except one thing: I do not know how to send data to my Flink extension. Here is my setup:

public class FlinkExtension {

    private static StreamExecutionEnvironment environment;
    private DataStream<ValueEvent> input;
    private CustomSourceFunction function;

    public FlinkExtension(){

        environment = StreamExecutionEnvironment.getExecutionEnvironment();

        function = new CustomSourceFunction();
        input = environment.addSource(function);

        PatternStream<ValueEvent> patternStream = CEP.pattern(input, _pattern());

        DataStream<String> warnings = patternStream.select(new PatternSelectFunction<ValueEvent, String>() {
            @Override
            public String select(Map<String, List<ValueEvent>> pattern) throws Exception {
                return null; //TODO
            }
        });

        warnings.print();

        try {
            environment.execute();
        } catch(Exception e){
            e.printStackTrace();
        }

    }

    private Pattern<ValueEvent, ?> _pattern(){

        return Pattern.<ValueEvent>begin("first").where(new SimpleCondition<ValueEvent>() {
            @Override
            public boolean filter(ValueEvent value) throws Exception {
                return value.getValue() > 10;
            }
        });
    }

    public void sendData(ValueEvent value){
        function.sendData(value);
    }
}

And this is my custom source function:

public class CustomSourceFunction extends RichSourceFunction<ValueEvent> {

    private volatile boolean run = false;
    private SourceContext<ValueEvent> context;

    @Override
    public void open(Configuration parameters){
        run = true;
    }

    @Override
    public void run(SourceContext<ValueEvent> ctx) throws Exception {
        this.context = ctx;

        while (run){

        }
    }

    public void sendData(ValueEvent value){
        this.context.collectWithTimestamp(value, Calendar.getInstance().getTimeInMillis());
    }

    @Override
    public void cancel() {
        run = false;
    }
}

So I want to call the sendData method of my FlinkExtension class from outside, to write data to it continuously. Here is my JUnit test, which should send data to the extension, which in turn should write it to the SourceContext.

@Test
public void testSendData(){
    FlinkExtension extension = new FlinkExtension();
    extension.sendData(new ValueEvent(30));
}

But when I run the test, nothing happens: the application hangs in the run method of CustomSourceFunction. I also tried creating a new endless thread in the run method of CustomSourceFunction.
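Part of why this cannot work as written: `environment.execute()` blocks until the job finishes, so the `FlinkExtension` constructor never returns and `extension.sendData(...)` is never reached. Even if `execute()` ran on another thread, Flink serializes the source function when it builds the job and runs a deserialized copy, so `function` in `FlinkExtension` is not the object whose `run()` received the `SourceContext` (its `context` field stays null). The Flink-free sketch below uses plain Java serialization as a stand-in for that step; `CopyDemo` and `FakeSource` are hypothetical names, not Flink classes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo: plain Java serialization stands in for how Flink ships user functions to tasks.
public class CopyDemo {

    // Hypothetical stand-in for CustomSourceFunction (Flink functions are Serializable as well).
    static class FakeSource implements Serializable {
        private static final long serialVersionUID = 1L;
        private final List<Integer> received = new ArrayList<>();

        void sendData(int value) {
            received.add(value);
        }

        List<Integer> received() {
            return received;
        }
    }

    // Serialize and deserialize an object, producing an independent copy.
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(obj);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (T) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        FakeSource mine = new FakeSource();   // like the object held in FlinkExtension.function
        FakeSource running = roundTrip(mine); // like the copy the job actually runs

        mine.sendData(30);                    // only reaches the local object

        System.out.println(mine == running);     // prints false
        System.out.println(mine.received());     // prints [30]
        System.out.println(running.received());  // prints []
    }
}
```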

To summarize: Does anybody know how to write data from an application to a Flink instance in a continuous way?


Solution

  • Flink source connectors emit a continuous stream of data by calling collect() (or collectWithTimestamp()) from inside the while (run) loop of their run() methods. If you want to study an example, the Apache NiFi source isn't as complex as most; here's its run method.
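A minimal sketch of that pattern, written without Flink so it runs on its own. `QueueDrainingSource` is a hypothetical class whose `run()` mirrors `SourceFunction.run()`, and a `java.util.function.Consumer` stands in for `SourceContext.collect()`. Other threads (for example an actor) hand values to a `BlockingQueue`, and only the loop inside `run()` emits them, waiting with a timeout instead of spinning on an empty `while (run) {}`. In a real `RichSourceFunction` you would emit inside `synchronized (ctx.getCheckpointLock())`, and the queue has to be reachable from the copy Flink deploys (for instance a static field, which can only work for a local run in a single JVM). Alternatively, you can skip the hand-off entirely and read from RabbitMQ directly with Flink's `RMQSource` from `flink-connector-rabbitmq`.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, Flink-free model of a source function: Consumer<T> stands in for SourceContext.collect().
public class QueueDrainingSource<T> {

    // Thread-safe hand-off between producer threads and the source loop.
    private final BlockingQueue<T> queue = new LinkedBlockingQueue<>();
    private volatile boolean running = true;

    // Called by producers; it never touches the emitter directly.
    public void sendData(T value) {
        queue.add(value);
    }

    // Mirrors SourceFunction.run(): all emitting happens inside the while (running) loop.
    public void run(Consumer<T> emit) throws InterruptedException {
        while (running) {
            // Wait briefly instead of busy-spinning, so cancel() is noticed promptly.
            T next = queue.poll(100, TimeUnit.MILLISECONDS);
            if (next != null) {
                emit.accept(next);
            }
        }
    }

    // Mirrors SourceFunction.cancel().
    public void cancel() {
        running = false;
    }

    public static void main(String[] args) throws Exception {
        QueueDrainingSource<Integer> source = new QueueDrainingSource<>();
        List<Integer> emitted = Collections.synchronizedList(new ArrayList<>());

        // Run the source loop on its own thread, as a Flink task would.
        Thread task = new Thread(() -> {
            try {
                source.run(emitted::add);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        task.start();

        // The application keeps feeding values while the loop is running.
        source.sendData(30);
        source.sendData(5);

        // Wait until both values have been emitted, then stop the loop.
        while (emitted.size() < 2) {
            Thread.sleep(10);
        }
        source.cancel();
        task.join();

        System.out.println(emitted); // prints [30, 5]
    }
}
```

The point of the design is that `sendData` and `run` share only the queue, so producers never need a reference to the `SourceContext`, which only exists inside the running task.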