I have a pipeline that receives some data from Pub/Sub, does some processing, and then needs to read data from Bigtable based on the result of that processing.
For example, given a Pub/Sub message like {clientId: 10}, I need to read from Bigtable all the data for clientId 10 (I know how to create the scan from the clientId). The problem is that both of the Bigtable reads we have at the moment (BigtableIO and CloudBigtableIO) assume the pipeline starts with Bigtable, so I cannot (or could not find a way to) use them in the middle of the pipeline. How can I achieve this?
Simplified pseudo-code:
Pipeline p = Pipeline.create(...);
p.apply(PubsubIO.readMessagesWithAttributes(...))
 .apply(new PubsubMessageToScans())  // I know how to do this
 .apply(new ReadBigTable());         // How to do this?
To complement @Billy's answer, you can also connect to Bigtable from inside a ParDo transformation. The snippet below uses the Cloud Bigtable HBase client via BigtableConfiguration; the newer BigtableDataClient could be used in the same way. The input element carries the parameters extracted from the PubsubMessage; inside the ParDo you configure the Scan with those parameters, open the connection to Bigtable, and obtain the filtered results.
This snippet may be useful:
import com.google.cloud.bigtable.hbase.BigtableConfiguration;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;

public class ReadBigtableFn extends DoFn<String, String> {

  @ProcessElement
  public void processElement(@Element String element, OutputReceiver<String> out) {
    String projectId = "<PROJECT_ID>";
    String instanceId = "<INSTANCE_ID>";
    String tableName = "<TABLENAME>";
    // The element is expected to carry the scan bounds as "startRow,stopRow".
    String[] scanParameters = element.split(",");
    try (Connection connection = BigtableConfiguration.connect(projectId, instanceId)) {
      Table table = connection.getTable(TableName.valueOf(tableName));
      // Build the scan from the parameters extracted from the message.
      Scan scan = new Scan();
      scan.withStartRow(Bytes.toBytes(scanParameters[0]));
      scan.withStopRow(Bytes.toBytes(scanParameters[1]));
      ResultScanner scanner = table.getScanner(scan);
      for (Result row : scanner) {
        System.out.println(row);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }
    out.output("");
  }
}
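One caveat: written this way, BigtableConfiguration.connect opens a new connection for every element, which can get expensive at volume. If throughput matters, the connection could instead be created once per DoFn instance in a @Setup method and closed in @Teardown.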
I didn't test it directly with a PubsubMessage, but you can add another transform to adapt the message, or take the PubsubMessage directly in the DoFn and build the Scan object from it.
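For illustration, here is a minimal sketch of how the DoFn could be wired into the pipeline from the question. The "startRow,stopRow" string format and the attribute names startRow/stopRow are assumptions made for this sketch, not part of the original code:

// Hypothetical wiring; assumes the scan bounds arrive as message
// attributes named "startRow" and "stopRow" (an assumption here).
Pipeline p = Pipeline.create(options);
p.apply(PubsubIO.readMessagesWithAttributes().fromSubscription("<SUBSCRIPTION>"))
 .apply(MapElements.into(TypeDescriptors.strings())
     .via((PubsubMessage msg) ->
         msg.getAttribute("startRow") + "," + msg.getAttribute("stopRow")))
 .apply(ParDo.of(new ReadBigtableFn()));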