I am reading from a Cassandra table using Apache Beam's CassandraIO. Is there a way to apply some condition/filter before reading from Cassandra? As the table volume will grow over time, reading the whole table will take a significant amount of time.
I have the following PCollection, which holds the count of another PCollection, and I want to use it as a precondition: only read from Cassandra if count > 0.
PCollection<Long> countRecords = dataPCollection.apply(
"Count", Count.globally());
I am reading from Cassandra like this. As per the docs, CassandraIO.Read must always be at the root of the pipeline. Is there any workaround for this?
PCollection<CassandraEntity> cassandraEntityPCollection = pipeline
.apply("Fetching from Cassandra",
CassandraIO.<CassandraEntity>read()
.withCassandraConfig(cassandraConfigSpec)
.withTable("data")
.withEntity(CassandraEntity.class)
.withCoder(SerializableCoder.of(CassandraEntity.class)));
You may want to try something like this: conditionally create a CassandraIO.Read and use it as the input for CassandraIO.readAll():
PCollection<Long> countRecords = dataPCollection.apply(
"Count", Count.globally());
PCollection<CassandraEntity> output =
    countRecords.apply(ParDo.of(new DoFn<Long, CassandraIO.Read<CassandraEntity>>() {
      @ProcessElement
      public void processElement(ProcessContext context) {
        long numRecords = context.element();
        // Emit a Read spec only when there is something to count;
        // if nothing is emitted, readAll() simply produces no output.
        if (numRecords > 0) {
          context.output(CassandraIO.<CassandraEntity>read()
              .withCassandraConfig(cassandraConfigSpec)
              .withTable("data")
              .withEntity(CassandraEntity.class)
              .withCoder(SerializableCoder.of(CassandraEntity.class)));
        }
      }
    }))
.apply(
CassandraIO.<CassandraEntity>readAll().withCoder(SerializableCoder.of(CassandraEntity.class)));
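Separately, since your underlying goal is to avoid scanning the whole table, note that CassandraIO.Read also lets you push a CQL filter down to Cassandra with withQuery(), so only matching rows are fetched. A minimal sketch, assuming the same cassandraConfigSpec/withCassandraConfig helpers from your question; the keyspace name and WHERE clause are illustrative:

```java
// Hedged sketch: push the predicate into Cassandra itself via withQuery(),
// so filtering happens server-side instead of reading the full table.
// "mykeyspace" and the status filter are placeholders for your schema.
PCollection<CassandraEntity> filtered = pipeline
    .apply("Fetching filtered rows from Cassandra",
        CassandraIO.<CassandraEntity>read()
            .withCassandraConfig(cassandraConfigSpec)
            .withTable("data")
            .withQuery("SELECT * FROM mykeyspace.data WHERE status = 'ACTIVE' ALLOW FILTERING")
            .withEntity(CassandraEntity.class)
            .withCoder(SerializableCoder.of(CassandraEntity.class)));
```

Keep in mind that filtering on a non-key column needs ALLOW FILTERING, which can itself be expensive; if you can express the condition on a partition or clustering key, Cassandra will serve it efficiently without that clause.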