I am using the script below to pull messages from Google Pub/Sub. When I run it, it does not wait for the sleep time set in the function time_sleep() before handling the next message. My expected output is:
Message from Pub/Sub: Message 1
<< Script should wait for 5 Seconds >>
Success
After 5 Seconds
Message from Pub/Sub: Message 2
<< Script should wait for 5 Seconds >>
Success
After 5 Seconds
Message from Pub/Sub: Message 3
<< Script should wait for 5 Seconds >>
Success
After 5 Seconds
But when I run it, it first displays all the messages from Pub/Sub:
Message from Pub/Sub: Message 1
Message from Pub/Sub: Message 2
Message from Pub/Sub: Message 3
<<Script is waiting for 5 seconds>>
Success
After 5 Seconds
Success
After 5 Seconds
Success
After 5 Seconds
import json
import time
from concurrent.futures import TimeoutError

from google.cloud import pubsub_v1
from google.oauth2 import service_account

def time_sleep():
    time.sleep(5)
    print("Success")
# Manage the message and decode it
def callback(message: pubsub_v1.subscriber.message.Message) -> None:
    try:
        # Get the message data
        message_data = message.data.decode("utf-8")
        # Parse the message data as JSON
        json_data = json.loads(message_data)
        print("Message from Pub/Sub: ", json_data)
        time_sleep()
        print("After 5 seconds")
    except Exception as e:
        # Handle any exceptions that may occur
        print(f"Error processing message: {e}")
        # message.nack()
# Create credentials using the JSON private key
credentials = service_account.Credentials.from_service_account_info(
    secret_data["google_auth"]
)
subscriber = pubsub_v1.SubscriberClient(credentials=credentials)
publisher = pubsub_v1.PublisherClient(credentials=credentials)
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
        # logger.info(f"Getting Data: {subscriber}")
    except TimeoutError:
        # logger.info(f"Failed Fetching: {subscriber}")
        streaming_pull_future.cancel()  # Trigger the shutdown.
        streaming_pull_future.result()  # Block until the shutdown is complete.
When I searched online, I read that the Python client runs the callbacks concurrently. Please help me make my script process the messages one at a time, in order.
There are two ways you could achieve your desired behavior. Note, though, that in general it is best to process messages in parallel in order to maximize throughput and minimize latency.
The first way is to pass a scheduler to subscribe() that only allows one callback at a time:
import concurrent.futures
...
executor = concurrent.futures.ThreadPoolExecutor(max_workers=1)
scheduler = pubsub_v1.subscriber.scheduler.ThreadScheduler(executor=executor)
streaming_pull_future = subscriber.subscribe(subscription_path,
                                             callback=callback,
                                             scheduler=scheduler)
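To see why a single-worker executor serializes the callbacks, here is a minimal standard-library sketch. It does not touch Pub/Sub at all: run_callbacks and its stand-in callback are hypothetical names made up for illustration, and the short sleep only simulates slow processing.

```python
import concurrent.futures
import threading
import time


def run_callbacks(max_workers, messages, delay=0.05):
    """Submit one stand-in callback per message and return the event log."""
    log = []
    lock = threading.Lock()  # protects `log` when several workers run at once

    def callback(message):
        with lock:
            log.append(f"received {message}")
        time.sleep(delay)  # simulated slow processing
        with lock:
            log.append(f"done {message}")

    # Leaving the `with` block waits for every submitted callback to finish.
    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
        for message in messages:
            executor.submit(callback, message)
    return log


# One worker: each callback finishes before the next one starts.
serial_log = run_callbacks(max_workers=1, messages=["m1", "m2", "m3"])
print(serial_log)
```

With max_workers=1 the log alternates "received" and "done" for each message in turn. Calling run_callbacks with max_workers=3 will usually show all three "received" entries before the first "done", which is the interleaving described in the question.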
The second way is to set the flow control limit to only allow one message:
flow_control = pubsub_v1.types.FlowControl(max_messages=1)
streaming_pull_future = subscriber.subscribe(subscription_path,
                                             callback=callback,
                                             flow_control=flow_control)
There are a few differences. If you go with the first approach, messages will pile up in the client library (up to the maximum allowed by the flow control) waiting to be processed, which could take up memory even though the messages are not going to be processed soon. In the second approach, only one message will be sent from the server at a time, which means there could be additional latency in receiving the next message. You will also need to ensure that you ack or nack each message, as not doing so will result in it still being considered outstanding, and therefore the next message will not be sent.
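As a rough sketch of that last point, the callback could ack on success and nack on failure, so that every message ends up in one of the two states. FakeMessage below is a hypothetical stand-in so the snippet runs without a subscription; it only mimics the data, ack() and nack() members that the callback uses on the real message object.

```python
import json


class FakeMessage:
    """Hypothetical stand-in for the Pub/Sub message passed to the callback."""

    def __init__(self, data: bytes):
        self.data = data
        self.status = None  # records which of ack()/nack() was called

    def ack(self):
        self.status = "ack"

    def nack(self):
        self.status = "nack"


def callback(message):
    try:
        payload = json.loads(message.data.decode("utf-8"))
        print("Message from Pub/Sub:", payload)
    except Exception as exc:
        print(f"Error processing message: {exc}")
        message.nack()  # processing failed: ask for redelivery
    else:
        message.ack()  # processing succeeded: message is no longer outstanding


good = FakeMessage(b'{"id": 1}')
bad = FakeMessage(b"not json")
callback(good)
callback(bad)
```

Here good ends up acked and bad ends up nacked. Whether a failed message should be nacked for redelivery or acked and dropped depends on your application.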