I have a DoFn that extends AbstractCloudBigtableTableDoFn<> so that it can send frequent BufferedMutator requests to Bigtable.
When I run the job in the Cloud, I see repeated log entries at this step of the Dataflow pipeline that look like this:
Opening connection for projectId XXX, instanceId XXX, on data host batch-bigtable.googleapis.com, table admin host bigtableadmin.googleapis.com...
and
Bigtable options: BigtableOptions{XXXXX} (lots of option entries here)
The code within the DoFn looks something like this:
@ProcessElement
public void processElement(ProcessContext c) {
    try {
        BufferedMutator mPutUnit = getConnection().getBufferedMutator(TableName.valueOf(TABLE_NAME));
        for (CONDITION) {
            // create lots of different row IDs
            Put p = new Put(newRowID).addColumn(COL_FAMILY, COL_NAME, COL_VALUE);
            mPutUnit.mutate(p);
        }
        mPutUnit.close();
    } catch (IOException e) {
        e.printStackTrace();
    }
    c.output(0);
}
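(As a side note on the snippet: if mutate() throws, the manual close() is skipped and the BufferedMutator is never flushed or released. A try-with-resources sketch of the same method, assuming the same TABLE_NAME constant and the getConnection() inherited from AbstractCloudBigtableTableDoFn, and with a hypothetical rowIDs collection standing in for the elided loop condition:

```java
@ProcessElement
public void processElement(ProcessContext c) {
    // try-with-resources guarantees close() (which flushes buffered Puts)
    // even when mutate() throws, unlike the manual close() above.
    try (BufferedMutator mPutUnit =
            getConnection().getBufferedMutator(TableName.valueOf(TABLE_NAME))) {
        for (byte[] newRowID : rowIDs) { // stand-in for the original loop condition
            Put p = new Put(newRowID).addColumn(COL_FAMILY, COL_NAME, COL_VALUE);
            mPutUnit.mutate(p);
        }
    } catch (IOException e) {
        e.printStackTrace();
    }
    c.output(0);
}
```

This is behavior-equivalent to the original apart from the guaranteed cleanup.)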
This DoFn gets called very frequently. Should I worry that Dataflow re-establishes the connection to Bigtable on every call to this DoFn? I was under the impression that inheriting from this class ensures a single Bigtable connection is re-used across all calls.
"Opening connection for projectId ..." should appear once per worker per AbstractCloudBigtableTableDoFn instance. Can you double-check whether connections are actually being opened per call, as opposed to once per worker?
Also, can you detail which version of the Bigtable client and which version of Beam you are using?
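One quick way to check this (a hypothetical diagnostic, not part of the library) is to log the identity hash of the connection object at the top of processElement. If the same hash repeats for every element on a given worker, the connection is being reused; a new hash per element would indicate per-call connections. Sketch, assuming an SLF4J LOG field on the DoFn:

```java
@ProcessElement
public void processElement(ProcessContext c) throws IOException {
    // Same number on every call per worker => connection is shared;
    // a different number on each call => a new connection per element.
    LOG.info("Bigtable connection identity: {}",
        System.identityHashCode(getConnection()));
    // ... rest of the method as before ...
}
```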
Thanks!