java, cassandra, apache-beam

How to read from Cassandra using Apache Beam with some pre-conditions


I am reading from a Cassandra table using Apache Beam's CassandraIO. Is there a way to apply a condition or filter before the read happens? The table will grow over time, so reading the whole table will take a significant amount of time.

I have the following PCollection, which holds the element count of another PCollection. I want to use it as a precondition and read from Cassandra only if count > 0:

PCollection<Long> countRecords = dataPCollection.apply(
        "Count", Count.globally());

This is how I currently read from Cassandra. As far as I can tell, CassandraIO.Read always has to be applied at the root of the pipeline. Is there a workaround for this?

PCollection<CassandraEntity> cassandraEntityPCollection = pipeline
        .apply("Fetching from Cassandra",
            CassandraIO.<CassandraEntity>read()
                .withCassandraConfig(cassandraConfigSpec)
                .withTable("data")
                .withEntity(CassandraEntity.class)
                .withCoder(SerializableCoder.of(CassandraEntity.class)));

Solution

  • You could try conditionally creating a CassandraIO.Read and using it as the input to CassandraIO.readAll(), along these lines:

    PCollection<Long> countRecords = dataPCollection.apply(
            "Count", Count.globally());
    
    PCollection<CassandraEntity> output =
        countRecords.apply(ParDo.of(new DoFn<Long, CassandraIO.Read<CassandraEntity>>() {
              @ProcessElement
              public void processElement(ProcessContext context) {
                long numRecords = context.element();
                if (numRecords > 0) {
                  context.output(CassandraIO.<CassandraEntity>read()
                      .withCassandraConfig(cassandraConfigSpec)
                      .withTable("data")
                      .withEntity(CassandraEntity.class)
                      .withCoder(SerializableCoder.of(CassandraEntity.class)));
                }
              }
            }))
            .apply(
                CassandraIO.<CassandraEntity>readAll().withCoder(SerializableCoder.of(CassandraEntity.class)));
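
To make the data flow of the answer easier to follow, here is a minimal plain-Java model of the same gating idea. It does not use Beam or Cassandra at all: GatedReadSketch, countGlobally, toReadRequests, readAll and gatedRead are hypothetical names chosen for illustration, and the "table" is just a Supplier returning made-up rows. It relies on the assumption that Count.globally() in the default global window emits a single count even for empty input (0), so the gating step always sees exactly one element and emits either one read request or none.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Plain-Java model of the gating pattern. None of these names are Beam APIs.
final class GatedReadSketch {

    // Models Count.globally(): always yields exactly one element,
    // which is 0 when the input is empty.
    static List<Long> countGlobally(List<?> data) {
        return List.of((long) data.size());
    }

    // Models the DoFn: emit one read request per positive count, nothing otherwise.
    static <T> List<Supplier<List<T>>> toReadRequests(
            List<Long> counts, Supplier<List<T>> tableRead) {
        List<Supplier<List<T>>> requests = new ArrayList<>();
        for (long count : counts) {
            if (count > 0) {
                requests.add(tableRead);
            }
        }
        return requests;
    }

    // Models readAll(): execute every request received and flatten the rows.
    static <T> List<T> readAll(List<Supplier<List<T>>> requests) {
        List<T> rows = new ArrayList<>();
        for (Supplier<List<T>> request : requests) {
            rows.addAll(request.get());
        }
        return rows;
    }

    // Chains the three steps in the same order as the pipeline in the answer.
    static <T> List<T> gatedRead(List<?> data, Supplier<List<T>> tableRead) {
        return readAll(toReadRequests(countGlobally(data), tableRead));
    }

    public static void main(String[] args) {
        // Hypothetical table contents standing in for the Cassandra table.
        Supplier<List<String>> table = () -> List.of("row-1", "row-2");
        System.out.println(gatedRead(List.of("x"), table)); // prints [row-1, row-2]
        System.out.println(gatedRead(List.of(), table));    // prints []
    }
}
```

The point of the model is that the read is never issued when the count is 0, because no request reaches the final step. In a real pipeline the same shape only skips the table scan if the runner executes the read lazily per request, which is what readAll() is meant to do.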