Dataflow (Python): KeyError even though the step receives no input and the key exists for matched cases


I'm writing a data pipeline that extracts four types of JSON messages, each with a different attribute structure, and writes them to BigQuery.

My pipeline:

data = (pipeline
        | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=known_args.input_subscription)
        )

# first branch - push message to Datastore
p1 = (data
      | "Add publish time" >> ParDo(AddTimestamp())
      | "Create Entity" >> Map(EntityWrapper(known_args.kind).make_entity)
      | "Write to Datastore" >> WriteToDatastore(DATASTORE_PROJECT)
      )

# second branch - write data to BigQuery
p2 = (data
      | "classify request depending on type" >> ParDo(ClassifyReq()).with_outputs('label', 'ads', 'website', 'post')
      )

for type in ['label', 'ads', 'website', 'post']:
    result = (p2[type]
              | "extract req: " + type >> ParDo(ExtractReq(type))  # step label name must be unique
              | "Write to Bigquery: " + type >> io.WriteToBigQuery(
                  known_args.output_table,
                  schema=known_args.output_schema,
                  write_disposition=io.BigQueryDisposition.WRITE_APPEND,
                  )
              )

My functions:

class ClassifyReq(DoFn):
    """
    A transform that classifies the type of each Pub/Sub message and outputs the element with a tag
    """
    def process(self,element):
        type = ''
        req_str = str(element)
        req = json.loads(element.decode('utf-8'))
        
        if 'inbox_labels' in req_str:
            type = 'label'
        elif 'ad_id' in req_str:
            type = 'ads'
        elif ('postback' in req_str or 'referral' in req_str) and 'ref' in req_str:
            type = 'website'
        elif ('feed' in req_str) and ('comment' in req_str): # only comment , no reaction_type
            data = req['entry'][0]['changes'][0]['value']
            from_id = data['from']['id']
            if from_id not in page_data.keys():  # only update not from page admin
                type = 'post'

        yield TaggedOutput(type, element)


class ExtractReq(DoFn):
    """
    A transform that extracts each element and maps its data to the BigQuery table
    row, depending on the tag given by the prior step.
    """
    def __init__(self, type):
        self.type = type

    def process(self, element):
        req = json.loads(element.decode("utf-8"))
        page_id = req['entry'][0]['id']
        if self.type == 'label':
            unix_datetime = req['entry'][0]['time']
            data = req['entry'][0]['changes'][0]['value']
            customer_id = data['user']['id']
            label = data['label']['page_label_name']
            label_action = data['action']
            row = {'page_id': page_id,
            'type': self.type, 
            'customer_id' : customer_id, 
            'unix_datetime': unix_datetime,
            'ad_data' : None,
            'ad_type' : None,
            'ad_title' : None,
            'post_id': None, 
            'post_link': None,
            'label' : label,
            'action' : label_action
            }
            
        elif self.type == 'ads':
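            data = req['entry'][0]['messaging'][0]
            # decode first, in case the messaging item arrives as a JSON string
            if isinstance(data, str):
                data = json.loads(data)
            customer_id = data['sender']['id']
            # timestamp has 13 digits, so cut off the millisecond part
            unix_datetime = int(str(req['entry'][0]['time'])[:-3])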
            ad_id = data['referral']['ad_id']
            ad_title = data['referral']['ads_context_data']['ad_title']
            ad_type = data['referral']['type']
            post_id = data['referral']['ads_context_data']['post_id']
            
            row = {'page_id': page_id,
            'type': self.type, 
            'customer_id' : customer_id, 
            'unix_datetime': unix_datetime,
            'ad_data' : ad_id,
            'ad_type' : ad_type,
            'ad_title' : ad_title,
            'post_id': post_id, 
            'post_link': None,
            'label' : None,
            'action' : None
            }
            
        elif self.type == 'website':
            data = req['entry'][0]['messaging'][0]
            # timestamp has 13 digits so cut out milli second unit
            unix_datetime = int(str(req['entry'][0]['time'])[:-3])
            if isinstance(data, str):
                data = json.loads(data)
            customer_id = data['sender']['id']
            ref = data['referral']['ref']

            row = {'page_id': page_id,
            'type': self.type, 
            'customer_id' : customer_id, 
            'unix_datetime': unix_datetime,
            'ad_data' : ref,
            'ad_type' : None,
            'ad_title' : None,
            'post_id': None, 
            'post_link': None,
            'label' : None,
            'action' : None
            }
           
        elif self.type == 'post':
            data = req['entry'][0]['changes'][0]['value']
            from_id = data['from']['id']
            link = data['post']['permalink_url']
            post_id = data['post_id']
            unix_datetime = data['created_time']
            verb = data['verb']
            
            row = {'page_id': page_id,
            'type': self.type, 
            'customer_id' : from_id, 
            'unix_datetime': unix_datetime,
            'ad_data' : None,
            'ad_type' : None,
            'ad_title' : None,
            'post_id': post_id, 
            'post_link': link,
            'label' : None,
            'action' : verb
            }
        return [row]

The errors mostly occur with the types 'website' and 'post'. For example:

KeyError: "post [while running 'extract req: post-ptransform-217']"

KeyError: "referral [while running 'extract req: website-ptransform-228']"

I manually tested some requests of each type and they worked fine, so I believe the key really exists for each type. Moreover, I have seen cases where the first step produced no website-typed output, yet the error from the step extract req: website still occurred. I also tried adding an if isinstance(data, str): check and calling json.loads(data) when the value is a string, but that didn't help.


Solution

  • After struggling for many days, I found a way to debug my code: writing the raw request string to an extra column in BigQuery. There are many kinds of unwanted webhook requests, and my classification rules missed some of them, so those requests passed classification without the keys I wanted to extract.
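
A minimal sketch of that debugging idea in plain Python, outside of Beam. It checks whether the key paths that ExtractReq reads are actually present, and keeps the raw request next to the list of missing paths so it can be inspected later. The names REQUIRED_PATHS, has_path and check_request are hypothetical helpers, and the sample payload is an invented example, not a real webhook request.

```python
import json

# Key paths that ExtractReq reads for two of the types (taken from the
# question's code). This table is an illustrative assumption.
REQUIRED_PATHS = {
    "website": [
        ("entry", 0, "messaging", 0, "sender", "id"),
        ("entry", 0, "messaging", 0, "referral", "ref"),
    ],
    "post": [
        ("entry", 0, "changes", 0, "value", "from", "id"),
        ("entry", 0, "changes", 0, "value", "post", "permalink_url"),
        ("entry", 0, "changes", 0, "value", "post_id"),
    ],
}


def has_path(obj, path):
    """Return True if every step of `path` can be followed inside `obj`."""
    for step in path:
        try:
            obj = obj[step]
        except (KeyError, IndexError, TypeError):
            return False
    return True


def check_request(raw, req_type):
    """Return (ok, debug_row); debug_row keeps the raw request for inspection."""
    text = raw.decode("utf-8")
    req = json.loads(text)
    missing = [
        ".".join(map(str, path))
        for path in REQUIRED_PATHS[req_type]
        if not has_path(req, path)
    ]
    debug_row = {"type": req_type, "missing": missing, "raw_request": text}
    return (not missing, debug_row)


# Invented example: a postback whose payload happens to contain "ref".
# It satisfies the substring rule for 'website' but has no 'referral' key.
sample = json.dumps({
    "entry": [{
        "id": "1",
        "time": 1600000000000,
        "messaging": [{"sender": {"id": "42"}, "postback": {"payload": "ref_menu"}}],
    }]
}).encode("utf-8")

req_str = str(sample)
passes_substring_rule = ("postback" in req_str or "referral" in req_str) and "ref" in req_str
ok, debug_row = check_request(sample, "website")
print(passes_substring_rule, ok, debug_row["missing"])
```

In the pipeline, a row like debug_row could be written to a separate debug table, or the check could run inside ClassifyReq so that requests with missing keys never reach ExtractReq. Either placement is a design choice rather than something the original code did.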