
Setting variables in Apache Flink

I'm having trouble setting variables in Apache Flink. I would like to use one stream to fetch data, and then use that data to initialize the variables I need for a second stream. The problem is that the two streams execute in parallel, so the value is still missing when the second stream is initialized. Sample code:

KafkaSource<Object> mainSource1 = KafkaSource.<Object>builder()
      // other builder settings omitted in this sample
      .setDeserializer(new ObjectDeserializer())
      .build();

DataStream<Market> mainStream1 = env.fromSource(mainSource1, WatermarkStrategy.forMonotonousTimestamps(), "mainSource");

// fetching data from the stream and setting variables

Map<TopicPartition, Long> endOffset = new HashMap<>();
endOffset.put(new TopicPartition("topicName", 0), offsetFromMainStream1);

KafkaSource<Object> mainSource2 = KafkaSource.<Object>builder()
      // other builder settings omitted in this sample
      .setDeserializer(new ObjectDeserializer())
      .build();

DataStream<Market> mainStream2 = env.fromSource(mainSource2, WatermarkStrategy.forMonotonousTimestamps(), "mainSource");

// further stream operations

I would like to run the first stream, fetch the data from it and store it locally, and then use that data in the operations on the second stream.


  • You want to use one Stream's data to control another Stream's behavior. The best way is to use the Broadcast state pattern.

    This involves creating a BroadcastStream from mainStream1 and then connecting mainStream2 to that broadcast stream. The function that processes the connected streams can then read the data from mainStream1 while handling elements of mainStream2.

    Here is a high-level example based on your code. I am assuming that the key is a String.

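    // Descriptor for the broadcast state: a name (placeholder), the key type, and the value type
    MapStateDescriptor<String, Market> stateDescriptor = new MapStateDescriptor<>(
                "marketBroadcastState",
                BasicTypeInfo.STRING_TYPE_INFO,
                TypeInformation.of(new TypeHint<Market>() {}));
    // broadcast mainStream1 and create the broadcast state
    BroadcastStream<Market> mainStream1BroadcastStream = mainStream1.broadcast(stateDescriptor);
    DataStream<Market> yourOutput = mainStream2
                        .keyBy(/* key by Id */)
                        .connect(mainStream1BroadcastStream)
                        .process(
                            new KeyedBroadcastProcessFunction<String, Market, Market, Market>() {
                                 // Implement processElement and processBroadcastElement here.
                                 // You can access mainStream1 output and mainStream2 data here.
                            });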

    This concept is explained in detail here. The code is a modified version of the example shown here.
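One thing to keep in mind with this pattern: as far as I know, Flink does not guarantee that the broadcast element arrives before the first elements of mainStream2, so the "missing value" problem from the question can still show up inside the process function. A common workaround is to buffer records until the broadcast value is known. The sketch below models only that buffering logic in plain Java, with no Flink dependency. The class and method names are hypothetical, and in a real job the fields would be Flink broadcast state and keyed state rather than ordinary fields. Treating the offset as an inclusive upper bound is also just an assumption for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java model of the two callbacks of a broadcast process function.
// Hypothetical names; this is not Flink API.
class BufferingUntilBroadcast {
    // Stand-in for broadcast state: the end offset published by the first stream.
    private Long endOffset = null;
    // Stand-in for keyed state: records that arrived before the offset was known.
    private final Queue<Long> pending = new ArrayDeque<>();
    // Stand-in for the Collector: records emitted downstream.
    private final List<Long> emitted = new ArrayList<>();

    // Plays the role of processBroadcastElement: store the value, then flush the buffer.
    void onBroadcast(long offset) {
        endOffset = offset;
        while (!pending.isEmpty()) {
            emitIfWithinBound(pending.poll());
        }
    }

    // Plays the role of processElement: buffer until the broadcast value exists.
    void onRecord(long recordOffset) {
        if (endOffset == null) {
            pending.add(recordOffset);
        } else {
            emitIfWithinBound(recordOffset);
        }
    }

    // Assumption: the broadcast offset is an inclusive upper bound.
    private void emitIfWithinBound(long recordOffset) {
        if (recordOffset <= endOffset) {
            emitted.add(recordOffset);
        }
    }

    List<Long> emitted() {
        return emitted;
    }
}
```

In an actual KeyedBroadcastProcessFunction, onBroadcast would correspond to processBroadcastElement and onRecord to processElement. Records that arrive early are held back instead of being processed with a missing value.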