
Beam - Filter out Records from Bigquery


I am new to Apache Beam, and I am trying to do three tasks:

  1. Read the top 30 items from the table
  2. Read the top 30 stores from the table
  3. Select the required columns from BigQuery and filter the rows down to those top 30 items and top 30 stores.

I have the code below to execute the pipeline:

with beam.Pipeline(options=pipeline_args) as p:
        #read the dataset from bigquery
        query_top_30_items = (
            p 
            | 'GetTopItemNumbers' >> beam.io.ReadFromBigQuery(
                query="""SELECT item_number, COUNT(item_number) AS freq_count FROM 
                [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY item_number 
                ORDER BY freq_count DESC
                LIMIT 30"""
            )
            | 'ReadItemNumbers' >> beam.Map(lambda elem: elem['item_number'])
            | 'ItemNumberAsList' >> beam.combiners.ToList()
        )


        query_top_30_stores = (
            p
            |
            'GetTopStores' >> beam.io.ReadFromBigQuery(
                query = """SELECT store_number, COUNT(store_number) AS store_count
                 FROM [bigquery-public-data.iowa_liquor_sales.sales] GROUP BY
                 store_number ORDER BY store_count DESC LIMIT 30"""
            )
            | 'ReadStoreValues' >> beam.Map(lambda elem:elem['store_number'])
            | 'StoreValuesAsList' >> beam.combiners.ToList()
        )

        query_whole_table = (
            (query_top_30_items, query_top_30_stores)
            |'ReadTable' >> beam.io.ReadFromBigQuery(
                query="""SELECT item_number, store_number, bottles_sold,
                    state_bottle_retail  FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
            | 'FilterByItems' >> beam.Filter(lambda row:row['item_number'] in query_top_30_items)
            | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
        )

I have attached the traceback for reference. How can I solve this error?

    temp_location = pcoll.pipeline.options.view_as(
Traceback (most recent call last):
  File "run.py", line 113, in <module>
    run()
  File "run.py", line 100, in run
    | 'FilterByStore' >> beam.Filter(lambda row:row['store_number'] in query_top_30_stores)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 1058, in __ror__
    return self.transform.__ror__(pvalueish, self.label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/transforms/ptransform.py", line 573, in __ror__
    result = p.apply(self, pvalueish, label)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 646, in apply
    return self.apply(transform, pvalueish)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/pipeline.py", line 689, in apply
    pvalueish_result = self.runner.apply(transform, pvalueish, self._options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 188, in apply
    return m(transform, input, options)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/runners/runner.py", line 218, in apply_PTransform
    return transform.expand(input)
  File "/Library/Frameworks/Python.framework/Versions/3.7/lib/python3.7/site-packages/apache_beam/io/gcp/bigquery.py", line 1881, in expand
    temp_location = pcoll.pipeline.options.view_as(
AttributeError: 'tuple' object has no attribute 'pipeline'

Since I am new to Beam, the code is probably not well optimized. Please let me know if I can optimize it further.

Thanks for your time and help!


Solution

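  • beam.io.ReadFromBigQuery must be at the root of your pipeline: it takes the pipeline object (not a PCollection or a tuple of PCollections) as input. Your 'ReadTable' step applies it to the tuple (query_top_30_items, query_top_30_stores), which has no pipeline attribute, hence the AttributeError. Separately, a PCollection cannot be used directly on the right-hand side of `in` inside a lambda; its contents have to be passed to the function as a side input.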

    As the other answer mentions, you could try to write the whole thing as a single BigQuery query. Otherwise, you could do the filtering after the read using side inputs, e.g.

    with beam.Pipeline(options=pipeline_args) as p:
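        # Read the top-30 lists from BigQuery as in the question, but drop the
        # final ToList() step: AsList below already materializes each
        # PCollection as a list. (If you keep ToList(), use
        # beam.pvalue.AsSingleton instead of AsList.)
        query_top_30_items = ...

        query_top_30_stores = ...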
    
        sales = p |'ReadTable' >> beam.io.ReadFromBigQuery(
            query="""SELECT item_number, store_number, bottles_sold,
        state_bottle_retail  FROM [bigquery-public-data.iowa_liquor_sales.sales]""")
    
        filtered = (
            sales
            | 'FilterByItems' >> beam.Filter(
                lambda row, items_side_input: row['item_number'] in items_side_input,
                items_side_input=beam.pvalue.AsList(query_top_30_items))
            | 'FilterByStore' >> beam.Filter(
                lambda row, stores_side_input: row['store_number'] in stores_side_input,
                stores_side_input=beam.pvalue.AsList(query_top_30_stores))
        )
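
To try the side-input pattern without touching BigQuery, here is a minimal sketch that swaps the three reads for small in-memory collections built with beam.Create. The names keep_row and run_demo and the sample rows are made up for illustration; only beam.Filter, beam.Create, and beam.pvalue.AsList come from Beam itself. The predicate is a plain function, so it can be checked on its own before it is wired into a pipeline.

```python
def keep_row(row, allowed, key):
    """Predicate for beam.Filter: keep rows whose `key` value is in `allowed`.

    `allowed` is an ordinary Python list by the time Beam calls this function,
    because the side input is materialized before the predicate runs.
    """
    return row[key] in allowed


def run_demo():
    """Apply the two side-input filters to a few in-memory rows (hypothetical data)."""
    # Imported here so that keep_row can be used and tested without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        # Stand-ins for the two "top 30" queries: one element per value, no ToList().
        top_items = p | 'TopItems' >> beam.Create(['A', 'B'])
        top_stores = p | 'TopStores' >> beam.Create([1, 2])

        # Stand-in for the full sales table read.
        sales = p | 'Sales' >> beam.Create([
            {'item_number': 'A', 'store_number': 1},
            {'item_number': 'A', 'store_number': 9},
            {'item_number': 'Z', 'store_number': 2},
        ])

        _ = (
            sales
            | 'FilterByItems' >> beam.Filter(
                keep_row,
                allowed=beam.pvalue.AsList(top_items),
                key='item_number')
            | 'FilterByStore' >> beam.Filter(
                keep_row,
                allowed=beam.pvalue.AsList(top_stores),
                key='store_number')
            | 'Print' >> beam.Map(print)
        )
```

Calling run_demo() with apache-beam installed runs the pipeline on the default local runner; with the sample rows above, only the row for item 'A' in store 1 should pass both filters. Replacing the three beam.Create steps with the ReadFromBigQuery reads, each applied to p, gives the shape of the pipeline in the answer.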