Tags: java, google-bigquery, google-cloud-dataflow, apache-beam

How to replace existing rows in BigQuery with Apache Beam (Java) on Dataflow


Currently, I have a BigQuery table called cdn_daily_user_playback_requests_1MONTH. It contains a large amount of data recorded on a daily basis, so it holds data for entire months such as 2023-07, 2023-08, and so on. Now, say I generate new data for 2023-07 and want to write it into that table, which already contains records for the 2023-07 month. How do I do this (replacing the table's current data for that month with the new data) in my Apache Beam code in Java?

My pipeline code is here:

pipeline
            .apply("Read from cdn_requests BigQuery", BigQueryIO
                    .read(new CdnMediaRequestLogEntity.FromSchemaAndRecord())
                    .fromQuery(cdnRequestsQueryString)
                    .usingStandardSql())
            .apply("Validate and Filter Cdn Media Request Log Objects", Filter.by(new CdnMediaRequestValidator()))
            .apply("Convert Cdn Logs To Key Value Pairs", ParDo.of(new CdnMediaRequestResponseSizeKeyValuePairConverter()))
            .apply("Sum the Response Sizes By Key", Sum.longsPerKey())
            .apply("Convert To New Daily Requests Objects", ParDo.of(new CdnDailyRequestConverter(projectId, kind)))
            .apply("Convert Cdn Media Request Entities to Big Query Objects", ParDo.of(new BigQueryCdnDailyRequestRowConverter()))
            .apply("Write Data To BigQuery", BigQueryIO.writeTableRows()
                .to(writeCdnMediaRequestTable)
                .withSchema(cdnDailyRequestSchema)
                .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)
                .withWriteDisposition(BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE));

I have tried and tested BigQueryIO.Write.WriteDisposition.WRITE_TRUNCATE, but as I understand it, this removes all the data in the table and then writes the newly created rows. I only want to remove the data for the 2023-07 month, not everything.
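When every row in a run belongs to one known day, a single partition can be targeted directly with a partition decorator: a `$yyyyMMdd` suffix on the table spec. With WRITE_TRUNCATE, BigQuery then replaces only that partition instead of the whole table. The sketch below only shows how such a spec string is built (the project, dataset, and helper name are placeholders, not from the original pipeline):

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class PartitionSpec {
    // Builds a BigQuery table spec that targets a single daily partition,
    // e.g. "my-project:my_dataset.my_table$20230701".
    static String forDay(String project, String dataset, String table, LocalDate day) {
        String decorator = day.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
        return project + ":" + dataset + "." + table + "$" + decorator;
    }

    public static void main(String[] args) {
        System.out.println(forDay("my-project", "my_dataset",
                "cdn_daily_user_playback_requests_1MONTH", LocalDate.of(2023, 7, 1)));
    }
}
```

Passing a spec like this to `.to(...)` works only when the whole run writes a single day; when rows span multiple days, the destination has to be computed per row, which is what the solution below does.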


Solution

  • Solution: I found a working solution by creating a SerializableFunction that uses the partition key as an identifier when writing to BigQuery (my table is partitioned on the Date column, which has the DATE data type). With this, the write replaces only the affected partitions of the table, identified by the partitioned column, rather than the whole table.

    This is my sample code for the solution:

    import com.google.api.services.bigquery.model.TableReference;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TimePartitioning;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.transforms.SerializableFunction;
    import org.apache.beam.sdk.values.ValueInSingleWindow;
    import org.joda.time.DateTime;
    import org.joda.time.DateTimeZone;
    import org.joda.time.format.DateTimeFormat;
    import org.joda.time.format.DateTimeFormatter;
    
    public class BigQueryDayPartitionDestinations implements SerializableFunction<ValueInSingleWindow<TableRow>, TableDestination> {
    
        private final String projectId;
        private final String datasetId;
        private final String pattern;
        private final String table;
    
        // The trailing "$" lets apply() append a yyyyMMdd partition decorator,
        // so the write targets one daily partition at a time.
        public static BigQueryDayPartitionDestinations writePartitionsPerDay(String projectId, String datasetId, String tablePrefix) {
            return new BigQueryDayPartitionDestinations(projectId, datasetId, "yyyyMMdd", tablePrefix + "$");
        }
    
        private BigQueryDayPartitionDestinations(String projectId, String datasetId, String pattern, String table) {
            this.projectId = projectId;
            this.datasetId = datasetId;
            this.pattern = pattern;
            this.table = table;
        }
    
        @Override
        public TableDestination apply(ValueInSingleWindow<TableRow> input) {
            // "partition" renders a date as yyyyMMdd (the partition decorator suffix);
            // "formatter" parses the row's Date column, which is in yyyy-MM-dd form.
            DateTimeFormatter partition = DateTimeFormat.forPattern(pattern).withZone(DateTimeZone.forID("Asia/Tokyo"));
            DateTimeFormatter formatter = DateTimeFormat.forPattern("yyyy-MM-dd").withZone(DateTimeZone.forID("Asia/Tokyo"));
    
            TableReference reference = new TableReference();
            reference.setProjectId(this.projectId);
            reference.setDatasetId(this.datasetId);
    
            var date = input.getValue().get("Date").toString();
            DateTime dateTime = formatter.parseDateTime(date);
    
            // e.g. a Date of "2023-07-05" yields "<tablePrefix>$20230705",
            // so WRITE_TRUNCATE replaces only that day's partition.
            var tableId = table + dateTime.toInstant().toString(partition);
    
            reference.setTableId(tableId);
            return new TableDestination(reference, null, new TimePartitioning().setType("DAY").setField("Date"));
        }
    }
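The date handling inside apply() can be traced in isolation. This sketch mirrors it with java.time instead of Joda-Time (a simplification that holds here because both formatters in apply() use the same Asia/Tokyo zone); the helper name is mine, not part of the solution:

```java
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class TableIdTrace {
    // Mirrors apply(): parse the row's "Date" value (yyyy-MM-dd),
    // then append its yyyyMMdd form to the "$"-terminated table prefix.
    static String tableIdFor(String tablePrefixWithDollar, String rowDate) {
        LocalDate date = LocalDate.parse(rowDate);
        return tablePrefixWithDollar + date.format(DateTimeFormatter.ofPattern("yyyyMMdd"));
    }

    public static void main(String[] args) {
        System.out.println(tableIdFor("cdn_daily_user_playback_requests_1MONTH$", "2023-07-05"));
    }
}
```

In the pipeline itself, you would then replace `.to(writeCdnMediaRequestTable)` with something like `.to(BigQueryDayPartitionDestinations.writePartitionsPerDay(projectId, datasetId, tableName))` while keeping the WRITE_TRUNCATE disposition, so each run truncates only the partitions it actually writes to.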