Tags: java, apache-flink, flink-streaming

Setting variables in Apache Flink


I'm having trouble setting variables in Apache Flink. I would like to use one stream to fetch data that initializes the variables I need for a second stream. The problem is that the streams execute in parallel, so the value is still missing when the second stream is initialized. Sample code:

KafkaSource<Market> mainSource1 = KafkaSource.<Market>builder()
      .setBootstrapServers(...)
      .setTopicPattern(Pattern.compile(...))
      .setGroupId(...)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setDeserializer(new ObjectDeserializer()) // assumed to produce Market records
      .build();

DataStream<Market> mainStream1 = env.fromSource(mainSource1, WatermarkStrategy.forMonotonousTimestamps(), "mainSource1");


// fetching data from the stream and setting variables


Map<TopicPartition, Long> endOffset = new HashMap<>();
endOffset.put(new TopicPartition("topicName", 0), offsetFromMainStream1);



KafkaSource<Market> mainSource2 = KafkaSource.<Market>builder()
      .setBootstrapServers(...)
      .setTopicPattern(Pattern.compile(...))
      .setGroupId(...)
      .setStartingOffsets(OffsetsInitializer.earliest())
      .setBounded(OffsetsInitializer.offsets(endOffset)) // stop reading at the offsets taken from mainStream1
      .setDeserializer(new ObjectDeserializer()) // assumed to produce Market records
      .build();

DataStream<Market> mainStream2 = env.fromSource(mainSource2, WatermarkStrategy.forMonotonousTimestamps(), "mainSource2");

// further stream operations


I would like to consume the first stream, fetch the data from it and store it locally, so that I can then use it in operations on the second stream.


Solution

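  • You want to use one stream's data to control another stream's behavior. In Flink, the idiomatic way to do this is the Broadcast State pattern. Assigning a value read from mainStream1 to a local variable cannot work: building the pipeline only declares it, and no records flow until env.execute() is called, so the value does not exist yet when mainSource2 is built.

    This involves creating a BroadcastStream from mainStream1 and connecting mainStream2 to that broadcast stream. The function that processes mainStream2 can then read the data from mainStream1 through the broadcast state.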

    Here is a high-level example based on your code. I am assuming that the key is a String.

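    // Descriptor for the broadcast state: String key -> Market value received from mainStream1
    MapStateDescriptor<String, Market> stateDescriptor = new MapStateDescriptor<>(
                "RulesBroadcastState",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<Market>() {}));

    // Broadcast mainStream1 to every parallel instance of the downstream operator (no keyBy needed)
    BroadcastStream<Market> mainStream1BroadcastStream = mainStream1.broadcast(stateDescriptor);

    DataStream<Market> yourOutput = mainStream2
                     .keyBy(Market::getId) // hypothetical accessor: assumes Market has a String getId()
                     .connect(mainStream1BroadcastStream)
                     .process(new KeyedBroadcastProcessFunction<String, Market, Market, Market>() {
                         @Override
                         public void processBroadcastElement(Market value, Context ctx, Collector<Market> out) throws Exception {
                             // Store each mainStream1 element in the broadcast state
                             ctx.getBroadcastState(stateDescriptor).put(value.getId(), value);
                         }

                         @Override
                         public void processElement(Market value, ReadOnlyContext ctx, Collector<Market> out) throws Exception {
                             // Read what mainStream1 has delivered so far (null if nothing has arrived for this key yet)
                             Market fromStream1 = ctx.getBroadcastState(stateDescriptor).get(value.getId());
                             // ... combine fromStream1 with value here ...
                             out.collect(value);
                         }
                     });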
    

    The pattern is explained in detail in the Flink documentation, and the code above is a modified version of the example shown there: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#the-broadcast-state-pattern
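One caveat with the example above: Flink does not guarantee that a mainStream1 element reaches the operator before the mainStream2 elements that depend on it, so the lookup in processElement can return null. That is the same timing problem described in the question. A common remedy is to buffer early mainStream2 elements until the broadcast value for their key arrives. The sketch below models that logic in plain Java, with no Flink dependency, so it can be run on its own. The class and method names (BufferingJoinModel, onBroadcast, onElement) are hypothetical. In a real KeyedBroadcastProcessFunction, broadcastState would be the state returned by ctx.getBroadcastState(...), pending would be a keyed ListState, and the flush in onBroadcast would have to go through ctx.applyToKeyedState(...), because processBroadcastElement is not scoped to a single key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model (hypothetical names, no Flink) of "buffer until the broadcast value is known".
class BufferingJoinModel {
    // Stands in for the broadcast MapState: key -> value received from mainStream1.
    private final Map<String, String> broadcastState = new HashMap<>();
    // Stands in for a keyed ListState: mainStream2 elements that arrived too early.
    private final Map<String, List<String>> pending = new HashMap<>();
    // Stands in for the Collector: everything emitted downstream, in order.
    private final List<String> output = new ArrayList<>();

    // Counterpart of processBroadcastElement: store the value, then flush buffered elements for that key.
    public void onBroadcast(String key, String value) {
        broadcastState.put(key, value);
        List<String> waiting = pending.remove(key);
        if (waiting != null) {
            for (String element : waiting) {
                emit(key, element, value);
            }
        }
    }

    // Counterpart of processElement: emit now if the broadcast value is known, otherwise buffer.
    public void onElement(String key, String element) {
        String value = broadcastState.get(key);
        if (value != null) {
            emit(key, element, value);
        } else {
            pending.computeIfAbsent(key, k -> new ArrayList<>()).add(element);
        }
    }

    // Formats one joined record as "key:element@value".
    private void emit(String key, String element, String value) {
        output.add(key + ":" + element + "@" + value);
    }

    public List<String> getOutput() {
        return output;
    }
}
```

For example, calling onElement("EURUSD", "tick1") before any broadcast value exists emits nothing; a later onBroadcast("EURUSD", "cfg") emits "EURUSD:tick1@cfg", and any further onElement call for EURUSD is emitted immediately. Buffered state grows until the broadcast value arrives, so in a real job you may also want a timer or size limit to bound it.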