Tags: python, merge, google-cloud-dataflow, pipeline, apache-beam

How can I merge two PCollections (various sizes/data) on a common key ("STREET") via a side input?


I have two PCollections: one that pulls info from Pub/Sub, and one that pulls data from a CSV file. After various transforms in each pipeline, I'd like to merge the two on a common key they both share, "STREET". I'm including the second PCollection as a side input. However, I'm getting an error when I attempt to run.

I've attempted to use CoGroupByKey, but I kept receiving errors about the difference in data types between the PCollections. I tried refactoring the outputs, and setting an attribute on the PCollection via `__setattr__` to force the types to be equal, but it reported "mixed values" regardless. After further research, it seems better to use side inputs, especially when there's a disparity in data size between the elements. Even with side inputs, I'm still unable to get past the current error:

from_runner_api raise ValueError('No producer for %s' % id)
ValueError: No producer for ref_PCollection_PCollection_6
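(In hindsight, the CoGroupByKey attempt likely failed because the two PCollections weren't keyed the same way: Pcoll 1 holds plain dicts while Count.PerKey emits (STREET, count) tuples. CoGroupByKey needs both sides shaped as (key, value) pairs with identical key types. A hypothetical keying step for the traffic dicts might look like this sketch:)

```python
def key_by_street(record):
    # Key each traffic dict by its upper-cased street name so it matches
    # the ('STREET', count) tuples coming out of Count.PerKey().
    return (record['street'].upper(), record)
```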

My application logic is as follows:

```python
import apache_beam as beam
from apache_beam.pvalue import AsDict
from apache_beam.transforms.combiners import Count


def merge_accidents(element, pcoll):
    print(element)
    print(pcoll)
    # some code that will append to existing data


accident_pl = beam.Pipeline()
accident_data = (accident_pl
                 | 'Read' >> beam.io.ReadFromText('/modified_Excel_Crashes_Chicago.csv')
                 | 'Map Accidents' >> beam.ParDo(AccidentstoDict())
                 | 'Count Accidents' >> Count.PerKey())

chi_traf_pl = beam.Pipeline(options=pipeline_options)
chi_traffic = (chi_traf_pl
               | 'ReadPubSub' >> beam.io.ReadFromPubSub(subscription=subscription_name, with_attributes=True)
               | 'GeoEnrich&Trim' >> beam.Map(loc_trim_enhance)
               | 'TimeDelayEnrich' >> beam.Map(timedelay)
               | 'TrafficRatingEnrich' >> beam.Map(traffic_rating)
               | 'MergeAccidents' >> beam.Map(merge_accidents, pcoll=AsDict(accident_data))
               | 'Temp Write' >> beam.io.WriteToText('testtime', file_name_suffix='.txt'))

accident_pl.run()
chi_result = chi_traf_pl.run()
chi_result.wait_until_finish()
```

**Pcoll 1:**
[{'segment_id': '1', 'street': 'Western Ave', 'direction': 'EB', 'length': '0.5', 'cur_traffic': '24', 'county': 'Cook County', 'neighborhood': 'West Elsdon', 'zip_code': '60629', 'evnt_timestamp': '2019-04-01 20:50:20.0', 'traffic_rating': 'Heavy', 'time_delay': '0.15'}]
**Pcoll 2:**
('MILWAUKEE AVE', 1)
('CENTRAL AVE', 2)
('WESTERN AVE', 6)

**Expected:**
[{'segment_id': '1', 'street': 'Western Ave', 'direction': 'EB', 'length': '0.5', 'cur_traffic': '24', 'county': 'Cook County', 'neighborhood': 'West Elsdon', 'zip_code': '60629', 'evnt_timestamp': '2019-04-01 20:50:20.0', 'traffic_rating': 'Heavy', 'time_delay': '0.15', 'accident_count': '6'}]

**Actual Results:**
from_runner_api raise ValueError('No producer for %s' % id)
ValueError: No producer for ref_PCollection_PCollection_6

Solution

  • So I figured out the problem. After looking through pipeline.py and the unit-test source for side inputs, I realized that there's a check against the Pipeline object the collections were created from.

    I'm new to this, so I originally believed I needed to create two separate Pipeline objects (streaming vs. batch) so I could pass different options to each, e.g. streaming=True. That isn't needed.

    After consolidating them into a single Pipeline object, as below, the error went away and I was able to pass the side input into the function:

    ```python
    from apache_beam import pvalue

    pipeline = beam.Pipeline(options=pipeline_options)
    accident_data = (pipeline
                     | 'Read' >> beam.io.ReadFromText('modified_Excel_Crashes_Chicago.csv')
                     | 'Map Accidents' >> beam.ParDo(AccidentstoDict())
                     | 'Count Accidents' >> Count.PerKey())

    chi_traffic = (pipeline
                   | 'ReadPubSub' >> beam.io.ReadFromPubSub(subscription=subscription_name, with_attributes=True)
                   | 'GeoEnrich&Trim' >> beam.Map(loc_trim_enhance)
                   | 'TimeDelayEnrich' >> beam.Map(timedelay)
                   | 'TrafficRatingEnrich' >> beam.Map(traffic_rating)
                   | 'MergeAccidents' >> beam.Map(merge_accidents, pcoll=pvalue.AsDict(accident_data))
                   | 'Temp Write' >> beam.io.WriteToText('testtime',
                                                         file_name_suffix='.txt'))

    chi_result = pipeline.run()
    chi_result.wait_until_finish()
    ```
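    With the single-pipeline version working, the placeholder body of merge_accidents can be filled in. A minimal sketch, assuming the AsDict side input keys streets in upper case (as Count.PerKey produced in Pcoll 2) and that the count should be stored as a string, as in the expected output:

    ```python
    def merge_accidents(element, pcoll):
        # pcoll is the AsDict side input, e.g. {'WESTERN AVE': 6, ...}.
        # Normalize the street name before the lookup, since the traffic
        # dicts carry mixed-case names ('Western Ave') while the accident
        # counts are keyed upper-case.
        count = pcoll.get(element['street'].upper(), 0)
        # Return a new dict rather than mutating the input element.
        return {**element, 'accident_count': str(count)}
    ```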