Search code examples
pythongoogle-cloud-dlp

pub_sub action from google sample code errors with missing 1 required positional argument: 'callback'


I am setting up a google DLP scan on a big query table, to look for identifiable personal information. I have been working through the google sample code for this, but have had problems with the pub/sub element of the code

This is for a python google cloud function calling google dlp, using the google sample here using the method inspect_bigquery.

...

actions = [{
    'pub_sub': {'topic': '{}/topics/{}'.format(parent, topic_id)},
    'save_findings': {
        'output_config': {
                'table': {
                    'project_id': project,
                    'dataset_id': dataset_id,
                    'table_id': table_id + '_inspection_results',
                }
            }
    },
}]

...

subscriber = google.cloud.pubsub.SubscriberClient()
subscription_path = subscriber.subscription_path(
    project, subscription_id)
#    subscription = subscriber.subscribe(subscription_path, callback)
subscription = subscriber.subscribe(subscription_path)

...

def callback(message):
    try:
        if (message.attributes['DlpJobName'] == operation.name):
            # This is the message we're looking for, so acknowledge it.
            message.ack()

            # Now that the job is done, fetch the results and print them.
            job = dlp.get_dlp_job(operation.name)
            if job.inspect_details.result.info_type_stats:
                for finding in job.inspect_details.result.info_type_stats:
                    print('Info type: {}; Count: {}'.format(
                        finding.info_type.name, finding.count))
            else:
                print('No findings.')

            # Signal to the main thread that we can exit.
            job_done.set()
        else:
            # This is not the message we're looking for.
            message.drop()
    except Exception as e:
        # Because this is executing in a thread, an exception won't be
        # noted unless we print it manually.
        print(e)
        raise

# Register the callback and wait on the event.
subscription.open(callback)
finished = job_done.wait(timeout=timeout)
if not finished:
    print('No event received before the timeout. Please verify that the '
          'subscription provided is subscribed to the topic provided.')

There are two errors I get with this, when I leave the subscribe method with just the subscription path, it errors with TypeError: subscribe() missing 1 required positional argument: 'callback'.

When I put the callaback into the subscribe method it fails with Function execution took 60002 ms, finished with status: 'timeout' No event received before the timeout. Please verify that the subscription provided is subscribed to the topic provided.

The save findings action does however work, and I am able to see the results in bigquery after a couple of seconds.

Thanks


Solution

  • Couple things: 1. Just so you know, you can leave table_id blank if you don't want to be in the business of generating them.

    But to your actual question:

    1. Are you running this within Cloud Functions by chance, which has execution deadlines? (https://cloud.google.com/functions/docs/concepts/exec#timeout)

    If yes, you actually want to have a Cloud Function subscribe to the pub/sub via triggers (https://cloud.google.com/functions/docs/calling/pubsub), not in your code to avoid the timeouts. There is a specific DLP solution guide here on that https://cloud.google.com/solutions/automating-classification-of-data-uploaded-to-cloud-storage#create_pubsub_topic_and_subscription

    Helpful at all?