google-cloud-dataflow apache-beam apache-beam-io

Apache Beam / Google Cloud Dataflow BigQuery reader failing from the second run onward


We have a Dataflow job built with Apache Beam and deployed on GCP Dataflow infrastructure. The job runs perfectly the first time and creates the partitioned table as expected. From the second run onward, however, it wipes out the results in the dataset rather than replacing the data in that specific partition with the new dataset. The job works perfectly when run with the Direct runner from my local setup.

Code Example:

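    import com.google.api.services.bigquery.model.TableRow;
    import java.util.stream.Collectors;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.transforms.MapElements;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.TypeDescriptor;

    // Read the selected columns from the source table with a standard SQL query.
    PCollection<TableRow> tableRow =
        pipeline.apply(
            "Read from BigQuery (table_name) Table: ",
            BigQueryIO.readTableRows()
                .fromQuery(
                    String.format(
                        "SELECT %s FROM `%s.%s.%s`",
                        FIELDS.stream().collect(Collectors.joining(",")), project, dataset, table))
                .usingStandardSql()
                .withoutValidation());
    // Convert each TableRow into the model class.
    PCollection<Model> rows =
        tableRow.apply(
            "TableRows to BigQueryVideoPlacement.Placement",
            MapElements.into(TypeDescriptor.of(Model.class))
                .via(Model::fromTableRow));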
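
When debugging a read like this, it can help to check the generated SQL on its own before it goes to BigQuery. The following is a minimal sketch using only the Java standard library. The class name QuerySketch, the helper buildQuery, the column names, and the project, dataset, and table values are all hypothetical placeholders, not taken from the actual job.

```java
import java.util.List;
import java.util.stream.Collectors;

public class QuerySketch {
    // Hypothetical column list standing in for the FIELDS constant in the post.
    static final List<String> FIELDS = List.of("id", "name", "created_at");

    // Builds the same kind of standard SQL string that the snippet passes to fromQuery().
    static String buildQuery(List<String> fields, String project, String dataset, String table) {
        String columns = fields.stream().collect(Collectors.joining(","));
        return String.format("SELECT %s FROM `%s.%s.%s`", columns, project, dataset, table);
    }

    public static void main(String[] args) {
        // Placeholder identifiers, not the real project, dataset, or table.
        System.out.println(buildQuery(FIELDS, "my-project", "my_dataset", "my_table"));
    }
}
```

With the placeholder values above, this prints SELECT id,name,created_at FROM `my-project.my_dataset.my_table`, which can be pasted into the BigQuery console to confirm the query itself is valid.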

Please let me know if you know what I am missing here. Thanks in advance!


Solution

  • Figured it out!

    Here is the change I made to get it working in a templated environment:

        // Same read as in the question, with withTemplateCompatibility() added at the end.
        PCollection<TableRow> tableRow =
            pipeline.apply(
                "Read from BigQuery (table_name) Table: ",
                BigQueryIO.readTableRows()
                    .fromQuery(
                        String.format(
                            "SELECT %s FROM `%s.%s.%s`",
                            FIELDS.stream().collect(Collectors.joining(",")), project, dataset, table))
                    .usingStandardSql()
                    .withoutValidation()
                    .withTemplateCompatibility());
        PCollection<Model> rows =
            tableRow.apply(
                "TableRows to BigQueryVideoPlacement.Placement",
                MapElements.into(TypeDescriptor.of(Model.class))
                    .via(Model::fromTableRow));
    
    The only change is the added call to .withTemplateCompatibility()
    

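    For more details, see the BigQueryIO documentation for withTemplateCompatibility(), which describes it as a source implementation that is compatible with repeated template invocations.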