Tags: apache-flink, flink-streaming

Apache Flink's Iterative Stream with Asynchronous operations doesn't work well


I use Flink (1.11.1) to request information from an external source. I have two different pipelines that share about 80% of their code. The first job is an ETL: it starts and finishes. The second processes webhooks in real time and is always running. In the second pipeline I use Kafka as the source to consume and process changes in the external source, and I have a step that uses the Iterate transformation with an Async I/O operator, and that step does not work well.

After some time consuming many messages from Kafka, the iteration starts having problems: it doesn't iterate, but it doesn't close the iterator either. The Kafka consumer keeps consuming messages, and the elements keep flowing through the pipeline until they reach the iteration.

Here is the code:

// Source: consume webhook change events from Kafka and deserialize them.
DataStream<DataContainer<ConnectionWebhook>> connections = env.addSource(getKafkaConsumer(properties)).setParallelism(1)
        .map(new StringKafkaMessageMap()).name("StringKafkaMessageMap")
        .map(new KafkaMessageConnectionMap()).name("KafkaMessageConnectionMap");

// Async I/O: verify each connection against the external source (30 s timeout, capacity 1).
DataStream<DataContainer<ConnectionWebhook>> verifyConnection = AsyncDataStream.unorderedWait(connections, new VerifyConnection(), 30000, TimeUnit.MILLISECONDS, 1).name("VerifyConnection");

// Split verified connections into success and error branches.
DataStream<DataContainer<ConnectionWebhook>> connectionSuccessfully = verifyConnection.filter(new FilterConnectionWithoutError()).name("FilterConnectionWithoutError");
DataStream<DataContainer<ConnectionWebhook>> connectionUnsuccessfully = verifyConnection.filter(new FilterConnectionWithError()).name("FilterConnectionWithError");
DataStream<DataContainer<Tuple2<ConnectionWebhook, Map<String, Object>>>> connectionUnsuccessfullyError = connectionUnsuccessfully.map(new connectionUnsuccessfullyMap()).name("connectionUnsuccessfullyMap");

DataStream<DataContainer<Tuple4<Integer, Integer, ConnectionWebhook, Map<String, Object>>>> initialCustomFieldRequest = connectionSuccessfully.map(new InitialCustomFieldMap()).name("InitialCustomFieldMap");

// Iteration: the async operator fetches custom fields; records that still have
// pending custom fields are fed back to the head of the loop via closeWith().
IterativeStream<DataContainer<Tuple4<Integer, Integer, ConnectionWebhook, Map<String, Object>>>> iterativeCustomField = initialCustomFieldRequest.iterate();
DataStream<DataContainer<Tuple4<Integer, Integer, ConnectionWebhook, Map<String, Object>>>> customField = AsyncDataStream.unorderedWait(iterativeCustomField, new AsyncCustomField(), 30000, TimeUnit.MILLISECONDS, 1).name("AsyncCustomField");
DataStream<DataContainer<Tuple4<Integer, Integer, ConnectionWebhook, Map<String, Object>>>> withPendingCustomFields = customField.filter(new WithPendingCustomFields()).name("WithPendingCustomFields");
DataStream<DataContainer<Tuple4<Integer, Integer, ConnectionWebhook, Map<String, Object>>>> withoutPendingCustomFields = customField.filter(new WithoutPendingCustomFields()).name("WithoutPendingCustomFields");
iterativeCustomField.closeWith(withPendingCustomFields);

DataStream<DataContainer<Tuple2<ConnectionWebhook, Map<String, Object>>>> initialIssueRetrieval = initialCustomFieldRequest.map(new InitialIssueRetrieval()).name("InitialIssueRetrieval");

Solution

  • One possible issue with iterations is that once there's backpressure in the iteration loop, it can cause gridlock: records at the tail of the iteration can't be sent back to the head, and the head isn't able to receive new records until the loop has processed some of the records already in flight.

    Normally you'd only run into this situation if either (a) your iteration generates more than one record for each record coming into the head, or (b) records loop around many times, so that the accumulated in-flight records exceed the network buffer capacity between the tail and the head of the loop. The toy job below illustrates both cases.

    A short-term fix is to bump the amount of network buffer memory, but that doesn't solve the underlying issue. We did some hacks (in a DataSet-based iteration) to throttle incoming records (creating backpressure toward the source, before the iteration), but it was pretty skanky. Rough sketches of both ideas follow.
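
    To make that failure mode concrete, here is a toy job (hypothetical code, not the asker's pipeline; all names are made up) in which each record entering the loop emits two feedback records and almost everything is fed back, so the amount of in-flight data grows on every pass until the buffers between the iteration tail and head fill up and the loop stalls:

    import org.apache.flink.api.common.typeinfo.Types;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.datastream.IterativeStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.util.Collector;

    public class AmplifyingLoop {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            IterativeStream<Long> loop = env.generateSequence(1, 1_000_000).iterate();

            // Case (a): every record entering the head emits TWO records.
            DataStream<Long> amplified = loop
                    .flatMap((Long v, Collector<Long> out) -> {
                        out.collect(v);
                        out.collect(v);
                    })
                    .returns(Types.LONG);

            // Case (b): ~99.9% of records are fed back and loop again,
            // so the feedback edge carries more data on every pass.
            loop.closeWith(amplified.filter(v -> v % 1_000 != 0));

            // Only a tiny fraction of records ever escapes the loop.
            amplified.filter(v -> v % 1_000 == 0).print();

            env.execute("AmplifyingLoop");
        }
    }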
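
    For the short-term buffer bump: with Flink 1.11's memory model the network memory budget is controlled by the taskmanager.memory.network.* options. A minimal sketch, assuming a local test environment (on a real cluster you would set the same keys in flink-conf.yaml instead):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    Configuration conf = new Configuration();
    // Enlarge the slice of TaskManager memory reserved for network buffers.
    conf.setString("taskmanager.memory.network.fraction", "0.2");
    conf.setString("taskmanager.memory.network.min", "128mb");
    conf.setString("taskmanager.memory.network.max", "2gb");

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.createLocalEnvironment(4, conf);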
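
    And a rough sketch of the throttling idea (ThrottleMap is a hypothetical name, not a Flink API; it uses Guava's RateLimiter, whose blocking acquire() turns the rate cap into backpressure toward the source):

    import com.google.common.util.concurrent.RateLimiter;
    import org.apache.flink.api.common.functions.RichMapFunction;
    import org.apache.flink.configuration.Configuration;

    // Hypothetical operator: caps how fast records may enter the iteration,
    // so the feedback edge doesn't have to compete with a flood of new input.
    public class ThrottleMap<T> extends RichMapFunction<T, T> {
        private final double recordsPerSecond;
        private transient RateLimiter limiter; // not serializable, so build it in open()

        public ThrottleMap(double recordsPerSecond) {
            this.recordsPerSecond = recordsPerSecond;
        }

        @Override
        public void open(Configuration parameters) {
            limiter = RateLimiter.create(recordsPerSecond);
        }

        @Override
        public T map(T value) {
            limiter.acquire(); // blocks when over the cap -> backpressure to Kafka
            return value;
        }
    }

    In the asker's pipeline such a throttle would sit just before initialCustomFieldRequest.iterate(), e.g. initialCustomFieldRequest.map(new ThrottleMap<>(100)).name("Throttle").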