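I am new to Apache Beam, and I am trying to do three tasks:
1. Find the 30 item numbers that appear in the most sales records.
2. Find the 30 stores with the most sales records.
3. Read the whole sales table and keep only the rows whose item and store are both in those top-30 lists.

I have the code below to execute the pipeline: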
# Asker's original pipeline. It fails while the pipeline graph is being built
# with: AttributeError: 'tuple' object has no attribute 'pipeline'.
# NOTE(review): bracketed table names are legacy-SQL syntax, where the project
# is normally separated from the dataset by ':' -- confirm this reference
# resolves.
with beam.Pipeline(options=pipeline_args) as p:
    # Read the 30 most frequent item numbers from BigQuery, keep only the
    # item_number field, and collect the values into a list (combiners.ToList).
    query_top_30_items = (
        p
        | 'GetTopItemNumbers' >> beam.io.ReadFromBigQuery(
            query="""SELECT item_number, COUNT(item_number) AS freq_count FROM
[bigquery-public-data.iowa_liquor_sales.sales] GROUP BY item_number
ORDER BY freq_count DESC
LIMIT 30"""
        )
        | 'ReadItemNumbers' >> beam.Map(lambda elem: elem['item_number'])
        | 'ItemNumberAsList' >> beam.combiners.ToList()
    )
    # Same idea for the 30 stores with the most rows.
    query_top_30_stores = (
        p
        |
        'GetTopStores' >> beam.io.ReadFromBigQuery(
            query = """SELECT store_number, COUNT(store_number) AS store_count
FROM [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY
store_number ORDER BY store_count DESC LIMIT 30"""
        )
        | 'ReadStoreValues' >> beam.Map(lambda elem:elem['store_number'])
        | 'StoreValuesAsList' >> beam.combiners.ToList()
    )
    query_whole_table = (
        # BUG: ReadFromBigQuery is a root transform -- it must be applied to
        # the pipeline object `p`, not to a tuple of PCollections. Applying it
        # here is what raises "'tuple' object has no attribute 'pipeline'".
        (query_top_30_items, query_top_30_stores)
        |'ReadTable' >> beam.io.ReadFromBigQuery(
            query="""SELECT item_number, store_number, bottles_sold,
state_bottle_retail FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
        # BUG: query_top_30_items / query_top_30_stores are PCollections, not
        # Python containers, so the lambdas cannot test membership in them
        # directly. They have to be handed to beam.Filter as side inputs
        # (e.g. beam.pvalue.AsList / beam.pvalue.AsSingleton).
        | 'FilterByItems' >> beam.Filter(lambda row:row['item_number'] in query_top_30_items)
        | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
    )
I have attached the traceback for reference. How can I solve this error?

Traceback (most recent call last):
  File "run.py", line 113, in <module>
    run()
  File "run.py", line 100, in run
    | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 1058, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 573, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 646, in apply
    return self.apply(transform, pvalueish)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 689, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 188, in apply
    return m(transform, input, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
    return transform.expand(input)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1881, in expand
    temp_location = pcoll.pipeline.options.view_as(
AttributeError: 'tuple' object has no attribute 'pipeline'
Since I am new to Beam, the code is not very optimized. Please let me know if I can optimize it further.
Thanks for your time and help!
`beam.io.ReadFromBigQuery` must be at the root of your pipeline and takes the pipeline object (not a `PCollection` or a tuple of `PCollection`s) as input. Hence the error.
As the other answer mentions, you could try to write the whole thing as a single BigQuery query. Otherwise, you could do the filtering after the read using side inputs, e.g.
# Corrected pipeline. Every ReadFromBigQuery is applied to the pipeline root
# `p`, and the top-30 item/store ids reach the filters as side inputs instead
# of being referenced from inside the lambdas.
# NOTE(review): beam.Pipeline(options=...) expects a PipelineOptions object;
# assumes pipeline_args is one -- confirm at the call site.
with beam.Pipeline(options=pipeline_args) as p:
    # Top 30 item numbers, one id per element.
    # Deliberately NO combiners.ToList() here: beam.pvalue.AsList below already
    # materializes the whole PCollection as a single list. With ToList the
    # side input would be [[id, id, ...]] (a list holding one list), so every
    # `in` test would be False and the filter would drop every row.
    query_top_30_items = (
        p
        | 'GetTopItemNumbers' >> beam.io.ReadFromBigQuery(
            query="""
                SELECT item_number, COUNT(item_number) AS freq_count
                FROM `bigquery-public-data.iowa_liquor_sales.sales`
                GROUP BY item_number
                ORDER BY freq_count DESC
                LIMIT 30""",
            use_standard_sql=True)
        | 'ReadItemNumbers' >> beam.Map(lambda elem: elem['item_number'])
    )

    # Top 30 store numbers, one id per element (same reason: no ToList).
    query_top_30_stores = (
        p
        | 'GetTopStores' >> beam.io.ReadFromBigQuery(
            query="""
                SELECT store_number, COUNT(store_number) AS store_count
                FROM `bigquery-public-data.iowa_liquor_sales.sales`
                GROUP BY store_number
                ORDER BY store_count DESC
                LIMIT 30""",
            use_standard_sql=True)
        | 'ReadStoreValues' >> beam.Map(lambda elem: elem['store_number'])
    )

    # Full table: a third, independent root read of the same pipeline.
    # Standard SQL (backtick-quoted project.dataset.table) is used for all
    # three reads so the table reference is valid regardless of dialect
    # defaults.
    sales = p | 'ReadTable' >> beam.io.ReadFromBigQuery(
        query="""
            SELECT item_number, store_number, bottles_sold, state_bottle_retail
            FROM `bigquery-public-data.iowa_liquor_sales.sales`""",
        use_standard_sql=True)

    # Keep only rows whose item AND store are both in their top-30 lists.
    # Each side input arrives in the lambda as a plain Python list of ids.
    filtered = (
        sales
        | 'FilterByItems' >> beam.Filter(
            lambda row, items_side_input: row['item_number'] in items_side_input,
            items_side_input=beam.pvalue.AsList(query_top_30_items))
        | 'FilterByStore' >> beam.Filter(
            lambda row, stores_side_input: row['store_number'] in stores_side_input,
            stores_side_input=beam.pvalue.AsList(query_top_30_stores))
    )