Search code examples
apache-kafkagoogle-cloud-dataflowavroapache-beam

Aggregating Topics with apache beam Kafkaio (Dataflow)


I have slow moving data in a compacted kafka topic and also fast moving data in another topic.

1) fast moving data is real-time ingested unbounded events from Kafka.

2) slow moving data is meta data which is used to enrich the fast moving data. This is a compacted topic and the data is updated infrequently (days/months).

3) Each fast moving data payload should have a meta data payload with the same customerId which they can be aggregated with.

I would like to aggregate the fast/slow moving data against the customerId (common in the data on both topics). I was wondering how you would go about doing this? So far:

PTransform<PBegin, PCollection<KV<byte[], byte[]>>> kafka = KafkaIO.<byte[], byte[]>read()
    .withBootstrapServers(“url:port")
    .withTopics([“fast-moving-data”, “slow-moving-data"])
    .withKeyDeserializer(ByteArrayDeserializer.class)
    .withValueDeserializer(ByteArrayDeserializer.class)
    .updateConsumerProperties((Map) props)
    .withoutMetadata();

I have noticed that I can use .withTopics and specific the different topics I would like to use, but after this point I've not been able to find any examples to help in terms of aggregation. Any help would be appreciated.


Solution

  • The following pattern which is also discussed in this SO Q&A might be a good one to explore for your use case. One item that could be an issue is the size of your compacted slow moving stream. Hope its useful.

    For this pattern we can use the GenerateSequence source transform to emit a value periodically for example once a day. Pass this value into a global window via a data-driven trigger that activates on each element. In a DoFn, use this process as a trigger to pull data from your bounded source Create your SideInput for use in downstream transforms.

    It's important to note that because this pattern uses a global-window SideInput triggering on processing time, matching to elements being processed in event time will be nondeterministic. For example if we have a main pipeline which is Windowed on Event time, the version of the SideInput View that those windows will see will depend on the latest trigger that has fired in processing time rather than the event time.

    Also important to note that in general the SideInput should be something that fits into memory.

    Java (SDK 2.9.0):

    In the sample below the sideinput is updated at very short intervals, this is so that effects can be easily seen. The expectation is that the side input is updating slowly, for example every few hours or once a day.

    In the example code below we make use of a Map that we create in a DoFn which becomes the View.asSingleton, this is the recommended approach for this pattern.

    The sample below illustrates the pattern, please note the View.asSingleton is rebuilt on every counter update.

    For your use case, you could replace the GenerateSequence transforms with PubSubIO transforms. Does that make sense?

    public static void main(String[] args) {
    
     // Create pipeline
     PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()
         .as(PipelineOptions.class);
    
     // Using View.asSingleton, this pipeline uses a dummy external service as illustration.
     // Run in debug mode to see the output
     Pipeline p = Pipeline.create(options);
    
     // Create slowly updating sideinput
    
     PCollectionView<Map<String, String>> map = p
         .apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(5L)))
    
         .apply(Window.<Long>into(new GlobalWindows())
             .triggering(Repeatedly.forever(AfterProcessingTime.pastFirstElementInPane()))
             .discardingFiredPanes())
    
         .apply(ParDo.of(new DoFn<Long, Map<String, String>>() {
           @ProcessElement public void process(@Element Long input,
               OutputReceiver<Map<String, String>> o) {
             // Do any external reads needed here...
             // We will make use of our dummy external service.
             // Every time this triggers, the complete map will be replaced with that read from 
             // the service.
             o.output(DummyExternalService.readDummyData());
           }
    
         })).apply(View.asSingleton());
    
     // ---- Consume slowly updating sideinput
    
     // GenerateSequence is only used here to generate dummy data for this illustration.
     // You would use your real source for example PubSubIO, KafkaIO etc...
     p.apply(GenerateSequence.from(0).withRate(1, Duration.standardSeconds(1L)))
         .apply(Window.into(FixedWindows.of(Duration.standardSeconds(1))))
         .apply(Sum.longsGlobally().withoutDefaults())
         .apply(ParDo.of(new DoFn<Long, KV<Long, Long>>() {
    
           @ProcessElement public void process(ProcessContext c) {
             Map<String, String> keyMap = c.sideInput(map);
             c.outputWithTimestamp(KV.of(1L, c.element()), Instant.now());
    
      LOG.debug("Value is {} key A is {} and key B is {}"
    , c.element(), keyMap.get("Key_A"),keyMap.get("Key_B"));
    
           }
         }).withSideInputs(map));
    
     p.run();
    }
    
    public static class DummyExternalService {
    
     public static Map<String, String> readDummyData() {
    
       Map<String, String> map = new HashMap<>();
       Instant now = Instant.now();
    
       DateTimeFormatter dtf = DateTimeFormat.forPattern("HH:MM:SS");
    
       map.put("Key_A", now.minus(Duration.standardSeconds(30)).toString(dtf));
       map.put("Key_B", now.minus(Duration.standardSeconds(30)).toString());
    
       return map;
    
     }
    }