I'm asking this question because I'm having trouble setting variables in Apache Flink. I would like to use one stream to fetch data, and then use that data to initialize the variables I need for a second stream. The problem is that the streams execute in parallel, so the value is still missing when the second stream is initialized. Sample code:
KafkaSource<Object> mainSource1 = KafkaSource.<Object>builder()
    .setBootstrapServers(...)
    .setTopicPattern(Pattern.compile(...))
    .setGroupId(...)
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setDeserializer(new ObjectDeserializer())
    .build();
DataStream<Market> mainStream1 = env.fromSource(mainSource1, WatermarkStrategy.forMonotonousTimestamps(), "mainSource");
// fetching data from the stream and setting variables
Map<TopicPartition, Long> endOffset = new HashMap<>();
endOffset.put(new TopicPartition("topicName", 0), offsetFromMainStream1);
KafkaSource<Object> mainSource2 = KafkaSource.<Object>builder()
    .setBootstrapServers(...)
    .setTopicPattern(Pattern.compile(...))
    .setGroupId(...)
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setBounded(OffsetsInitializer.offsets(endOffset))
    .setDeserializer(new ObjectDeserializer())
    .build();
DataStream<Market> mainStream2 = env.fromSource(mainSource2, WatermarkStrategy.forMonotonousTimestamps(), "mainSource");
// further stream operations
I would like to run the first stream, fetch the data from it and store it locally, and then use that data in operations on the second stream.
You want to use one stream's data to control another stream's behavior. The best way to do this is the broadcast state pattern. This involves creating a BroadcastStream from mainStream1 and then connecting mainStream2 to that BroadcastStream. mainStream2 can then access the data from mainStream1.
Here is a high-level example based on your code. I am assuming that the key is a String.
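// Descriptor for the broadcast state that holds mainStream1's data
MapStateDescriptor<String, Market> stateDescriptor = new MapStateDescriptor<>(
    "RulesBroadcastState",
    BasicTypeInfo.STRING_TYPE_INFO,
    TypeInformation.of(new TypeHint<Market>() {}));
// Broadcast mainStream1 and create the broadcast state
BroadcastStream<Market> mainStream1BroadcastStream = mainStream1
    .broadcast(stateDescriptor);
// KeyedBroadcastProcessFunction requires the non-broadcast side to be keyed
DataStream<Market> yourOutput = mainStream2
    .keyBy(/* key selector, e.g. key by id */)
    .connect(mainStream1BroadcastStream)
    .process(
        new KeyedBroadcastProcessFunction<String, Market, Market, Market>() {
            @Override
            public void processBroadcastElement(Market value, Context ctx, Collector<Market> out) throws Exception {
                // Store mainStream1 data here via ctx.getBroadcastState(stateDescriptor).
            }
            @Override
            public void processElement(Market value, ReadOnlyContext ctx, Collector<Market> out) throws Exception {
                // Read the broadcast state here and process mainStream2 data.
            }
        });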
This concept is explained in detail in the Flink documentation, and the code above is a modified version of the example shown there: https://nightlies.apache.org/flink/flink-docs-master/docs/dev/datastream/fault-tolerance/broadcast_state/#the-broadcast-state-pattern
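One caveat with this pattern: Flink does not guarantee that the broadcast value from mainStream1 arrives before the first elements of mainStream2, so the "missing value" problem from the question can still show up inside the process function. A common way around it is to buffer mainStream2 elements until the broadcast value is present. Below is a minimal sketch of just that buffering logic in plain Java, without any Flink classes, so that it runs on its own. The class name OffsetGate and its methods are hypothetical, and keeping only offsets strictly below the limit is an assumption made for illustration.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Plain-Java stand-in for the body of a KeyedBroadcastProcessFunction.
// All names are hypothetical; no Flink classes are used.
class OffsetGate {
    // Value taken from the first stream; null until it has arrived.
    private Long endOffset = null;
    // Second-stream offsets that arrived before endOffset was known.
    private final Queue<Long> pending = new ArrayDeque<>();

    // Plays the role of processBroadcastElement: store the value, then drain the buffer.
    List<Long> onBroadcast(long newEndOffset) {
        endOffset = newEndOffset;
        List<Long> released = new ArrayList<>();
        while (!pending.isEmpty()) {
            long offset = pending.poll();
            if (offset < endOffset) {
                released.add(offset);
            }
        }
        return released;
    }

    // Plays the role of processElement: buffer while the value is missing,
    // otherwise emit only offsets below the limit.
    List<Long> onElement(long offset) {
        List<Long> released = new ArrayList<>();
        if (endOffset == null) {
            pending.add(offset);
        } else if (offset < endOffset) {
            released.add(offset);
        }
        return released;
    }
}
```

In a real job the same logic would live in processBroadcastElement and processElement, and the buffer would probably need to be kept in Flink keyed state (for example a ListState) rather than in a plain field, so that it survives a restart.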