I am trying to use Apache Beam for a streaming process where I want to calculate the running min() and max() of an item's count at every registered timestamp.
For example, given this input:
Timestamp | item_count |
---|---|
2021-08-03 01:00:03.22333 UTC | 5 |
2021-08-03 01:00:03.256427 UTC | 4 |
2021-08-03 01:00:03.256497 UTC | 7 |
2021-08-03 01:00:03.256499 UTC | 2 |
I want this output:
Timestamp | Min | Max |
---|---|---|
2021-08-03 01:00:03.22333 UTC | 5 | 5 |
2021-08-03 01:00:03.256427 UTC | 4 | 5 |
2021-08-03 01:00:03.256497 UTC | 4 | 7 |
2021-08-03 01:00:03.256499 UTC | 2 | 7 |
I can't figure out how to fit my use case into windowing, since for me the frame always starts at row 1 and ends at each new row I read. Any suggestions on how I should approach this?
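The requested output is a cumulative (running) aggregate: each row's frame starts at the first row and ends at the current one, which SQL would express as ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW. As a minimal sketch outside Beam, assuming the rows are already in timestamp order and item_count fits in an int, the computation is a single pass (RunningMinMax is a hypothetical name used only for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration (no Beam): for each row, emit the min and max of
// item_count over all rows seen so far, i.e. a frame that starts at row 1
// and ends at the current row. Assumes rows arrive in timestamp order.
class RunningMinMax {
    // Returns one {min, max} pair per input value, in input order.
    static List<int[]> running(int[] itemCounts) {
        List<int[]> out = new ArrayList<>();
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int count : itemCounts) {
            min = Math.min(min, count);
            max = Math.max(max, count);
            out.add(new int[] {min, max});
        }
        return out;
    }

    public static void main(String[] args) {
        int[] counts = {5, 4, 7, 2}; // item_count column from the example
        for (int[] mm : running(counts)) {
            System.out.println(mm[0] + " | " + mm[1]);
        }
    }
}
```

Running main prints the Min | Max pairs 5 | 5, 4 | 5, 4 | 7 and 2 | 7, matching the expected output table. The streaming version has to produce the same thing without ever having the whole list in hand.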
Thank you
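This won't be 100% precise, since there will always be some latency and elements may arrive out of order, but it should be good enough. The idea is to put everything into a single global window with a trigger that fires after every element and accumulates fired panes, so each firing of Combine.globally emits the min/max over every element seen so far. The example below computes a rolling min/max of meter_reading from the public taxi-rides Pub/Sub topic: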
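```java
import com.google.api.services.bigquery.model.TableRow;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.windowing.AfterPane;
import org.apache.beam.sdk.transforms.windowing.GlobalWindows;
import org.apache.beam.sdk.transforms.windowing.Repeatedly;
import org.apache.beam.sdk.transforms.windowing.TimestampCombiner;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.json.JSONObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RollingMinMax {
  private static final Logger LOG = LoggerFactory.getLogger(RollingMinMax.class);

  public interface RollingMinMaxOptions extends PipelineOptions {
    @Description("Topic to read from")
    @Default.String("projects/pubsub-public-data/topics/taxirides-realtime")
    String getTopic();
    void setTopic(String value);
  }

  // Accumulator and output are KV<min, max>.
  public static class MinMax extends Combine.CombineFn<Float, KV<Float, Float>, KV<Float, Float>> { // Types: Input, Accum, Output
    @Override
    public KV<Float, Float> createAccumulator() {
      // Identity element: +inf for min, -inf for max (starting max at 0f is wrong for negative inputs).
      return KV.of(Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY);
    }

    @Override
    public KV<Float, Float> addInput(KV<Float, Float> accumulator, Float input) {
      float min = Math.min(accumulator.getKey(), input);
      float max = Math.max(accumulator.getValue(), input);
      return KV.of(min, max);
    }

    @Override
    public KV<Float, Float> mergeAccumulators(Iterable<KV<Float, Float>> accumulators) {
      float min = Float.POSITIVE_INFINITY;
      float max = Float.NEGATIVE_INFINITY;
      for (KV<Float, Float> kv : accumulators) {
        min = Math.min(kv.getKey(), min);
        max = Math.max(kv.getValue(), max);
      }
      return KV.of(min, max);
    }

    @Override
    public KV<Float, Float> extractOutput(KV<Float, Float> accumulator) {
      return accumulator;
    }
  }

  public static void main(String[] args) {
    RollingMinMaxOptions options =
        PipelineOptionsFactory.fromArgs(args).withValidation().as(RollingMinMaxOptions.class);
    Pipeline p = Pipeline.create(options);
    p
        .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
        .apply("Get meter reading", ParDo.of(new DoFn<String, Float>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            JSONObject json = new JSONObject(c.element());
            String rideStatus = json.getString("ride_status");
            float meterReading = json.getFloat("meter_reading");
            // Keep only completed rides with a positive meter reading.
            if (rideStatus.equals("dropoff") && meterReading > 0) {
              c.output(meterReading);
            }
          }
        }))
        // One global window that fires after every element. Accumulating panes mean
        // each firing covers all elements seen so far, i.e. a running min/max.
        .apply(Window.<Float>into(new GlobalWindows())
            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
            .withTimestampCombiner(TimestampCombiner.LATEST)
            .accumulatingFiredPanes())
        .apply(Combine.globally(new MinMax()))
        .apply("Format", ParDo.of(new DoFn<KV<Float, Float>, TableRow>() {
          @ProcessElement
          public void processElement(ProcessContext c) {
            TableRow row = new TableRow();
            row.set("min", c.element().getKey());
            row.set("max", c.element().getValue());
            // With TimestampCombiner.LATEST this is the latest input timestamp in the pane.
            row.set("timestamp", c.timestamp().toString());
            LOG.info(row.toString());
            c.output(row);
          }
        }));
    p.run();
  }
}
```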
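Because a runner may split the input, build several partial accumulators, and merge them in any order, the identity accumulator and mergeAccumulators have to agree with a single pass over the data. The following dependency-free sketch mirrors the four MinMax methods using a float[] {min, max} in place of KV<Float, Float> (MinMaxMirror and fold are hypothetical helpers, not Beam APIs), so that logic can be checked without running a pipeline:

```java
import java.util.List;

// Hypothetical plain-Java mirror of the MinMax CombineFn: a float[] {min, max}
// stands in for KV<Float, Float>. No Beam dependency.
class MinMaxMirror {
    // Identity accumulator: combining it with any value yields that value.
    static float[] createAccumulator() {
        return new float[] {Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY};
    }

    static float[] addInput(float[] acc, float input) {
        return new float[] {Math.min(acc[0], input), Math.max(acc[1], input)};
    }

    static float[] mergeAccumulators(List<float[]> accs) {
        float[] merged = createAccumulator();
        for (float[] a : accs) {
            merged = new float[] {Math.min(merged[0], a[0]), Math.max(merged[1], a[1])};
        }
        return merged;
    }

    // Folds a slice of inputs into a fresh accumulator, like one bundle would.
    static float[] fold(float... inputs) {
        float[] acc = createAccumulator();
        for (float x : inputs) {
            acc = addInput(acc, x);
        }
        return acc;
    }

    public static void main(String[] args) {
        // A single pass over all inputs vs. two partial accumulators merged.
        float[] single = fold(-3f, 8f, -1f, 2f);
        float[] merged = mergeAccumulators(List.of(fold(-3f, 8f), fold(-1f, 2f)));
        System.out.println(single[0] + " " + single[1]); // -3.0 8.0
        System.out.println(merged[0] + " " + merged[1]); // -3.0 8.0
    }
}
```

Starting max at 0f instead of Float.NEGATIVE_INFINITY would make fold(-3f, -1f) report a max of 0.0. The taxi example filters to positive readings, so it only matters once negative inputs are possible.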
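If you want the min/max to reset every X amount of time, replace GlobalWindows in the pipeline's Window step with FixedWindows of that size, e.g. FixedWindows.of(Duration.standardMinutes(X)). Because the windows are no longer global, the triggered Window also needs .withAllowedLateness(...), and the combine must become Combine.globally(new MinMax()).withoutDefaults().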
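To get a feel for what the FixedWindows variant emits, here is a plain-Java simulation rather than Beam code. Each event is assigned to the window starting at ts - (ts % size), which matches FixedWindows with no offset for non-negative millisecond timestamps, and a separate running min/max is kept per window, roughly what the per-element trigger with accumulating panes produces in each window. ResettingMinMax is a hypothetical name, and lateness, watermarks, and window expiry are not modeled:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical simulation (no Beam) of a min/max that resets every windowMillis.
// Assumes non-negative epoch-millisecond timestamps; ignores allowed lateness.
class ResettingMinMax {
    // Emits "windowStart min max" for every event, like one pane per element.
    static List<String> run(long[] tsMillis, int[] counts, long windowMillis) {
        Map<Long, int[]> perWindow = new HashMap<>(); // window start -> {min, max}
        List<String> out = new ArrayList<>();
        for (int i = 0; i < tsMillis.length; i++) {
            long start = tsMillis[i] - (tsMillis[i] % windowMillis);
            int[] mm = perWindow.computeIfAbsent(
                    start, k -> new int[] {Integer.MAX_VALUE, Integer.MIN_VALUE});
            mm[0] = Math.min(mm[0], counts[i]);
            mm[1] = Math.max(mm[1], counts[i]);
            out.add(start + " " + mm[0] + " " + mm[1]);
        }
        return out;
    }

    public static void main(String[] args) {
        long[] ts = {1_000, 4_000, 12_000, 15_000};
        int[] counts = {5, 4, 7, 2};
        // 10-second windows: the running min/max restarts at 10000 ms.
        run(ts, counts, 10_000).forEach(System.out::println);
    }
}
```

Keeping state per window start, rather than resetting when a new window first appears, means an out-of-order event still updates the window it belongs to instead of the most recent one.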