Search code examples
Tags: streaming, pipeline, apache-beam

How to aggregate the elements of a PCollection<KV<Long, Double>> in Apache Beam


I'm trying to combine the elements of a PCollection<KV<Long, Double>>:

/**
 * Builds and runs a streaming pipeline over the first ten user transactions that is meant to
 * sum the keys and the amounts of every two consecutive elements.
 *
 * <p>NOTE(review): as written it produces one output per element (see the log below), not one
 * per pair — see the note on the windowing step.
 */
public class StreamPipelineBuilder {
    /** Builds the pipeline, runs it, and blocks until it finishes. */
    public void execute() {
        final List<UserTxn> txn = Utils.getUserTxnList().subList(0, 10);
        // create Pipeline
        final Pipeline pipeline = Pipeline.create();
        TestStream.Builder<KV<Long, UserTxn>>
                streamBuilder = TestStream.create(UserTxnKVCoder.of());
        // add all lines with timestamps to the TestStream
        // Each element is keyed by its id and stamped with its own getTime(), interpreted as UTC.
        final List<TimestampedValue<KV<Long, UserTxn>>> timestamped =
                txn.stream().map(i -> {
                    final KV<Long, UserTxn> kv = KV.of(i.getId(), i);
                    final LocalDateTime time = i.getTime();
                    final long millis = time.toInstant(ZoneOffset.UTC).toEpochMilli();
                    final Instant instant = new Instant(millis);
                    return TimestampedValue.of(kv, instant);
                }).collect(Collectors.toList());

        for (TimestampedValue<KV<Long, UserTxn>> value : timestamped) {
            streamBuilder = streamBuilder.addElements(value);
        }

        // create the unbounded PCollection from TestStream
        PCollection<KV<Long, UserTxn>> input = pipeline.apply(streamBuilder.advanceWatermarkToInfinity());
        // Elements are assigned to 5-second fixed windows by event timestamp. Within each window, a
        // pane fires whenever at least 2 elements have arrived, and fired elements are discarded.
        // NOTE(review): the count trigger only counts elements inside the same window. If the
        // transactions' timestamps are more than 5 s apart, each element sits alone in its window,
        // the count of 2 is never reached, and each window emits only when the watermark passes its
        // end (advanceWatermarkToInfinity). That would explain the 10 single-element results below.
        PCollection<KV<Long, UserTxn>> windowed =
                input.apply(Window.<KV<Long, UserTxn>>into(FixedWindows.of(Duration.standardSeconds(5)))
                        .discardingFiredPanes()
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
                        .withAllowedLateness(Duration.ZERO));

        PCollection<KV<Long, Double>> added = windowed.apply("aggregate", new PTransform<>() {
            @Override
            public PCollection<KV<Long, Double>> expand(PCollection<KV<Long, UserTxn>> input) {
                // Project each transaction to (id, amount), then reduce each window's pane to a
                // single KV holding the sum of the ids and the sum of the amounts.
                return input.apply(
                        MapElements.into(
                                TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.doubles())
                        ).via((record) -> KV.of(record.getKey(), record.getValue().getAmount()))
                ).apply(Combine.globally((SerializableFunction<Iterable<KV<Long, Double>>, KV<Long, Double>>) input1 -> {
                    // Atomic holders are used only because lambda-captured locals must be
                    // effectively final; the iteration itself is sequential.
                    AtomicLong keys = new AtomicLong();
                    AtomicDouble amounts = new AtomicDouble();
                    input1.forEach(e -> {
                        keys.addAndGet(e.getKey());
                        amounts.addAndGet(e.getValue());
                    });
                    return KV.of(keys.get(), amounts.get());
                    // NOTE(review): withoutDefaults() is presumably needed because the input is not
                    // globally windowed, so Beam cannot emit a default for empty windows — verify.
                }).withoutDefaults());

            }
        });

        added.apply(PrintPCollection.with());

        pipeline.run().waitUntilFinish();
    }
}

Each key is different, and I want to add them up as sum(key), sum(value). But when I run my code it does not work; I get this:

[INFO] 2022-11-03 00:12:21.010 PrintPCollection - KV{1, 821.21}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{6, 973.31}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{8, 980.26}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{4, 37.53}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{2, 541.95}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{7, 705.49}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{3, 384.09}
[INFO] 2022-11-03 00:12:21.015 PrintPCollection - KV{9, 106.96}
[INFO] 2022-11-03 00:12:21.015 PrintPCollection - KV{5, 207.3}
[INFO] 2022-11-03 00:12:21.015 PrintPCollection - KV{10, 675.48}

I was expecting 5 records, since the window fires after every 2 elements and the collection starts with 10, but that is not what happens. What am I doing wrong?

Thank you!


Solution

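  • The problem was in how the window was defined. With 5-second FixedWindows, each element apparently landed in its own window (presumably the transaction timestamps are more than 5 seconds apart). As a result, the count trigger never saw enough elements in one pane, and each window fired only once the watermark passed its end. After switching to a global window, the pane count worked as expected. Note that the trigger threshold was also raised from 2 to 5, so the 10 elements now produce two panes. The revised code and its output are below: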

    package dev.donhk.stream;
    
    import com.google.common.util.concurrent.AtomicDouble;
    import dev.donhk.pojos.UserTxn;
    import dev.donhk.transform.PrintPCollection;
    import dev.donhk.utilities.Utils;
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.testing.TestStream;
    import org.apache.beam.sdk.transforms.*;
    import org.apache.beam.sdk.transforms.windowing.*;
    import org.apache.beam.sdk.values.*;
    import org.apache.logging.log4j.LogManager;
    import org.apache.logging.log4j.Logger;
    import org.joda.time.Instant;
    
    import java.time.LocalDateTime;
    import java.time.ZoneOffset;
    import java.util.List;
    import java.util.concurrent.atomic.AtomicLong;
    import java.util.stream.Collectors;
    import java.util.stream.StreamSupport;
    
    public class StreamPipelineBuilder {
        private static final Logger LOG = LogManager.getLogger(StreamPipelineBuilder.class);
    
        public void execute() {
            final List<UserTxn> txn = Utils.getUserTxnList().subList(0, 10);
            // create Pipeline
            final Pipeline pipeline = Pipeline.create();
            TestStream.Builder<KV<Long, UserTxn>>
                    streamBuilder = TestStream.create(UserTxnKVCoder.of());
            // add all lines with timestamps to the TestStream
            final List<TimestampedValue<KV<Long, UserTxn>>> timestamped =
                    txn.stream().map(i -> {
                        final KV<Long, UserTxn> kv = KV.of(i.getId(), i);
                        final LocalDateTime time = i.getTime();
                        final long millis = time.toInstant(ZoneOffset.UTC).toEpochMilli();
                        final Instant instant = new Instant(millis);
                        return TimestampedValue.of(kv, instant);
                    }).collect(Collectors.toList());
    
            for (TimestampedValue<KV<Long, UserTxn>> value : timestamped) {
                streamBuilder = streamBuilder.addElements(value);
            }
    
            // create the unbounded PCollection from TestStream
            PCollection<KV<Long, UserTxn>> input = pipeline.apply(streamBuilder.advanceWatermarkToInfinity());
            PCollection<KV<Long, UserTxn>> windowed =
                    input.apply(Window.<KV<Long, UserTxn>>into(new GlobalWindows())
                            .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
                            .discardingFiredPanes()
                            .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY));
    
            PCollection<KV<Long, Double>> added = windowed.apply("aggregate", new PTransform<>() {
                @Override
                public PCollection<KV<Long, Double>> expand(PCollection<KV<Long, UserTxn>> input) {
                    return input.apply(
                            MapElements.into(
                                    TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.doubles())
                            ).via((record) -> KV.of(record.getKey(), record.getValue().getAmount()))
                    ).apply(Combine.globally((SerializableFunction<Iterable<KV<Long, Double>>, KV<Long, Double>>) input1 -> {
                        AtomicLong myLong = new AtomicLong();
                        AtomicDouble myDouble = new AtomicDouble();
                        StreamSupport
                                .stream(input1.spliterator(), false)
                                .forEach(e -> {
                                    LOG.info(e.getKey());
                                    myLong.addAndGet(e.getKey());
                                    myDouble.addAndGet(e.getValue());
                                });
                        LOG.info("new window");
                        return KV.of(myLong.get(), myDouble.get());
                    }));
                }
            });
    
            added.apply(PrintPCollection.with());
    
            pipeline.run().waitUntilFinish();
        }
    }
    

    This produces:

    [INFO] 2022-11-04 23:18:32.816 StreamPipelineBuilder - new window
    [INFO] 2022-11-04 23:18:33.875 StreamPipelineBuilder - 1
    [INFO] 2022-11-04 23:18:33.887 StreamPipelineBuilder - 2
    [INFO] 2022-11-04 23:18:33.887 StreamPipelineBuilder - 3
    [INFO] 2022-11-04 23:18:33.887 StreamPipelineBuilder - 4
    [INFO] 2022-11-04 23:18:33.887 StreamPipelineBuilder - 5
    [INFO] 2022-11-04 23:18:33.887 StreamPipelineBuilder - new window
    [INFO] 2022-11-04 23:18:33.911 PrintPCollection - KV{15, 1992.08}
    [INFO] 2022-11-04 23:18:33.951 StreamPipelineBuilder - 6
    [INFO] 2022-11-04 23:18:33.952 StreamPipelineBuilder - 7
    [INFO] 2022-11-04 23:18:33.952 StreamPipelineBuilder - 8
    [INFO] 2022-11-04 23:18:33.952 StreamPipelineBuilder - 9
    [INFO] 2022-11-04 23:18:33.952 StreamPipelineBuilder - 10
    [INFO] 2022-11-04 23:18:33.952 StreamPipelineBuilder - new window
    [INFO] 2022-11-04 23:18:33.956 PrintPCollection - KV{40, 3441.5}