I am trying to build a simple batch ETL process on Dataflow for learning purposes. This is the flow I have implemented:
Cloud Storage > PubSub > Cloud Function > DataFlow > Cloud Storage
A PubSub topic publishes a message whenever a new file is uploaded to a bucket. A Cloud Function listens to a subscription on that topic and starts a Dataflow job, which reads the file, processes the data, and saves the result to a new file in the same bucket.
I have been able to implement all of this logic, but I am struggling with starting the Dataflow job from the Cloud Function. The function starts the job without any problem, but after a few minutes the worker shows the following error message:
Error message from worker: Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 773, in run
    self._load_main_session(self.local_staging_directory)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 514, in _load_main_session
    pickler.load_session(session_file)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/internal/pickler.py", line 311, in load_session
    return dill.load_session(file_path)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 368, in load_session
    module = unpickler.load()
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 472, in load
    obj = StockUnpickler.load(self)
  File "/usr/local/lib/python3.7/site-packages/dill/_dill.py", line 827, in _import_module
    return getattr(__import__(module, None, None, [obj]), obj)
ModuleNotFoundError: No module named 'google.cloud.functions'
The important part of the error is:
ModuleNotFoundError: No module named 'google.cloud.functions'
My Cloud Function directory looks like this:
/
requirements.txt
main.py
pipeline.py
requirements.txt
# Function dependencies, for example:
# package>=version
apache-beam[gcp]
main.py
import base64
import json

from pipeline import run


def start_job(event, context):
    message = base64.b64decode(event['data']).decode('utf-8')
    message = json.loads(message)
    bucket = message['bucket']
    filename = message['name']

    if filename.startswith('raw/'):
        run(bucket, filename)
        print('Job sent to Dataflow')
    else:
        print('File uploaded to unknown directory: {}'.format(filename))
pipeline.py
import apache_beam as beam
from datetime import datetime
import argparse
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.options.pipeline_options import SetupOptions
options = PipelineOptions()
google_cloud_options = options.view_as(GoogleCloudOptions)
google_cloud_options.project = "dpto-bigdata"
google_cloud_options.region = "europe-west1"
google_cloud_options.job_name = "pipeline-test"
google_cloud_options.staging_location = "gs://services-files/staging/"
google_cloud_options.temp_location = "gs://services-files/temp/"
#options.view_as(StandardOptions).runner = "DirectRunner" # use this for debugging
options.view_as(StandardOptions).runner = "DataFlowRunner"
options.view_as(SetupOptions).save_main_session = True
output_suffix = '.csv'
output_header = 'Name,Total,HP,Attack,Defence,Sp_attack,Sp_defence,Speed,Average'
def run(bucket, filename):
    source_file = 'gs://{}/{}'.format(bucket, filename)
    now = datetime.now().strftime('%Y%m%d-%H%M%S')
    output_prefix = 'gs://{}/processed/{}'.format(bucket, now)

    with beam.Pipeline(options=options) as p:
        raw_values = (
            p
            | "Read from Cloud Storage" >> beam.io.ReadFromText(source_file, skip_header_lines=1)
            | "Split columns" >> beam.Map(lambda x: x.split(','))
            | "Cleanup entries" >> beam.ParDo(ElementCleanup())
            | "Calculate average stats" >> beam.Map(calculate_average)
            | "Format output" >> beam.Map(format_output)
            | "Write to Cloud Storage" >> beam.io.WriteToText(file_path_prefix=output_prefix, file_name_suffix=output_suffix, header=output_header)
        )


class ElementCleanup(beam.DoFn):
    def __init__(self):
        self.transforms = self.map_transforms()

    def map_transforms(self):
        return [
            [self.trim, self.to_lowercase],  # Name
            [self.trim, self.to_float],      # Total
            [self.trim, self.to_float],      # HP
            [self.trim, self.to_float],      # Attack
            [self.trim, self.to_float],      # Defence
            [self.trim, self.to_float],      # Sp_attack
            [self.trim, self.to_float],      # Sp_defence
            [self.trim, self.to_float]       # Speed
        ]

    def process(self, row):
        return [self.clean_row(row, self.transforms)]

    def clean_row(self, row, transforms):
        cleaned = []
        for idx, col in enumerate(row):
            for func in transforms[idx]:
                col = func(col)
            cleaned.append(col)
        return cleaned

    def to_lowercase(self, col: str):
        return col.lower()

    def trim(self, col: str):
        return col.strip()

    def to_float(self, col: str):
        return (float(col) if col is not None else None)


def calculate_average(row):
    average = round(sum(row[2:]) / len(row[2:]), 2)
    row.append(average)
    return row


def format_output(row):
    row = [str(col) for col in row]
    return ','.join(row)


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--bucket",
        help="Bucket to read from."
    )
    parser.add_argument(
        "--filename",
        help="File to read from."
    )

    args = parser.parse_args()
    run(args.bucket, args.filename)
I have been reading about this topic for a while. Before this error, I had a similar one showing ModuleNotFoundError: No module named 'main'. I was able to fix it by adding the pipeline option options.view_as(SetupOptions).save_main_session = True, but I have not found any solution to the error I am currently facing.
I expected the Dataflow workers not to depend on the Cloud Function once the pipeline job had started, but it seems they are still trying to communicate with it somehow.
I think the best approach here would be to use Dataflow templates, since you are not changing the code, only the input path. Once you have the template, you can launch it with a single API call. It will surely be less hassle to set up and probably more resilient, since you would not depend as much on Cloud Functions.
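A rough sketch of what the Cloud Function could look like with that approach, using google-api-python-client (which would need to be added to requirements.txt). The template path and the 'input' parameter name here are assumptions; the pipeline would have to be staged as a classic template that reads its input path from a ValueProvider:
import base64
import json

from googleapiclient.discovery import build  # assumed dependency in requirements.txt


def start_job(event, context):
    message = json.loads(base64.b64decode(event['data']).decode('utf-8'))
    source_file = 'gs://{}/{}'.format(message['bucket'], message['name'])

    # Launch a pre-staged classic template; gcsPath and 'input' are placeholders.
    dataflow = build('dataflow', 'v1b3')
    request = dataflow.projects().locations().templates().launch(
        projectId='dpto-bigdata',
        location='europe-west1',
        gcsPath='gs://services-files/templates/pipeline-template',
        body={
            'jobName': 'pipeline-test',
            'parameters': {'input': source_file},
        },
    )
    response = request.execute()
    print('Launched Dataflow job: {}'.format(response))
This way none of the Beam code runs inside the Cloud Function, so nothing from the functions runtime ends up pickled into the main session.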
There's another approach that I think would be even better and doesn't require Cloud Functions at all. You could use something like MatchAll.continuously
from the Java SDK. If you need or want Python, there's no counterpart for it yet, but I took the liberty of creating a version that does the same thing and sending a pull request for a new PTransform.
The idea is that every X seconds you check for new files and process them in your pipeline.
If you don't want to wait for the pull request to be merged, you can just copy the PTransform and DoFn:
import logging

import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.io.fileio import MatchAll
from apache_beam.transforms.periodicsequence import PeriodicImpulse
from apache_beam.transforms.userstate import BagStateSpec
from apache_beam.utils.timestamp import MAX_TIMESTAMP, Timestamp

_LOGGER = logging.getLogger(__name__)


class MatchContinuously(beam.PTransform):
    def __init__(
        self,
        file_pattern,
        interval=360.0,
        has_deduplication=True,
        start_timestamp=Timestamp.now(),
        stop_timestamp=MAX_TIMESTAMP):
        self.file_pattern = file_pattern
        self.interval = interval
        self.has_deduplication = has_deduplication
        self.start_ts = start_timestamp
        self.stop_ts = stop_timestamp

    def expand(self, pcol):
        # Fire an impulse every `interval` seconds between start and stop.
        impulse = pcol | PeriodicImpulse(
            start_timestamp=self.start_ts,
            stop_timestamp=self.stop_ts,
            fire_interval=self.interval)

        match_files = (
            impulse
            | beam.Map(lambda x: self.file_pattern)
            | MatchAll())

        if self.has_deduplication:
            match_files = (
                match_files
                # Making a Key Value so each file has its own state.
                | "To KV" >> beam.Map(lambda x: (x.path, x))
                | "Remove Already Read" >> beam.ParDo(_RemoveDuplicates()))

        return match_files


class _RemoveDuplicates(beam.DoFn):
    FILES_STATE = BagStateSpec('files', StrUtf8Coder())

    def process(self, element, file_state=beam.DoFn.StateParam(FILES_STATE)):
        path = element[0]
        file_metadata = element[1]
        bag_content = [x for x in file_state.read()]

        if not bag_content:
            file_state.add(path)
            _LOGGER.info("Generated entry for file %s", path)
            yield file_metadata
        else:
            _LOGGER.info("File %s was already read", path)
An example pipeline:
(p | MatchContinuously("gs://apache-beam-samples/shakespeare/*", 180)
| Map(lambda x: x.path)
| ReadAllFromText()
| Map(lambda x: logging.info(x))
)
A third approach could be to keep using the GCS notifications and combine PubSub + MatchAll. The pipeline would look like:
(p | ReadFromPubSub(topic)
   | MatchAll())
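Note that MatchAll expects file patterns as input, so in practice you would need a step in between that turns the notification payload into a gs:// path. A rough sketch of that idea, assuming the default JSON payload of GCS notifications and a streaming pipeline (the topic name is a placeholder):
import json

import apache_beam as beam
from apache_beam.io.fileio import MatchAll
from apache_beam.io.gcp.pubsub import ReadFromPubSub
from apache_beam.io.textio import ReadAllFromText
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions


def to_gcs_path(message_bytes):
    # GCS notifications publish a JSON payload with "bucket" and "name" fields.
    payload = json.loads(message_bytes.decode('utf-8'))
    return 'gs://{}/{}'.format(payload['bucket'], payload['name'])


options = PipelineOptions()
options.view_as(StandardOptions).streaming = True

with beam.Pipeline(options=options) as p:
    (p
     | ReadFromPubSub(topic='projects/dpto-bigdata/topics/new-files')  # placeholder topic
     | beam.Map(to_gcs_path)
     | MatchAll()
     | beam.Map(lambda file_metadata: file_metadata.path)
     | ReadAllFromText())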
Depending on how often new files arrive and whether you want to use notifications or not, you can decide between the three approaches.