I'm trying to combine the elements of a PCollection<KV<Long, Double>>:
public class StreamPipelineBuilder {
    public void execute() {
        final List<UserTxn> txn = Utils.getUserTxnList().subList(0, 10);
        // create Pipeline
        final Pipeline pipeline = Pipeline.create();
        TestStream.Builder<KV<Long, UserTxn>>
                streamBuilder = TestStream.create(UserTxnKVCoder.of());
        // add all lines with timestamps to the TestStream
        final List<TimestampedValue<KV<Long, UserTxn>>> timestamped =
                txn.stream().map(i -> {
                    final KV<Long, UserTxn> kv = KV.of(i.getId(), i);
                    final LocalDateTime time = i.getTime();
                    final long millis = time.toInstant(ZoneOffset.UTC).toEpochMilli();
                    final Instant instant = new Instant(millis);
                    return TimestampedValue.of(kv, instant);
                }).collect(Collectors.toList());
        for (TimestampedValue<KV<Long, UserTxn>> value : timestamped) {
            streamBuilder = streamBuilder.addElements(value);
        }
        // create the unbounded PCollection from TestStream
        PCollection<KV<Long, UserTxn>> input = pipeline.apply(streamBuilder.advanceWatermarkToInfinity());
        PCollection<KV<Long, UserTxn>> windowed =
                input.apply(Window.<KV<Long, UserTxn>>into(FixedWindows.of(Duration.standardSeconds(5)))
                        .discardingFiredPanes()
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(2)))
                        .withAllowedLateness(Duration.ZERO));
        PCollection<KV<Long, Double>> added = windowed.apply("aggregate", new PTransform<>() {
            @Override
            public PCollection<KV<Long, Double>> expand(PCollection<KV<Long, UserTxn>> input) {
                return input.apply(
                        MapElements.into(
                                TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.doubles())
                        ).via((record) -> KV.of(record.getKey(), record.getValue().getAmount()))
                ).apply(Combine.globally((SerializableFunction<Iterable<KV<Long, Double>>, KV<Long, Double>>) input1 -> {
                    AtomicLong keys = new AtomicLong();
                    AtomicDouble amounts = new AtomicDouble();
                    input1.forEach(e -> {
                        keys.addAndGet(e.getKey());
                        amounts.addAndGet(e.getValue());
                    });
                    return KV.of(keys.get(), amounts.get());
                }).withoutDefaults());
            }
        });
        added.apply(PrintPCollection.with());
        pipeline.run().waitUntilFinish();
    }
}
Each key is different, and I want to add everything up as (sum(key), sum(value)). When I run the code, however, nothing is combined and I get this:
[INFO] 2022-11-03 00:12:21.010 PrintPCollection - KV{1, 821.21}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{6, 973.31}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{8, 980.26}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{4, 37.53}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{2, 541.95}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{7, 705.49}
[INFO] 2022-11-03 00:12:21.014 PrintPCollection - KV{3, 384.09}
[INFO] 2022-11-03 00:12:21.015 PrintPCollection - KV{9, 106.96}
[INFO] 2022-11-03 00:12:21.015 PrintPCollection - KV{5, 207.3}
[INFO] 2022-11-03 00:12:21.015 PrintPCollection - KV{10, 675.48}
I was expecting 5 records, since the trigger fires after every 2 elements and the collection starts with 10, but that is not what happens. What am I doing wrong?
Thank you!
The problem was in how the window was defined. After switching to the global window, the element-count trigger worked as expected (note that the trigger count is now 5 rather than 2). The code and its output are below.
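One plausible reason the fixed windows misbehaved, assuming the transaction timestamps are more than five seconds apart (the post does not show them): fixed windows are assigned by event timestamp, so every element ends up alone in its own 5-second window. A count-of-2 trigger then never fills, and each window emits its single element when it closes. Here is a minimal plain-Java sketch of that assignment rule, not Beam code; the class name FixedWindowSketch and the timestamps are made up for illustration:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java sketch (no Beam dependency) of fixed-window assignment.
final class FixedWindowSketch {

    // Start of the fixed window containing timestampMillis,
    // for windows aligned to the epoch (offset 0).
    static long windowStart(long timestampMillis, long sizeMillis) {
        return timestampMillis - Math.floorMod(timestampMillis, sizeMillis);
    }

    // Groups timestamps by the start of their window, keeping encounter order.
    static Map<Long, List<Long>> assign(List<Long> timestamps, long sizeMillis) {
        Map<Long, List<Long>> windows = new LinkedHashMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, sizeMillis), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    public static void main(String[] args) {
        // Hypothetical timestamps one minute apart; the real ones are not shown in the post.
        List<Long> timestamps = List.of(0L, 60_000L, 120_000L, 180_000L);
        Map<Long, List<Long>> windows = assign(timestamps, 5_000L);
        // Every 5-second window holds exactly one element here.
        windows.forEach((start, elems) -> System.out.println(start + " -> " + elems));
    }
}
```

If the real timestamps were close enough together to share a window, this explanation would not apply.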
package dev.donhk.stream;

import com.google.common.util.concurrent.AtomicDouble;
import dev.donhk.pojos.UserTxn;
import dev.donhk.transform.PrintPCollection;
import dev.donhk.utilities.Utils;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.testing.TestStream;
import org.apache.beam.sdk.transforms.*;
import org.apache.beam.sdk.transforms.windowing.*;
import org.apache.beam.sdk.values.*;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.joda.time.Instant;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

public class StreamPipelineBuilder {

    private static final Logger LOG = LogManager.getLogger(StreamPipelineBuilder.class);

    public void execute() {
        final List<UserTxn> txn = Utils.getUserTxnList().subList(0, 10);
        // create Pipeline
        final Pipeline pipeline = Pipeline.create();
        TestStream.Builder<KV<Long, UserTxn>>
                streamBuilder = TestStream.create(UserTxnKVCoder.of());
        // add all lines with timestamps to the TestStream
        final List<TimestampedValue<KV<Long, UserTxn>>> timestamped =
                txn.stream().map(i -> {
                    final KV<Long, UserTxn> kv = KV.of(i.getId(), i);
                    final LocalDateTime time = i.getTime();
                    final long millis = time.toInstant(ZoneOffset.UTC).toEpochMilli();
                    final Instant instant = new Instant(millis);
                    return TimestampedValue.of(kv, instant);
                }).collect(Collectors.toList());
        for (TimestampedValue<KV<Long, UserTxn>> value : timestamped) {
            streamBuilder = streamBuilder.addElements(value);
        }
        // create the unbounded PCollection from TestStream
        PCollection<KV<Long, UserTxn>> input = pipeline.apply(streamBuilder.advanceWatermarkToInfinity());
        PCollection<KV<Long, UserTxn>> windowed =
                input.apply(Window.<KV<Long, UserTxn>>into(new GlobalWindows())
                        .triggering(Repeatedly.forever(AfterPane.elementCountAtLeast(5)))
                        .discardingFiredPanes()
                        .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_IF_NON_EMPTY));
        PCollection<KV<Long, Double>> added = windowed.apply("aggregate", new PTransform<>() {
            @Override
            public PCollection<KV<Long, Double>> expand(PCollection<KV<Long, UserTxn>> input) {
                return input.apply(
                        MapElements.into(
                                TypeDescriptors.kvs(TypeDescriptors.longs(), TypeDescriptors.doubles())
                        ).via((record) -> KV.of(record.getKey(), record.getValue().getAmount()))
                ).apply(Combine.globally((SerializableFunction<Iterable<KV<Long, Double>>, KV<Long, Double>>) input1 -> {
                    AtomicLong myLong = new AtomicLong();
                    AtomicDouble myDouble = new AtomicDouble();
                    StreamSupport
                            .stream(input1.spliterator(), false)
                            .forEach(e -> {
                                LOG.info(e.getKey());
                                myLong.addAndGet(e.getKey());
                                myDouble.addAndGet(e.getValue());
                            });
                    LOG.info("new window");
                    return KV.of(myLong.get(), myDouble.get());
                }));
            }
        });
        added.apply(PrintPCollection.with());
        pipeline.run().waitUntilFinish();
    }
}
This produces:
[INFO] 2022-11-04 23:18:32.816 StreamPipelineBuilder - new window
[INFO] 2022-11-04 23:18:33.875 StreamPipelineBuilder - 1
[INFO] 2022-11-04 23:18:33.887 StreamPipelineBuilder - 2
[INFO] 2022-11-04 23:18:33.887 StreamPipelineBuilder - 3
[INFO] 2022-11-04 23:18:33.887 StreamPipelineBuilder - 4
[INFO] 2022-11-04 23:18:33.887 StreamPipelineBuilder - 5
[INFO] 2022-11-04 23:18:33.887 StreamPipelineBuilder - new window
[INFO] 2022-11-04 23:18:33.911 PrintPCollection - KV{15, 1992.08}
[INFO] 2022-11-04 23:18:33.951 StreamPipelineBuilder - 6
[INFO] 2022-11-04 23:18:33.952 StreamPipelineBuilder - 7
[INFO] 2022-11-04 23:18:33.952 StreamPipelineBuilder - 8
[INFO] 2022-11-04 23:18:33.952 StreamPipelineBuilder - 9
[INFO] 2022-11-04 23:18:33.952 StreamPipelineBuilder - 10
[INFO] 2022-11-04 23:18:33.952 StreamPipelineBuilder - new window
[INFO] 2022-11-04 23:18:33.956 PrintPCollection - KV{40, 3441.5}
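The two output records can be checked by hand. Here is a rough plain-Java sketch (no Beam; CountPaneSketch and sumPerPane are hypothetical names) that cuts the ten key/amount pairs from the question's log into panes of five and sums each pane. It assumes the elements arrive in key order, as the log above suggests:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java sketch of count-based panes; it only mimics the arithmetic, not Beam's runtime.
final class CountPaneSketch {

    // Splits the input into consecutive panes of paneSize elements and returns
    // (sum of keys, sum of values) per pane. A trailing partial pane is also emitted.
    static List<Map.Entry<Long, Double>> sumPerPane(List<Map.Entry<Long, Double>> input, int paneSize) {
        List<Map.Entry<Long, Double>> out = new ArrayList<>();
        for (int from = 0; from < input.size(); from += paneSize) {
            int to = Math.min(from + paneSize, input.size());
            long keySum = 0L;
            double valueSum = 0.0;
            for (Map.Entry<Long, Double> e : input.subList(from, to)) {
                keySum += e.getKey();
                valueSum += e.getValue();
            }
            out.add(Map.entry(keySum, valueSum));
        }
        return out;
    }

    public static void main(String[] args) {
        // Keys and amounts taken from the question's log, listed in key order.
        List<Map.Entry<Long, Double>> input = List.of(
                Map.entry(1L, 821.21), Map.entry(2L, 541.95), Map.entry(3L, 384.09),
                Map.entry(4L, 37.53), Map.entry(5L, 207.3), Map.entry(6L, 973.31),
                Map.entry(7L, 705.49), Map.entry(8L, 980.26), Map.entry(9L, 106.96),
                Map.entry(10L, 675.48));
        for (Map.Entry<Long, Double> pane : sumPerPane(input, 5)) {
            System.out.println("keys=" + pane.getKey() + " amounts=" + pane.getValue());
        }
    }
}
```

With this input the panes sum to keys 15 and 40 and to amounts of roughly 1992.08 and 3441.50, in line with the two PrintPCollection records. In a real pipeline elementCountAtLeast is only a lower bound and arrival order is not guaranteed, so pane boundaries may differ from this sketch.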