I'm looking for some examples of how to use triggers and timers in Apache Beam. I want to use processing-time timers to read my data from Pub/Sub every 5 minutes, and processing-time triggers to process all of the data collected over an hour together, in Python.
Please take a look at the following resources: "Stateful processing with Apache Beam" and "Timely (and Stateful) Processing with Apache Beam".
The first blog post is more general in how to handle states for context, and the second has some examples on buffering and triggering after a certain period of time, which seems similar to what you are trying to do.
A full example was requested. Here is what I was able to come up with (note that it is written in Java, not Python):
// Placeholder subscription path; substitute the real project and subscription names.
String subscriptionPath = "projects/{project}/subscriptions/{subscription}";

// Source of raw messages: each Pub/Sub message payload is read as a String.
PCollection<String> records =
    pipeline.apply("ReadPubsub", PubsubIO.readStrings().fromSubscription(subscriptionPath));

// Output tags for the two batch streams produced by the stateful DoFn:
// one for the short-interval batches and one for the hourly batches.
TupleTag<Iterable<String>> every5MinTag = new TupleTag<>();
TupleTag<Iterable<String>> everyHourTag = new TupleTag<>();
// Buffers incoming records in state and flushes them on two independent processing-time
// timers: one every 5 minutes (to every5MinTag) and one every hour (to everyHourTag).
PCollectionTuple timersTuple =
    records
        .apply("WithKeys", WithKeys.of(1)) // A KV<> is required to use state. Keying by data is more appropriate than hardcode.
        .apply(
            "Batch",
            ParDo.of(
                    new DoFn<KV<Integer, String>, Iterable<String>>() {
                      // --- 5-minute batch: buffered values, element count, flush timer ---
                      @StateId("buffer5Min")
                      private final StateSpec<BagState<String>> bufferedEvents5Min =
                          StateSpecs.bag();

                      @StateId("count5Min")
                      private final StateSpec<ValueState<Integer>> countState5Min =
                          StateSpecs.value();

                      @TimerId("every5Min")
                      private final TimerSpec every5MinSpec =
                          TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

                      // --- hourly batch: buffered values, element count, flush timer ---
                      @StateId("bufferHour")
                      private final StateSpec<BagState<String>> bufferedEventsHour =
                          StateSpecs.bag();

                      @StateId("countHour")
                      private final StateSpec<ValueState<Integer>> countStateHour =
                          StateSpecs.value();

                      @TimerId("everyHour")
                      private final TimerSpec everyHourSpec =
                          TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

                      /**
                       * Adds the record's value to both buffers. For each buffer, the flush
                       * timer is armed only when the element is the first one of a new batch
                       * (count is unset or zero); the count is then incremented so that later
                       * elements of the same batch do not re-arm the timer.
                       */
                      @ProcessElement
                      public void process(
                          @Element KV<Integer, String> record,
                          @StateId("count5Min") ValueState<Integer> count5MinState,
                          @StateId("countHour") ValueState<Integer> countHourState,
                          @StateId("buffer5Min") BagState<String> buffer5Min,
                          @StateId("bufferHour") BagState<String> bufferHour,
                          @TimerId("every5Min") Timer every5MinTimer,
                          @TimerId("everyHour") Timer everyHourTimer) {
                        // ValueState.read() yields null until the first write (or after clear()).
                        Integer seen5Min = count5MinState.read();
                        if (seen5Min == null || seen5Min == 0) {
                          // Arm once per batch. Re-arming on every element would keep moving
                          // the firing time forward while data keeps arriving.
                          every5MinTimer
                              .offset(Duration.standardMinutes(5))
                              .align(Duration.standardMinutes(5))
                              .setRelative();
                        }
                        count5MinState.write(seen5Min == null ? 1 : seen5Min + 1);
                        buffer5Min.add(record.getValue());

                        Integer seenHour = countHourState.read();
                        if (seenHour == null || seenHour == 0) {
                          everyHourTimer
                              .offset(Duration.standardMinutes(60))
                              .align(Duration.standardMinutes(60))
                              .setRelative();
                        }
                        countHourState.write(seenHour == null ? 1 : seenHour + 1);
                        bufferHour.add(record.getValue());
                      }

                      /**
                       * Flushes the 5-minute buffer to {@code every5MinTag} and resets the
                       * batch so the next element arms the timer again.
                       */
                      @OnTimer("every5Min")
                      public void onTimerEvery5Min(
                          OnTimerContext context,
                          @StateId("buffer5Min") BagState<String> bufferState,
                          @StateId("count5Min") ValueState<Integer> countState) {
                        if (!bufferState.isEmpty().read()) {
                          context.output(every5MinTag, bufferState.read());
                          bufferState.clear();
                        }
                        // Always reset the count: the timer has fired, so the next element
                        // must be able to arm it again even if nothing was buffered.
                        countState.clear();
                      }

                      /**
                       * Flushes the hourly buffer to {@code everyHourTag} and resets the
                       * batch so the next element arms the timer again.
                       */
                      @OnTimer("everyHour")
                      public void onTimerEveryHour(
                          OnTimerContext context,
                          @StateId("bufferHour") BagState<String> bufferState,
                          @StateId("countHour") ValueState<Integer> countState) {
                        if (!bufferState.isEmpty().read()) {
                          context.output(everyHourTag, bufferState.read());
                          bufferState.clear();
                        }
                        countState.clear();
                      }
                    })
                .withOutputTags(every5MinTag, TupleTagList.of(everyHourTag)));
timersTuple
.get(every5MinTag)
.setCoder(IterableCoder.of(StringUtf8Coder.of()))
.apply(<<do something every 5 min>>);
timersTuple
.get(everyHourTag)
.setCoder(IterableCoder.of(StringUtf8Coder.of()))
.apply(<< do something every hour>>);
pipeline.run().waitUntilFinish();