apache-kafka apache-flink flink-streaming

Flink Kafka to S3 with retry


I am new to Flink. I have a requirement to read data from Kafka, conditionally enrich it (if a record belongs to category X) by calling an API, and write it to S3.

I made a hello world Flink application with the above logic which works like a charm.

But the API I am using for enrichment doesn't have a 100% uptime SLA, so I need to design something with retry logic.

These are the options I found:

Option 1) Retry with exponential backoff until I get a response from the API. This will block the queue, so I don't like it.

Option 2) Use one more topic (called topic-failure) and publish the record to it if the API is down. This way the main queue is not blocked. I will need one more worker to process the data from topic-failure. This queue also has to work as a circular queue if the API is down for a long time: read a message from topic-failure and try to enrich it; if that fails, push it back to topic-failure and consume the next message.

I prefer option 2, but it doesn't look easy to implement. Is there a standard Flink approach for implementing option 2?


Solution

  • This is a rather common problem that occurs when migrating away from microservices. The proper solution would be to have the lookup data also in Kafka or some DB that could be integrated in the same Flink application as an additional source.

    If you cannot do that (for example, the API is external or the data cannot easily be mapped to a data store), both approaches are viable, and each has its own advantages.

    1) Retrying lets you retain the order of input events. If your downstream application expects ordered input, then you need to retry.

    2) The common term for this is a dead letter queue (although it is more often used for invalid records). There are two easy ways to integrate it in Flink: either have a separate source, or use a topic pattern/list with one source.

    Your topology would look like this:

    Kafka Source      -\             Async IO        /-> Filter good -> S3 sink
                        +-> Union -> with timeout  -+ 
    Kafka Source dead -/           (for API call!)   \-> Filter bad  -> Kafka sink dead
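
The routing step in the middle of that topology can be sketched in plain Java. This is only a simulation under stated assumptions, not Flink code: `DeadLetterRouting`, `enrich`, `route`, and the `flakyApi` function are hypothetical names, the two lists stand in for the S3 sink and the dead letter Kafka sink, and `CompletableFuture.orTimeout` (Java 9+) plays the role of the Async IO timeout. In a real job the same decision would presumably live inside Flink's Async I/O operator, followed by the two filters.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Plain-Java simulation of the routing step; no Flink classes are used.
class DeadLetterRouting {

    // Outcome of one enrichment attempt: the payload plus a success flag.
    static final class Outcome {
        final String value;
        final boolean ok;
        Outcome(String value, boolean ok) { this.value = value; this.ok = ok; }
    }

    // Calls the (hypothetical) enrichment API asynchronously. A timeout or an
    // exception yields a failed Outcome that carries the original record.
    static CompletableFuture<Outcome> enrich(String record,
                                             Function<String, String> api,
                                             long timeoutMs) {
        return CompletableFuture.supplyAsync(() -> api.apply(record))
                .orTimeout(timeoutMs, TimeUnit.MILLISECONDS)
                .handle((enriched, err) -> err == null
                        ? new Outcome(enriched, true)
                        : new Outcome(record, false));
    }

    // Starts all calls first, then splits the results:
    // good -> S3 stand-in, bad -> dead letter stand-in.
    static void route(List<String> records, Function<String, String> api, long timeoutMs,
                      List<String> s3Sink, List<String> deadLetterSink) {
        List<CompletableFuture<Outcome>> pending = new ArrayList<>();
        for (String r : records) {
            pending.add(enrich(r, api, timeoutMs));
        }
        for (CompletableFuture<Outcome> f : pending) {
            Outcome o = f.join(); // never throws: handle() already mapped failures
            (o.ok ? s3Sink : deadLetterSink).add(o.value);
        }
    }

    public static void main(String[] args) {
        // Hypothetical API that fails for some records.
        Function<String, String> flakyApi = r -> {
            if (r.startsWith("bad")) throw new IllegalStateException("API down");
            return r + ":enriched";
        };
        List<String> s3 = new ArrayList<>();
        List<String> dead = new ArrayList<>();
        route(List.of("a", "bad-1", "b"), flakyApi, 1000, s3, dead);
        System.out.println("S3: " + s3 + ", dead letter: " + dead);
    }
}
```

Failed records keep their original, unenriched payload, so whatever reads the dead letter topic can feed them through the same enrichment step again. That is what makes the queue circular while the API stays down.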