I've set up a Dataflow pipeline that ingests messages from Pub/Sub, converts each one to a dict, and logs it.
Here is the script I've written:
import apache_beam as beam
import logging
import message_pb2
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions
from google.protobuf.json_format import MessageToDict
TOPIC_PATH = "projects/<PROJECT ID>/topics/<TOPIC NAME>"
def protoToDict(msg, schema_class):
    message = schema_class()
    # Pub/Sub delivers the payload as bytes; ParseFromString rejects str.
    if isinstance(msg, bytes):
        message.ParseFromString(msg)
    else:
        return "Invalid Message - something isn't quite right."
    return MessageToDict(message, preserving_proto_field_name=True)
pipelineOptions = PipelineOptions()
pipelineOptions.view_as(StandardOptions).streaming = True
pipeline = beam.Pipeline(options=pipelineOptions)
data = (
    pipeline
    | 'Read from PubSub' >> beam.io.ReadFromPubSub(topic=TOPIC_PATH)
    | 'Proto to Dict' >> beam.Map(lambda pb_msg: protoToDict(pb_msg, message_pb2.Message))
    | 'Log Result' >> beam.Map(lambda msg: logging.info(msg))
)
pipeline.run()
When I run this with:
python -m <script name> --region=europe-west2 --runner=DataflowRunner --project=<PROJECT ID> --worker-machine-type=n1-standard-3
I receive this error:
creation failed: The zone 'projects/<PROJECT ID>/zones/europe-west2-b' does not have enough resources available to fulfill the request. Try a different zone, or try again later.
I've seen various other sources discuss this error; however, the suggestions given are along the lines of "try a different machine type until it works" or "wait until there are more resources".
Surely I'm doing something wrong and it's not on Google's side?
After testing the pipeline in us-central1, I confirmed that this was a resource-availability issue on Google's side rather than something wrong in my pipeline.
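For anyone who wants to run the same check, only the region needs to change; here is a sketch of the launch command (I've also used n1-standard-2 here, since n1-standard-3 isn't a predefined N1 machine type, and you can pin an explicit zone with Beam's --worker_zone option if you want to take the error message's advice literally):
python -m <script name> --region=us-central1 --runner=DataflowRunner --project=<PROJECT ID> --worker_machine_type=n1-standard-2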
The solution I came to was to use Compute Engine reservations, which guarantee capacity for your VMs in a specific zone. They do incur cost (you pay for the reserved capacity whether or not it runs), but if you pair a reservation with a committed use contract of, say, 3 years, you will be entitled to a discount.
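As a sketch of what creating one looks like (the reservation name, zone, VM count, and machine type below are placeholders, not my actual setup):
gcloud compute reservations create dataflow-workers --zone=europe-west2-b --vm-count=3 --machine-type=n1-standard-2
By default a reservation is consumed automatically by any VM in the project whose properties match it, so Dataflow workers with the same machine type in that zone should pick it up without further configuration.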
Please bear in mind that the protoToDict function will not work as written in the code above. See Apache Beam's guidance on structuring pipeline code and managing dependencies for more details if you find yourself in the same position.
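I can't say exactly what will break in every setup, but a common failure mode for a pipeline like this on Dataflow is that module-level names such as message_pb2 aren't available on the workers (typically surfacing as a NameError). If that's what you hit, the usual mitigation is to pickle the main session when building the options; a sketch, assuming that failure mode:
from apache_beam.options.pipeline_options import (
    PipelineOptions, SetupOptions, StandardOptions)

pipelineOptions = PipelineOptions()
pipelineOptions.view_as(StandardOptions).streaming = True
# Pickle the main session so module-level imports (message_pb2,
# MessageToDict, etc.) are restored on the Dataflow workers.
pipelineOptions.view_as(SetupOptions).save_main_session = True
If the generated proto module isn't importable on the workers at all, ship it as a package via the --setup_file pipeline option instead.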