Tags: python-3.x, google-cloud-platform, publish-subscribe, google-cloud-pubsub, google-cloud-composer

Problem publishing a message to Pub/Sub from Airflow using Python


I'm trying to publish a message in Pub/Sub but I get this error:

[2021-08-30 23:10:55,317] {taskinstance.py:1049} ERROR - 'Future' object has no attribute '_condition'
Traceback (most recent call last):
  File "/usr/local/lib/airflow/airflow/models/taskinstance.py", line 1046, in _run_raw_task
    task.on_success_callback(context)
  File "/home/airflow/gcs/plugins/logger.py", line 70, in on_success_task_instance
    log_monitoring(DAG_STATE_SUCCESS, context=context)
  File "/home/airflow/gcs/plugins/logger.py", line 220, in log_monitoring
    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
  File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 284, in wait
    with _AcquireFutures(fs):
  File "/opt/python3.6/lib/python3.6/concurrent/futures/_base.py", line 146, in __enter__
    future._condition.acquire()
AttributeError: 'Future' object has no attribute '_condition'

The code is the following:

# Publishes multiple messages to a Pub/Sub topic with an error handler.
import json

from concurrent import futures
from google.cloud import pubsub_v1
from typing import Callable

topic_path = 'projects/.../topics/test_log_monitoring'
publish_futures = []

publisher = pubsub_v1.PublisherClient()

def get_callback(
    publish_future: pubsub_v1.publisher.futures.Future, data: str
) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
    def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
        try:
            # Wait 60 seconds for the publish call to succeed.
            print(publish_future.result(timeout=60))
        except futures.TimeoutError:
            print(f"Publishing {data} timed out.")

    return callback

record = {
    'Key1': 'Value1',
    'Key2': 'Value2',
    'Key3': 'Value3'
}

data = json.dumps(record).encode("utf-8")
# When you publish a message, the client returns a future.
publish_future = publisher.publish(topic_path, data)
# Non-blocking. Publish failures are handled in the callback function.
publish_future.add_done_callback(get_callback(publish_future, data))
publish_futures.append(publish_future)

# Wait for all the publish futures to resolve before exiting.
futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

print(f"Published messages with error handler to {topic_path}.")

By the way, I'm following this official tutorial: https://cloud.google.com/pubsub/docs/samples/pubsub-publish-with-error-handler

Do you have any idea what's wrong?

Finally, is there a different way to wait for messages to be published?


Solution

  • I was able to reproduce your issue with your code snippet. I used Composer version 1.16.15 and Airflow version 1.10.15, and did not install any extra Python libraries on mine.

    The error occurs because, in google-cloud-pubsub versions before 2.0, the future returned by publish() does not subclass concurrent.futures.Future, so futures.wait() fails when it tries to acquire the private _condition attribute. To fix this, update google-cloud-pubsub in your Cloud Composer instance to the latest version, which is currently 2.7.1. You can update it with the gcloud composer environments update command. See Installing a Python dependency from PyPI for more details.

    To update the Pub/Sub library smoothly, explicitly pin the Google libraries in your requirements.txt. This is necessary because the Google libraries depend on one another (see the Pub/Sub library dependencies). You can find the pre-installed Google libraries in the Cloud Composer pre-installed packages reference. If you have already updated some Google libraries, just pin the versions you actually use in requirements.txt.
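To see why the old library version triggers exactly this traceback, here is a stdlib-only sketch of the mechanism. LegacyFuture is a hypothetical stand-in for the old publish future, not real google-cloud-pubsub code:

```python
from concurrent import futures

class LegacyFuture:
    """Hypothetical stand-in for a pre-2.0 google-cloud-pubsub publish
    future: it offers result(), but does not subclass
    concurrent.futures.Future, so it lacks the private _condition
    attribute that futures.wait() acquires internally."""

    def result(self, timeout=None):
        return "message-id"

caught = None
try:
    # The same call that log_monitoring() makes in the traceback.
    futures.wait([LegacyFuture()], return_when=futures.ALL_COMPLETED)
except AttributeError as err:
    caught = str(err)

print(caught)  # 'LegacyFuture' object has no attribute '_condition'
```

In google-cloud-pubsub 2.x the publish future does subclass concurrent.futures.Future, so futures.wait() works as in the official sample.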

    requirements.txt

    google-ads==4.0.0
    google-api-core==1.26.1
    google-api-python-client==1.12.8
    google-apitools==0.5.31
    google-auth==1.28.0
    google-auth-httplib2==0.1.0
    google-auth-oauthlib==0.4.3
    google-cloud-automl==2.2.0
    google-cloud-bigquery==2.13.0
    google-cloud-bigquery-datatransfer==3.1.0
    google-cloud-bigquery-storage==2.1.0
    google-cloud-bigtable==1.7.0
    google-cloud-build==2.0.0
    google-cloud-container==1.0.1
    google-cloud-core==1.6.0
    google-cloud-datacatalog==3.1.0
    google-cloud-dataproc==2.3.0
    google-cloud-datastore==1.15.3
    google-cloud-dlp==1.0.0
    google-cloud-kms==2.2.0
    google-cloud-language==1.3.0
    google-cloud-logging==2.2.0
    google-cloud-memcache==0.3.0
    google-cloud-monitoring==2.0.0
    google-cloud-os-login==2.1.0
    google-cloud-pubsub==2.7.1
    google-cloud-pubsublite==0.3.0
    google-cloud-redis==2.1.0
    google-cloud-secret-manager==1.0.0
    google-cloud-spanner==1.19.1
    google-cloud-speech==1.3.2
    google-cloud-storage==1.36.2
    google-cloud-tasks==2.2.0
    google-cloud-texttospeech==1.0.1
    google-cloud-translate==1.7.0
    google-cloud-videointelligence==1.16.1
    google-cloud-vision==1.0.0
    google-cloud-workflows==0.2.0
    google-crc32c==1.1.2
    google-pasta==0.2.0
    google-resumable-media==1.2.0
    googleapis-common-protos==1.53.0
    graphviz==0.16
    greenlet==1.0.0
    grpc-google-iam-v1==0.12.3
    grpcio==1.38.1
    grpcio-gcp==0.2.2
    

    Update command:

    gcloud composer environments update your-environment-name --update-pypi-packages-from-file requirements.txt --location your-composer-location
    

    When the update completes, the command returns done.

    Check the version in the GCP Console → Composer → your-environment → PyPI Packages.

    (Screenshots omitted: the PyPI package list showing the updated google-cloud-pubsub version, a successful Airflow test run, and the Airflow task logs.)

    DAG used:

    import datetime
    import json
    from concurrent import futures
    from typing import Callable

    import airflow
    from airflow.operators import python_operator
    from google.cloud import pubsub_v1
    
    YESTERDAY = datetime.datetime.now() - datetime.timedelta(days=1)
    
    default_args = {
        'owner': 'Composer Example',
        'depends_on_past': False,
        'email': [''],
        'email_on_failure': False,
        'email_on_retry': False,
        'retries': 1,
        'retry_delay': datetime.timedelta(minutes=5),
        'start_date': YESTERDAY,
    }
    
    def publish_error_handle():
        topic_path = 'projects/your-project-id/topics/test-topic'
        publish_futures = []
    
        publisher = pubsub_v1.PublisherClient()
    
        def get_callback(
            publish_future: pubsub_v1.publisher.futures.Future, data: str
        ) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
            def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
                try:
                    # Wait 60 seconds for the publish call to succeed.
                    print(publish_future.result(timeout=60))
                except futures.TimeoutError:
                    print(f"Publishing {data} timed out.")
    
            return callback
    
        record = {
            'Key1': 'Value1',
            'Key2': 'Value2',
            'Key3': 'Value3'
        }
    
        data = json.dumps(record).encode("utf-8")
        # When you publish a message, the client returns a future.
        publish_future = publisher.publish(topic_path, data)
        # Non-blocking. Publish failures are handled in the callback function.
        publish_future.add_done_callback(get_callback(publish_future, data))
        publish_futures.append(publish_future)
    
        # Wait for all the publish futures to resolve before exiting.
        futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)
    
        print(f"Published messages with error handler to {topic_path}.")
    
    with airflow.DAG(
            'composer_sample_dag',
            catchup=False,
            default_args=default_args,
            schedule_interval=datetime.timedelta(days=1)) as dag:
    
        publish_handle = python_operator.PythonOperator(
            task_id='publish_handle',
            python_callable=publish_error_handle
        )
    
        publish_handle
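On the follow-up question: yes, instead of futures.wait() you can block on each future individually with result(), which also re-raises any publish error instead of swallowing it. Below is a minimal sketch of that pattern using only stdlib futures; fake_publish is a hypothetical stand-in for publisher.publish(), whose futures behave the same way in google-cloud-pubsub >= 2.x because they subclass concurrent.futures.Future:

```python
from concurrent import futures

def fake_publish(executor, data):
    # Hypothetical stand-in for publisher.publish(): returns a future
    # that resolves to a message id string.
    return executor.submit(lambda: f"id-for-{data}")

results = []
with futures.ThreadPoolExecutor(max_workers=2) as executor:
    publish_futures = [fake_publish(executor, d) for d in ("a", "b", "c")]

    # Instead of futures.wait(...), wait on each future in turn.
    # result() raises the publish exception here, if there was one.
    for future in publish_futures:
        try:
            results.append(future.result(timeout=60))
        except futures.TimeoutError:
            print("A publish timed out.")

print(results)  # ['id-for-a', 'id-for-b', 'id-for-c']
```

The trade-off: futures.wait() blocks once for the whole batch, while the loop above surfaces each result (or exception) as it completes, which is often more useful for logging.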