Search code examples
pythongoogle-cloud-pubsub

How to add the message content to the results in Google Pub/Sub?


I have the following code, based on Google's official API

def publish_messages_with_error_handler(project_id: str = GOOGLE_CLOUD_PROJECT_ID,
                                        topic_id: str = GOOGLE_CLOUD_TOPIC_ID,
                                        data: List[str] = []) -> dict:
    # [START pubsub_publish_with_error_handler]
    """Publishes multiple messages to a Pub/Sub topic with an error handler."""

    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(project_id, topic_id)
    publish_futures = []

    result = {
        "succeded": [],
        "failed": []
    }

    def get_callback(publish_future: pubsub_v1.publisher.futures.Future,
                     data: str) -> Callable[[pubsub_v1.publisher.futures.Future], None]:
        def callback(publish_future: pubsub_v1.publisher.futures.Future) -> None:
            try:
                logger.info(publish_future.result(timeout=0))
            except futures.TimeoutError:
                logger.info(f"Publishing {data} timed out.")

        return callback

    if data:
        for message in data:
            publish_future = publisher.publish(topic_path, message.encode("utf-8"))
            publish_future.add_done_callback(get_callback(publish_future, message))
            publish_futures.append(publish_future)

    futures.wait(publish_futures, return_when=futures.ALL_COMPLETED)

    print(f"Published messages with error handler to {topic_path}.")

    for future in publish_futures:
        if future.exception():
            result["failed"].append(future.result())
        else:
            result["succeded"].append(future.result())

    return result

The data variable is just a list of uuid4. If the publish succeeded, I want to append the message's id in results['succeded'], otherwise, I want to append the message's id in results['failed'].

How can I achieve that? Thanks in advance.


Solution

  • Your code is already doing that - captures all success and failure in two separate list and return them as dictionary with keys succeded and failed:

    # Define a dictionary - 2 keys "succeded" and "failed" with empty list
    result = { "succeded": [], "failed": []}
    
    # append test values to empty list
    result["succeded"].append("success-uuid") #from future.result() - success
    result["failed"].append("failed-uuid") #from future.result() - failure
    
    
    result["succeded"].append("868716387") #from future.result() - success
    result["failed"].append("97234692369") #from future.result() - failure
    
    print(result)
    

    Output:

    {'succeded': ['success-uuid', '868716387'], 'failed': ['failed-uuid', '97234692369']}