Tags: google-cloud-platform, google-cloud-datastore, performance-testing, scalability, throughput

Google Dataflow template job not scaling when writing records to Google Datastore


I have a small Dataflow job triggered from a Cloud Function using a Dataflow template. The job basically reads from a table in BigQuery, converts each resulting TableRow to a key-value pair, and writes that pair to Datastore.

This is what my code looks like:

PCollection<TableRow> bigqueryResult = p.apply("BigQueryRead",
                BigQueryIO.readTableRows().withTemplateCompatibility()
                        .fromQuery(options.getQuery()).usingStandardSql()
                        .withoutValidation());

bigqueryResult.apply("WriteFromBigqueryToDatastore", ParDo.of(new DoFn<TableRow, String>() {
            @ProcessElement
            public void processElement(ProcessContext pc) {
                TableRow row = pc.element();

                // A new Datastore client is created and a blocking put() is issued for every element.
                Datastore datastore = DatastoreOptions.getDefaultInstance().getService();
                KeyFactory keyFactoryCounts = datastore.newKeyFactory().setNamespace("MyNamespace")
                        .setKind("MyKind");

                Key key = keyFactoryCounts.newKey("Key");
                Builder builder = Entity.newBuilder(key);
                builder.set("Key", StringValue.newBuilder("Value").setExcludeFromIndexes(true).build());

                Entity entity = builder.build();
                datastore.put(entity);
            }
        }));

This pipeline runs fine when the number of records I try to process is anywhere in the range of 1 to 100. However, when I put more load on the pipeline, i.e. ~10000 records, it does not scale (even though autoscaling is set to THROUGHPUT_BASED and the maximum number of workers is set as high as 50 with an n1-standard-1 machine type). The job keeps processing 3 or 4 elements per second with one or two workers, and this is hurting the performance of my system.
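
For reference, the scaling settings are applied roughly like this (a sketch only; the setter names come from the Dataflow runner's DataflowPipelineOptions, and args stands in for however the template actually receives its parameters):

    // Sketch: scaling-related options, assuming they are set in code
    // rather than through template launch parameters.
    DataflowPipelineOptions options = PipelineOptionsFactory.fromArgs(args)
            .withValidation()
            .as(DataflowPipelineOptions.class);

    options.setAutoscalingAlgorithm(
            DataflowPipelineWorkerPoolOptions.AutoscalingAlgorithmType.THROUGHPUT_BASED);
    options.setMaxNumWorkers(50);                  // upper bound for autoscaling
    options.setWorkerMachineType("n1-standard-1");

    Pipeline p = Pipeline.create(options);

The same settings can typically be passed as launch flags instead: --autoscalingAlgorithm=THROUGHPUT_BASED, --maxNumWorkers=50 and --workerMachineType=n1-standard-1.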

Any advice on how to scale up the performance would be very welcome. Thanks in advance.


Solution

  • Found a solution by using DatastoreIO instead of the Datastore client. Following is the snippet I used:

        PCollection<TableRow> row = p.apply("BigQueryRead",
                BigQueryIO.readTableRows().withTemplateCompatibility()
                        .fromQuery(options.getQueryForSegmentedUsers()).usingStandardSql()
                        .withoutValidation());

        PCollection<com.google.datastore.v1.Entity> userEntity = row.apply("ConvertTablerowToEntity",
                ParDo.of(new DoFn<TableRow, com.google.datastore.v1.Entity>() {

            @SuppressWarnings("deprecation")
            @ProcessElement
            public void processElement(ProcessContext pc) {
                final String namespace = "MyNamespace";
                final String kind = "MyKind";

                // Ancestor key shared by all entities written by this job.
                com.google.datastore.v1.Key.Builder keyBuilder = DatastoreHelper.makeKey(kind, "root");
                if (namespace != null) {
                    keyBuilder.getPartitionIdBuilder().setNamespaceId(namespace);
                }
                final com.google.datastore.v1.Key ancestorKey = keyBuilder.build();

                TableRow row = pc.element();

                // Placeholder property name and key; the real job derives these from the TableRow.
                String entityProperty = "sample";
                String key = "key";

                // Build the entity with its key and a single unindexed-free property,
                // then emit it for DatastoreIO to write.
                com.google.datastore.v1.Entity.Builder entityBuilder = com.google.datastore.v1.Entity.newBuilder();
                com.google.datastore.v1.Key.Builder keyBuilder1 = DatastoreHelper.makeKey(ancestorKey, kind, key);
                if (namespace != null) {
                    keyBuilder1.getPartitionIdBuilder().setNamespaceId(namespace);
                }

                entityBuilder.setKey(keyBuilder1.build());
                entityBuilder.getMutableProperties().put(entityProperty, DatastoreHelper.makeValue("sampleValue").build());
                pc.output(entityBuilder.build());
            }

        }));

        userEntity.apply("WriteToDatastore", DatastoreIO.v1().write().withProjectId(options.getProject()));
    

    This solution was able to scale from 3 elements per second with 1 worker to ~1500 elements per second with 20 workers.
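
    The difference appears to be that DatastoreIO.v1().write() hands the entities to Beam's Datastore connector, which groups them into batched commit requests, whereas the original DoFn built a brand-new Datastore client and issued a blocking put() for every single element, so each record paid the full client-setup and round-trip cost.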