Tags: mongodb, dataflow, apache-beam-io, apache-beam

Apache Beam: Refreshing a side input that I read from MongoDB using MongoDbIO.read() (Part 2)


I'm not sure how GenerateSequence would work for me, since I have to read values from MongoDB periodically, on an hourly or daily basis. I created a ParDo that reads from MongoDB and added a Window.into(GlobalWindows) with a trigger (I will adjust the trigger as per my requirements). However, the code snippet below gives a return type error. Could you please help me correct these lines of code? Also, how does GenerateSequence help in my case?

PCollectionView<List<String>> list_of_vins = pipeline
    .apply(GenerateSequence.from(0).withRate(1, Duration.standardMinutes(5))) // adjust polling rate
    .apply(ParDo.of(new DoFn<Long, List<String>>() {
        @ProcessElement
        public void process(ProcessContext c) {
            // Read entire DB, and output as a List<String>
            final String uriString = "mongodb://$[username]:$[password]@$[hostlist]/$[database]?authSource=$[authSource]";
            MongoClient mongoClient = MongoClients.create(uriString);
            MongoDatabase mongoDB = mongoClient.getDatabase(options.getMongoDBHostName());
            MongoCollection<Document> mongoCollection = mongoDB.getCollection(options.getMongoVinCollectionName());
            c.output((List<String>) ((View) mongoCollection).asList());
        }
    })
    .apply(Window.into(new GlobalWindows()).triggering(AfterPane.elementCountAtLeast(1))));

Solution

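  • You'll need to specify the element type explicitly on the Window transform, because Java cannot infer it from the surrounding apply() when into(...) is followed by a chained call such as .triggering(...):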

    .apply(Window.<List<String>>into(...));
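
To see why the explicit type is needed, here is a minimal sketch in plain Java with no Beam dependency. The classes Coll, Transform and Win are hypothetical stand-ins that only mimic the shape of PCollection, PTransform and Window; they are not the Beam API, and the string arguments are placeholders for a window function and a trigger. The point is only that a generic factory method followed by a chained call has its type parameter inferred as Object unless you spell it out.

```java
import java.util.Arrays;
import java.util.List;

public class TypeWitnessDemo {
    /** Hypothetical stand-in for a transform from In to Out (not Beam's PTransform). */
    public interface Transform<In, Out> {
        Out expand(In input);
    }

    /** Hypothetical stand-in for PCollection<T>. */
    public static final class Coll<T> {
        public final T value;

        public Coll(T value) {
            this.value = value;
        }

        public <Out> Out apply(Transform<? super Coll<T>, Out> t) {
            return t.expand(this);
        }
    }

    /** Hypothetical stand-in for Window<T>: a generic factory plus chained configuration. */
    public static final class Win<T> implements Transform<Coll<T>, Coll<T>> {
        public final String trigger;

        private Win(String trigger) {
            this.trigger = trigger;
        }

        public static <T> Win<T> into(String windowFn) {
            return new Win<>("default");
        }

        public Win<T> triggering(String newTrigger) {
            return new Win<>(newTrigger);
        }

        @Override
        public Coll<T> expand(Coll<T> input) {
            return input; // pass-through; windowing itself is not modelled here
        }
    }

    public static Coll<List<String>> windowed(Coll<List<String>> input) {
        // Would NOT compile: without a type witness, T is inferred as Object,
        // so the transform expects a Coll<Object> rather than a Coll<List<String>>.
        // return input.apply(Win.into("global").triggering("every-element"));

        // Compiles: the explicit <List<String>> fixes T before the chained call.
        return input.apply(Win.<List<String>>into("global").triggering("every-element"));
    }

    public static void main(String[] args) {
        Coll<List<String>> vins = new Coll<>(Arrays.asList("VIN1", "VIN2"));
        System.out.println(windowed(vins).value);
    }
}
```

In the snippet from the question, note also that the Window apply appears to be chained inside the parentheses of the ParDo apply rather than on its result, and that the final PCollection would still need a View transform (for example View.asSingleton()) before it can be assigned to a PCollectionView. Both points are observations about the posted code, not part of the original answer.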