How to use ListState for BroadcastProcessFunction in Flink


We have a non-keyed data stream of transactions and a broadcast stream of rules. We want to process each transaction based on the most recently seen rule. If the last seen rule is daily, we add the current transaction to dailyTrnsList; once dailyTrnsList reaches the threshold size, we write its transactions to the database and clear the list. We do the same with tempTrnsList if the last seen rule is temp. When the rule changes, the pending list for the previous rule is also written to the database.

The code is as follows:

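import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class TransactionProcess extends BroadcastProcessFunction<String, String, String> {

    // In-memory buffers, one per rule (plain fields: not fault tolerant on their own).
    private List<String> dailyTrnsList = new ArrayList<>();
    private List<String> tempTrnsList = new ArrayList<>();

    private static final int threshold = 100;

    private final MapStateDescriptor<String, String> ruleStateDesc =
            new MapStateDescriptor<>(
                    "ControlMapState",
                    BasicTypeInfo.STRING_TYPE_INFO,
                    BasicTypeInfo.STRING_TYPE_INFO);

    @Override
    public void processElement(String s,
                               ReadOnlyContext readOnlyContext,
                               Collector<String> collector) throws Exception {
        // Latest rule from broadcast state; null until the first rule has arrived.
        String ruleName = readOnlyContext.getBroadcastState(ruleStateDesc).get("rule");

        if ("daily".equals(ruleName)) {
            dailyTrnsList.add(s);
            if (dailyTrnsList.size() >= threshold) {
                // Swap in a fresh buffer, then write the full batch.
                List<String> buffer = dailyTrnsList;
                dailyTrnsList = new ArrayList<>();
                insert_to_db(buffer, "daily");
            }
        } else if ("temp".equals(ruleName)) {
            tempTrnsList.add(s);
            if (tempTrnsList.size() >= threshold) {
                List<String> buffer = tempTrnsList;
                tempTrnsList = new ArrayList<>();
                insert_to_db(buffer, "temp");
            }
        }

        collector.collect(s);
    }

    @Override
    public void processBroadcastElement(String s,
                                        Context context,
                                        Collector<String> collector) throws Exception {
        if (s.equals("temp")) {
            // Switch to "temp": store the rule and flush the pending daily batch.
            context.getBroadcastState(ruleStateDesc).put("rule", "temp");
            List<String> buffer = dailyTrnsList;
            dailyTrnsList = new ArrayList<>();
            insert_to_db(buffer, "daily");
        } else if (s.equals("daily")) {
            // Switch to "daily": store the rule and flush the pending temp batch.
            context.getBroadcastState(ruleStateDesc).put("rule", "daily");
            List<String> buffer = tempTrnsList;
            tempTrnsList = new ArrayList<>();
            insert_to_db(buffer, "temp");
        }
    }

    // Writes one batch to the database; the implementation is not shown in the question.
    private void insert_to_db(List<String> batch, String ruleName) {
        // database write goes here
    }
}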

Our problem is making this fault tolerant: we do not know how to use ListState here. The only solution we have found so far is to implement the CheckpointedFunction interface, as described in the "Working with State" section of the Flink documentation:

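import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;

// The members below go inside TransactionProcess, which also declares
// "implements CheckpointedFunction". The state names are illustrative.
private final ListStateDescriptor<String> dailyDescriptor =
        new ListStateDescriptor<>("dailyTrns", String.class);
private final ListStateDescriptor<String> tempDescriptor =
        new ListStateDescriptor<>("tempTrns", String.class);

private transient ListState<String> dailyTrns;
private transient ListState<String> tempTrns;

@Override
public void snapshotState(FunctionSnapshotContext functionSnapshotContext) throws Exception {
    // On every checkpoint, copy the current in-memory buffers into operator state.
    dailyTrns.clear();
    tempTrns.clear();
    for (String element : dailyTrnsList)
        dailyTrns.add(element);
    for (String element : tempTrnsList)
        tempTrns.add(element);
}

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
    dailyTrns = context.getOperatorStateStore().getListState(dailyDescriptor);
    tempTrns = context.getOperatorStateStore().getListState(tempDescriptor);
    if (context.isRestored()) {
        // After a failure, refill the buffers from the last completed checkpoint.
        for (String element : dailyTrns.get())
            dailyTrnsList.add(element);
        for (String element : tempTrns.get())
            tempTrnsList.add(element);
    }
}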

Could you please advise us what else we can do if this approach is not correct? And if it is correct, what happens to the elements in dailyTrnsList and tempTrnsList that have not been copied into dailyTrns and tempTrns?

Any help would be appreciated.

Thank you in advance.


Solution

  • You could simplify your implementation so as to not have to worry about this. You could do the following:

    (1) Simplify the BroadcastProcessFunction so that all it does is split the incoming stream into two streams: a stream of daily transactions and a stream of temporary transactions. It does this by sending each transaction to one of two side outputs, chosen by the latest rule.

    (2) Follow the BroadcastProcessFunction with count windows that create batches and write them to the database.

    Or, instead of using side outputs, the BroadcastProcessFunction could emit (rule, transaction) tuples, and you could then key the stream by the rule. Either way, the idea is to let the window API manage the fault-tolerant lists for you.
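A minimal sketch of the side-output approach, assuming the Flink 1.x DataStream API and the same String transactions and "daily"/"temp" rule strings as in the question. The class names TransactionSplitJob, SplitByRule and ToBatch, the side-output id "temp-transactions", the placeholder sources and the print(...) sinks are hypothetical stand-ins; a real job would replace print(...) with a sink that writes each batch to the database.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

import java.util.ArrayList;
import java.util.List;

public class TransactionSplitJob {

    // Broadcast state holding the latest rule under the key "rule", as in the question.
    static final MapStateDescriptor<String, String> RULE_STATE = new MapStateDescriptor<>(
            "ControlMapState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    // Side output for transactions seen while the "temp" rule is active (hypothetical id).
    static final OutputTag<String> TEMP_TAG = new OutputTag<String>("temp-transactions") {};

    /** Sends "daily" transactions to the main output and "temp" transactions to TEMP_TAG. */
    public static class SplitByRule extends BroadcastProcessFunction<String, String, String> {
        @Override
        public void processElement(String trx, ReadOnlyContext ctx, Collector<String> out) throws Exception {
            String rule = ctx.getBroadcastState(RULE_STATE).get("rule");
            if ("daily".equals(rule)) {
                out.collect(trx);
            } else if ("temp".equals(rule)) {
                ctx.output(TEMP_TAG, trx);
            }
            // Transactions that arrive before any rule are dropped in this sketch.
        }

        @Override
        public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
            if ("daily".equals(rule) || "temp".equals(rule)) {
                ctx.getBroadcastState(RULE_STATE).put("rule", rule);
            }
        }
    }

    /** Collects the contents of one full count window into a single batch. */
    public static class ToBatch extends ProcessAllWindowFunction<String, List<String>, GlobalWindow> {
        @Override
        public void process(Context ctx, Iterable<String> elements, Collector<List<String>> out) {
            List<String> batch = new ArrayList<>();
            for (String trx : elements) {
                batch.add(trx);
            }
            out.collect(batch);
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // checkpoint every 10 s (illustrative interval)

        // Placeholder sources; replace with the real transaction and rule streams.
        DataStream<String> transactions = env.fromElements("t1", "t2", "t3");
        BroadcastStream<String> rules = env.fromElements("daily").broadcast(RULE_STATE);

        SingleOutputStreamOperator<String> daily =
                transactions.connect(rules).process(new SplitByRule());
        DataStream<String> temp = daily.getSideOutput(TEMP_TAG);

        int threshold = 100;
        daily.countWindowAll(threshold).process(new ToBatch()).print("daily");
        temp.countWindowAll(threshold).process(new ToBatch()).print("temp");

        env.execute("split transactions by rule");
    }
}
```

The window contents live in Flink-managed state, so they are checkpointed and restored without any CheckpointedFunction code. Two differences from the question's code are worth noting: countWindowAll always runs with parallelism 1, and a count window fires only once it holds threshold elements, so a partial batch is not flushed when the rule changes. Reproducing that flush would need a custom trigger, which this sketch does not attempt.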
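The keyed alternative from the last paragraph can be sketched as follows, again assuming the Flink 1.x DataStream API. The hypothetical TagWithRule function emits Tuple2 pairs of (rule, transaction), the stream is keyed by the rule field, and a keyed count window per rule builds the batches. The class names, placeholder sources and the print() sink are illustrative stand-ins, not part of the original answer.

```java
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.GlobalWindow;
import org.apache.flink.util.Collector;

import java.util.ArrayList;
import java.util.List;

public class TransactionKeyedJob {

    static final MapStateDescriptor<String, String> RULE_STATE = new MapStateDescriptor<>(
            "ControlMapState", BasicTypeInfo.STRING_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO);

    /** Emits (rule, transaction) using the latest rule from broadcast state. */
    public static class TagWithRule
            extends BroadcastProcessFunction<String, String, Tuple2<String, String>> {
        @Override
        public void processElement(String trx, ReadOnlyContext ctx,
                                   Collector<Tuple2<String, String>> out) throws Exception {
            String rule = ctx.getBroadcastState(RULE_STATE).get("rule");
            if (rule != null) { // no rule seen yet: the transaction is dropped in this sketch
                out.collect(Tuple2.of(rule, trx));
            }
        }

        @Override
        public void processBroadcastElement(String rule, Context ctx,
                                            Collector<Tuple2<String, String>> out) throws Exception {
            if ("daily".equals(rule) || "temp".equals(rule)) {
                ctx.getBroadcastState(RULE_STATE).put("rule", rule);
            }
        }
    }

    /** Turns one full per-rule count window into a (rule, batch) pair. */
    public static class ToBatch extends
            ProcessWindowFunction<Tuple2<String, String>, Tuple2<String, List<String>>, String, GlobalWindow> {
        @Override
        public void process(String rule, Context ctx, Iterable<Tuple2<String, String>> elements,
                            Collector<Tuple2<String, List<String>>> out) {
            List<String> batch = new ArrayList<>();
            for (Tuple2<String, String> e : elements) {
                batch.add(e.f1);
            }
            out.collect(Tuple2.of(rule, batch));
        }
    }

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(10_000); // illustrative checkpoint interval

        // Placeholder sources; replace with the real transaction and rule streams.
        DataStream<String> transactions = env.fromElements("t1", "t2", "t3");
        BroadcastStream<String> rules = env.fromElements("daily").broadcast(RULE_STATE);

        int threshold = 100;
        transactions.connect(rules)
                .process(new TagWithRule())
                .keyBy(t -> t.f0)           // one key per rule: "daily" or "temp"
                .countWindow(threshold)     // batches of `threshold` transactions per rule
                .process(new ToBatch())
                .print();                   // stand-in for a database sink

        env.execute("batch transactions by rule");
    }
}
```

Keying by rule means the daily and temp windows can run on different parallel instances, whereas a non-keyed countWindowAll always runs with parallelism 1. As with any count window, a batch is emitted only when it is full.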