Apache Beam - JSON grouping


I'm new to Apache Beam in Python 3. I have to build a pipeline with it, and there is one last step that I don't know how to perform.

I have transformed and cleaned JSON elements, one per line, and I want to group them by a key, keeping under that key only the fields I select (the rest should be dropped).

For example, the lines

{"Name":"Mark", "age":23, "transaction_no": "001", "price":59.99, "someflag" : True}
{"Name":"Mark", "age":23, "transaction_no": "002", "price":10.00, "someflag" : False}

should be transformed into a single JSON object:

{"Mark" : [{"age":23, "transaction_no": "001", "price":59.99}, {"age":23, "transaction_no": "002", "price":10.00}]}

The elements in the list would contain only the fields that I pick (e.g. someflag is dropped).

What would be the most efficient way to do such grouping in Apache Beam?

Appreciate any help!!!


Solution

  • Here is sample code for what you need. As I understand it, you just need the mapper to return a single JSON-like dict for each element, not a list of dicts.

    In that case, you can write the mapper as below.

    import apache_beam as beam
    
    
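    def map_as_json(item, key_col, cols_to_exclude):
        # Build {key value: [record without the key column and the excluded columns]}.
        row = {
            item[key_col]: [
                {
                    key: val
                    for key, val in item.items()
                    # Compare against key_col with !=; `in` on a string would match substrings.
                    if key not in cols_to_exclude and key != key_col
                }
            ]
        }
        return row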
        
    with beam.Pipeline() as p:
        group_stocks_by_date_name = (
            p
            | 'create' >> beam.Create(
                [
                    {"Name": "Mark", "age": 23, "transaction_no": "001", "price": 59.99, "someflag": True},
                    {"Name": "Mark", "age": 23, "transaction_no": "002", "price": 10.00, "someflag": False},
                ]
            )
            | 'selective details' >> beam.Map(map_as_json, key_col='Name', cols_to_exclude=['someflag'])
            | 'print' >> beam.Map(print)
        )
    

    Please mark this as the answer if it helps.
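
Note that the mapper above emits one single-element dict per input record, so two records for "Mark" come out as two separate objects. If the goal is one object per name, as in the question's example, one option is to emit (key, record) pairs and add a `GroupByKey` step. The following is only a sketch under assumptions: the helper names `to_key_value`, `to_json_line` and `run` are made up for illustration, the sample records are the ones from the question, and Beam does not guarantee the order of records within a group.

```python
import json


def to_key_value(item, key_col, cols_to_exclude):
    """Return (key, trimmed record) so that GroupByKey can group on the key."""
    details = {
        k: v for k, v in item.items()
        if k != key_col and k not in cols_to_exclude
    }
    return item[key_col], details


def to_json_line(kv):
    """Format one grouped (key, records) pair as a JSON string."""
    key, records = kv
    # GroupByKey yields an iterable of values; turn it into a list for json.dumps.
    return json.dumps({key: list(records)})


def run():
    # Imported here so the two helpers above can be tried without Beam installed.
    import apache_beam as beam

    with beam.Pipeline() as p:
        (
            p
            | 'create' >> beam.Create(
                [
                    {"Name": "Mark", "age": 23, "transaction_no": "001", "price": 59.99, "someflag": True},
                    {"Name": "Mark", "age": 23, "transaction_no": "002", "price": 10.00, "someflag": False},
                ]
            )
            | 'to key/value' >> beam.Map(to_key_value, key_col='Name', cols_to_exclude=['someflag'])
            | 'group by name' >> beam.GroupByKey()
            | 'to json' >> beam.Map(to_json_line)
            | 'print' >> beam.Map(print)
        )
```

Calling `run()` executes the pipeline on the default local runner. In a real pipeline the `create` step would be replaced by whatever source produces the cleaned records, and the final step by a sink such as a text file writer.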