python google-bigquery terraform google-cloud-pubsub terraform-provider-gcp

BigQuery dataset using Pub/Sub subscription


I'm trying to create a BigQuery dataset from a Pub/Sub subscription message. The documentation at https://cloud.google.com/pubsub/docs/bigquery explains how to write messages to a table, but how can I create a dataset from a message? For example, if my subscription receives the message test1, I should be able to take that message (test1) and create a dataset named after it. Just an empty dataset, no table needed. I have searched and didn't find any solution from Google.


Solution

  • You can trigger a Cloud Function whenever a message is published to the topic, using a Pub/Sub trigger:

    https://cloud.google.com/functions/docs/tutorials/pubsub

    gcloud functions deploy python-pubsub-function \
    --gen2 \
    --runtime=python310 \
    --region=REGION \
    --source=. \
    --entry-point=subscribe \
    --trigger-topic=YOUR_TOPIC_NAME
    

    You can also check this link:

    https://cloud.google.com/functions/docs/calling/pubsub

    The Cloud Function can use the BigQuery Python client to create your dataset. Here is an example:

    import base64
    
    import functions_framework
    from google.cloud import bigquery
    
    
    @functions_framework.cloud_event
    def subscribe(cloud_event):
        # The Pub/Sub payload arrives base64-encoded; decode it to get the dataset name.
        dataset_name = base64.b64decode(cloud_event.data["message"]["data"]).decode().strip()
    
        # Construct a BigQuery client object.
        client = bigquery.Client()
    
        # bigquery.Dataset expects a fully qualified ID ("project.dataset"),
        # so prefix the name with the client's project.
        dataset = bigquery.Dataset(f"{client.project}.{dataset_name}")
    
        # TODO(developer): Specify the geographic location where the dataset should reside.
        dataset.location = "US"
    
        # Send the dataset to the API for creation, with an explicit timeout.
        # Raises google.api_core.exceptions.Conflict if the dataset already
        # exists within the project.
        dataset = client.create_dataset(dataset, timeout=30)  # Make an API request.
    
        print("Created dataset {}.{}".format(client.project, dataset.dataset_id))
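Since the dataset name comes straight from the message body, it may be worth validating it before calling BigQuery. The following is only a sketch: `dataset_name_from_event` is a hypothetical helper, not part of any Google library, and the pattern is a deliberately conservative assumption (ASCII letters, digits and underscores, at most 1024 characters). It expects a dict shaped like `cloud_event.data` in the function above and can be run locally without GCP access.

```python
import base64
import binascii
import re

# Conservative assumption: ASCII letters, digits and underscores, 1 to 1024 characters.
_DATASET_NAME = re.compile(r"[A-Za-z0-9_]{1,1024}")


def dataset_name_from_event(event_data: dict) -> str:
    """Decode the Pub/Sub payload and return it if it looks like a dataset name.

    Hypothetical helper; `event_data` should have the same shape as
    `cloud_event.data` in the Cloud Function.
    """
    try:
        encoded = event_data["message"]["data"]
        # Strip surrounding whitespace, e.g. a trailing newline from the publisher.
        name = base64.b64decode(encoded).decode("utf-8").strip()
    except (KeyError, TypeError, binascii.Error, UnicodeDecodeError) as exc:
        raise ValueError("message has no decodable text payload") from exc

    if not _DATASET_NAME.fullmatch(name):
        raise ValueError(f"not a valid dataset name: {name!r}")
    return name


# Local check with a hand-built event that mimics the Pub/Sub message layout.
fake_event = {"message": {"data": base64.b64encode(b"test1\n").decode("ascii")}}
print(dataset_name_from_event(fake_event))
```

Inside `subscribe`, the helper could be called as `dataset_name_from_event(cloud_event.data)` in place of the inline decoding. Pub/Sub can deliver the same message more than once, so passing `exists_ok=True` to `client.create_dataset` is one way to keep a redelivery from failing with a Conflict error.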