Search code examples
java, google-bigquery, avro, google-cloud-pubsub

PubSub to BQ Subscription with Array Data Type on Avro and REPEATED type on BQ


Avro Schema

The Pub/Sub schema was created using the schema below.

{
  "namespace": "model",
  "name": "Point",
  "type": "record",
  "fields": [
    {
      "name": "id1",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "id2",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "id3",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "id4",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "id5",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "reference",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "id6",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "code",
      "type": [
        "null",
        "string"
      ],
      "default": null
    },
    {
      "name": "status1",
      "type": {
        "type": "array",
        "default": null,
        "items": {
          "name": "status1",
          "type": "record",
          "default": null,
          "fields": [
            {
              "name": "status1",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "status2",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "status3",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ]
        }
      }
    },
    {
      "name": "phase",
      "type": {
        "type": "array",
        "default": null,
        "items": {
          "name": "phase",
          "type": "record",
          "default": null,
          "fields": [
            {
              "name": "phase1",
              "type": [
                "null",
                "string"
              ],
              "default": null
            },
            {
              "name": "created_at",
              "type": [
                "null",
                "string"
              ],
              "default": null
            }
          ]
        }
      }
    },
    {
      "name": "amount",
      "type": {
        "name": "Amount",
        "type": "record",
        "default": null,
        "fields": [
          {
            "name": "amount",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "currency",
            "type": [
              "null",
              "string"
            ],
            "default": null
          }
        ]
      }
    },
    {
      "name": "user",
      "type": {
        "name": "User",
        "type": "record",
        "default": null,
        "fields": [
          {
            "name": "name",
            "type": [
              "null",
              "string"
            ],
            "default": null
          }
        ]
      }
    },
    {
      "name": "created_at",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

BQ Schema

  "schema": {
    "fields": [
      {
        "mode": "NULLABLE",
        "name": "id1",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "id2",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "id3",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "id4",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "id5",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "reference",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "id6",
        "type": "STRING"
      },
      {
        "mode": "NULLABLE",
        "name": "code",
        "type": "STRING"
      },
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "status1",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "status2",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "status3",
            "type": "TIMESTAMP"
          }
        ],
        "mode": "REPEATED",
        "name": "status1",
        "type": "RECORD"
      },
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "phase1",
            "type": "STRING"
          },
          {
            "mode": "NULLABLE",
            "name": "created_at",
            "type": "TIMESTAMP"
          }
        ],
        "mode": "REPEATED",
        "name": "phase",
        "type": "RECORD"
      },
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "amount",
            "type": "BIGNUMERIC"
          },
          {
            "mode": "NULLABLE",
            "name": "currency",
            "type": "STRING"
          }
        ],
        "mode": "NULLABLE",
        "name": "amount",
        "type": "RECORD"
      },
      {
        "fields": [
          {
            "mode": "NULLABLE",
            "name": "name",
            "type": "STRING"
          }
        ],
        "mode": "NULLABLE",
        "name": "user",
        "type": "RECORD"
      },
      {
        "mode": "NULLABLE",
        "name": "created_at",
        "type": "TIMESTAMP"
      }
    ]
  }

Pubsub to BQ Subscription

A Pub/Sub to BQ subscription was created.

Publishing Messages

I generated Avro classes with the Java Avro plugin and am publishing messages using those classes.

Problem

Messages are persisted to BQ without errors. However, the REPEATED columns are not populated the way I expected: multiple rows are still created for the same data. For example, phase and status1 are REPEATED columns on the BQ side and ARRAY types on the Avro side, so when I publish multiple messages in which everything except phase and status1 is the same, I would expect a single row with multiple nested entries within phase and status1. Instead, separate rows with duplicated data are created.


Solution

  • Using repeated columns in the BigQuery table and arrays in the topic schema does not mean that multiple messages are coalesced into a single row in the table when all that differs is the array/repeated fields. Each published message is written as its own row; there is no way to do that type of implicit merging of fields via BigQuery subscriptions. You would need to publish a single message with the repeated fields already merged into one array.