I have an Apache Beam pipeline that takes lists as arguments and uses them in its Filter and Map functions. Since these arrive as strings, I convert them with ast.literal_eval. Is there a better way to do this?
import argparse
import ast
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, GoogleCloudOptions

def run_pipeline(custom_args, beam_args):
    elements = [
        {'name': 'Jim', 'join_year': 2010, 'location': 'LA', 'role': 'Executive assistant'},
        {'name': 'Tim', 'join_year': 2015, 'location': 'NY', 'role': 'Account manager'},
        {'name': 'John', 'join_year': 2010, 'location': 'LA', 'role': 'Customer service representative'},
        {'name': 'Bob', 'join_year': 2020, 'location': 'NJ', 'role': 'Customer service representative'},
        {'name': 'Michael', 'join_year': 2019, 'location': 'CA', 'role': 'Scheduler'},
        {'name': 'Adam', 'join_year': 2010, 'location': 'CA', 'role': 'Customer service representative'},
        {'name': 'Andrew', 'join_year': 2009, 'location': 'TX', 'role': 'Account manager'},
        {'name': 'James', 'join_year': 2017, 'location': 'NJ', 'role': 'Executive assistant'},
        {'name': 'Paul', 'join_year': 2015, 'location': 'NY', 'role': 'Scheduler'},
        {'name': 'Justin', 'join_year': 2015, 'location': 'NJ', 'role': 'Scheduler'}
    ]
    opts = PipelineOptions(beam_args)
    # The options arrive as strings, so turn them back into Python lists.
    joinYear = list(ast.literal_eval(custom_args.joinYear))
    selectCols = list(ast.literal_eval(custom_args.selectCols))
    with beam.Pipeline(options=opts) as p:
        (p
         | "Create" >> beam.Create(elements)
         | "Filter for join year in 2010 and 2015" >> beam.Filter(lambda item: item['join_year'] in joinYear)
         | "Select name and location columns" >> beam.Map(lambda line: {key: value for (key, value) in line.items() if key in selectCols})
         | beam.Map(print)
        )

def main():
    parser = argparse.ArgumentParser()
    parser.add_argument("--joinYear", required=True)
    parser.add_argument("--selectCols", required=True)
    my_args, beam_args = parser.parse_known_args()
    run_pipeline(my_args, beam_args)

if __name__ == '__main__':
    main()
I run the code above like this: python filterlist.py --joinYear='[2010,2015]' --selectCols="['name','location']"
In production, I would pass these parameters from a Cloud Function that launches the Dataflow job, so I was wondering whether there is a better way to do this that follows best practice.
Unfortunately, you can't pass a Python list directly as a program argument: command-line arguments always reach the program as strings.
You are heading in the right direction with your approach and your implementation.
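To illustrate why the conversion step is needed, here is a minimal sketch that feeds argparse the same kind of value you pass on the command line. The argument list is hard-coded instead of being read from sys.argv; the parsed value is still a plain string, not a list.

```python
import argparse

# Every value on the command line reaches the program as a plain string,
# even when it looks like a Python list.
parser = argparse.ArgumentParser()
parser.add_argument("--joinYear", required=True)
args = parser.parse_args(["--joinYear", "[2010,2015]"])

print(type(args.joinYear).__name__)  # str
print(args.joinYear)                 # [2010,2015]
```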
If you need to pass a structure such as a List, a Dict, or a List of Dict, you have to pass it as a String in the pipeline options and then, inside the job, transform this String back into the expected structure.
It is a kind of serialization (to a String) and deserialization (back to an Object).
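As a minimal sketch of that round trip, assuming JSON as the string format (any format both sides agree on would work), here are the question's two parameters packed into one Dict, serialized, and restored. The name params is only illustrative.

```python
import json

# Hypothetical container: the question's two parameters in one Dict.
params = {"joinYear": [2010, 2015], "selectCols": ["name", "location"]}

# Serialization: object -> string that can travel as a pipeline option.
as_string = json.dumps(params)

# Deserialization: string -> object again, inside the job.
restored = json.loads(as_string)

print(as_string)  # {"joinYear": [2010, 2015], "selectCols": ["name", "location"]}
print(restored == params)  # True
```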
If you pass the options from a Cloud Function or another process, you can apply the serialization programmatically in that code when you build the option.
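On the launcher side, a minimal sketch could look like the following. build_pipeline_args is a hypothetical helper, and the call that actually starts the Dataflow job is deliberately left out, since it depends on how you launch it.

```python
import json

def build_pipeline_args(join_years, select_cols):
    """Hypothetical helper: turn Python objects into string options.

    json.dumps produces a string that json.loads can parse back into
    the same list on the job side.
    """
    return [
        "--joinYear=" + json.dumps(join_years),
        "--selectCols=" + json.dumps(select_cols),
    ]

args = build_pipeline_args([2010, 2015], ["name", "location"])
print(args)
# ['--joinYear=[2010, 2015]', '--selectCols=["name", "location"]']
```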
For example, for a Dict or a List of Dict, you can use:
- json.dumps for serialization
- json.loads for deserialization
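Inside the job, one way to apply json.loads is to hand it to argparse as the type converter, so each option is deserialized during parsing and ast.literal_eval is no longer needed. This is a sketch assuming both sides use JSON; parse_custom_args is a hypothetical helper name.

```python
import argparse
import json

def parse_custom_args(argv):
    """Hypothetical helper: parse the custom options into real lists."""
    parser = argparse.ArgumentParser()
    # type=json.loads deserializes the string as soon as it is parsed,
    # so the rest of the job receives Python lists, not strings.
    parser.add_argument("--joinYear", type=json.loads, required=True)
    parser.add_argument("--selectCols", type=json.loads, required=True)
    # Unknown arguments (e.g. Beam/Dataflow options) are returned untouched
    # so they can be passed on to PipelineOptions.
    return parser.parse_known_args(argv)

my_args, beam_args = parse_custom_args(
    ['--joinYear=[2010, 2015]', '--selectCols=["name", "location"]', "--runner=DirectRunner"]
)
print(my_args.joinYear)    # [2010, 2015]
print(my_args.selectCols)  # ['name', 'location']
print(beam_args)           # ['--runner=DirectRunner']
```

One consequence of switching to JSON: strings must use double quotes, so the command line from the question would become --selectCols='["name","location"]' rather than --selectCols="['name','location']", which json.loads rejects.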