Search code examples
Tags: real-time, apache-beam

Apache Beam - Aggregate data from the beginning up to each logged timestamp


I am trying to use Apache Beam for a streaming process in which I want to compute the running min() and max() of an item's value at every registered timestamp.

For example:

Timestamp item_count
2021-08-03 01:00:03.22333 UTC 5
2021-08-03 01:00:03.256427 UTC 4
2021-08-03 01:00:03.256497 UTC 7
2021-08-03 01:00:03.256499 UTC 2

Expected output:

Timestamp Min Max
2021-08-03 01:00:03.22333 UTC 5 5
2021-08-03 01:00:03.256427 UTC 4 5
2021-08-03 01:00:03.256497 UTC 4 7
2021-08-03 01:00:03.256499 UTC 2 7

I cannot figure out how to fit my use case into windowing, because for me the frame always starts at the first row and ends at each new row I read. Any suggestions on how I should approach this?

Thank you


Solution

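  • Use a single global window whose trigger fires after every element and accumulates fired panes, then combine globally, so that each firing reports the min/max over everything seen so far. This is not going to be 100% perfect, since there is always some latency and elements may arrive out of order, but it should be good enough.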

        /** Command-line options accepted by the rolling min/max pipeline. */
        public interface RollingMinMaxOptions extends PipelineOptions {

            /**
             * Returns the Pub/Sub topic the pipeline reads from.
             *
             * @return the fully qualified topic name; defaults to the public taxi-rides topic
             */
            @Description("Topic to read from")
            @Default.String("projects/pubsub-public-data/topics/taxirides-realtime")
            String getTopic();

            /**
             * Overrides the Pub/Sub topic to read from.
             *
             * @param topic the fully qualified topic name
             */
            void setTopic(String topic);
        }
    
        /**
         * Folds a stream of floats into a running {@code KV<min, max>} pair.
         *
         * <p>The accumulator and the output have the same shape: the key holds the smallest value
         * seen so far and the value holds the largest. Both bounds start at the identity element of
         * their operation ({@code +Infinity} for min, {@code -Infinity} for max). This makes the
         * result correct for negative inputs as well as positive ones. An accumulator that has seen
         * no input therefore reports {@code (+Infinity, -Infinity)}.
         */
        public static class MinMax extends Combine.CombineFn<Float, KV<Float, Float>, KV<Float, Float>> { //Types: Input, Accum, Output
            @Override
            public KV<Float, Float> createAccumulator() {
                // Identity elements: the first real input replaces both bounds. Seeding max with 0
                // would wrongly report 0 for a stream containing only negative values.
                return KV.of(Float.POSITIVE_INFINITY, Float.NEGATIVE_INFINITY);
            }

            @Override
            public KV<Float, Float> addInput(KV<Float, Float> accumulator, Float input) {
                float min = Math.min(accumulator.getKey(), input);
                float max = Math.max(accumulator.getValue(), input);
                return KV.of(min, max);
            }

            @Override
            public KV<Float, Float> mergeAccumulators(Iterable<KV<Float, Float>> accumulators) {
                // Start from the same identity elements as createAccumulator() so merging an empty
                // set of accumulators is equivalent to a fresh accumulator.
                float min = Float.POSITIVE_INFINITY;
                float max = Float.NEGATIVE_INFINITY;
                for (KV<Float, Float> kv : accumulators) {
                    min = Math.min(min, kv.getKey());
                    max = Math.max(max, kv.getValue());
                }
                return KV.of(min, max);
            }

            @Override
            public KV<Float, Float> extractOutput(KV<Float, Float> accumulator) {
                // The accumulator already has the output shape (min, max).
                return accumulator;
            }
        }
    
        /**
         * Builds and launches the streaming pipeline.
         *
         * <p>Stages:
         * <ol>
         *   <li>Read JSON strings from Pub/Sub.</li>
         *   <li>Keep the {@code meter_reading} of every {@code "dropoff"} event whose reading is
         *       positive.</li>
         *   <li>Fold every reading seen so far into a running (min, max) pair.</li>
         *   <li>Log and emit each updated pair as a {@link TableRow} stamped with the element
         *       timestamp.</li>
         * </ol>
         *
         * @param args pipeline options in {@code --name=value} form
         */
        public static void main(String[] args) {
            RollingMinMaxOptions options = PipelineOptionsFactory
                    .fromArgs(args)
                    .withValidation()
                    .as(RollingMinMaxOptions.class);

            Pipeline pipeline = Pipeline.create(options);

            // A single global window that re-fires after every new element and keeps earlier
            // elements in each pane yields a cumulative "from the beginning" aggregate.
            Window<Float> everythingSoFar = Window.<Float>into(new GlobalWindows())
                    .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(1)))
                    .withTimestampCombiner(TimestampCombiner.LATEST)
                    .accumulatingFiredPanes();

            pipeline
                    .apply("ReadFromPubSub", PubsubIO.readStrings().fromTopic(options.getTopic()))
                    .apply("Get meter reading", ParDo.of(new ExtractDropoffReadingFn()))
                    .apply(everythingSoFar)
                    .apply(Combine.globally(new MinMax()))
                    .apply("Format", ParDo.of(new FormatMinMaxFn()));

            pipeline.run();
        }

        /** Emits the meter reading of each {@code "dropoff"} ride event whose reading is positive. */
        static class ExtractDropoffReadingFn extends DoFn<String, Float> {
            @ProcessElement
            public void processElement(ProcessContext c) throws ParseException {
                JSONObject event = new JSONObject(c.element());

                // Both fields are read before filtering, matching the original evaluation order.
                String status = event.getString("ride_status");
                Float reading = event.getFloat("meter_reading");

                boolean isPositiveDropoff = status.equals("dropoff") && reading > 0;
                if (isPositiveDropoff) {
                    c.output(reading);
                }
            }
        }

        /** Converts a (min, max) pair into a logged {@link TableRow} carrying the pane timestamp. */
        static class FormatMinMaxFn extends DoFn<KV<Float, Float>, TableRow> {
            @ProcessElement
            public void processElement(ProcessContext c) throws ParseException {
                KV<Float, Float> bounds = c.element();

                TableRow row = new TableRow();
                row.set("min", bounds.getKey());
                row.set("max", bounds.getValue());
                row.set("timestamp", c.timestamp().toString());

                LOG.info(row.toString());
                c.output(row);
            }
        }
    

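    If you want the min/max to reset every X units of time, change the window to `FixedWindows` of that size. Also add `.withoutDefaults()` to `Combine.globally(...)`, which Beam requires when the input is not in the global window.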