I'm writing a data pipeline that extracts four types of JSON messages, each with a different attribute structure, and writes them to BigQuery.
My pipeline:
data = (pipeline
        | "Read from Pub/Sub" >> io.ReadFromPubSub(subscription=known_args.input_subscription)
        )

# first branch - push message to Datastore
p1 = (data
      | "Add publish time" >> ParDo(AddTimestamp())
      | "Create Entity" >> Map(EntityWrapper(known_args.kind).make_entity)
      | "Write to Datastore" >> WriteToDatastore(DATASTORE_PROJECT)
      )

# second branch - write data to Bigquery
p2 = (data
      | "classify request depending on type" >> ParDo(ClassifyReq()).with_outputs('label', 'ads', 'website', 'post')
      )

for type in ['label', 'ads', 'website', 'post']:
    result = (p2[type]
              | "extract req: " + type >> ParDo(ExtractReq(type))  # step label name must be unique
              | "Write to Bigquery: " + type >> io.WriteToBigQuery(
                  known_args.output_table,
                  schema=known_args.output_schema,
                  write_disposition=io.BigQueryDisposition.WRITE_APPEND,
              )
              )
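For reference, the fan-out relies on Beam's standard tagged-output mechanism (TaggedOutput plus with_outputs). A minimal standalone version of the same pattern, with made-up tags and runnable on the DirectRunner, looks like this:

import apache_beam as beam
from apache_beam import pvalue

class Classify(beam.DoFn):
    def process(self, element):
        # route each element to one of two named output tags
        tag = 'even' if element % 2 == 0 else 'odd'
        yield pvalue.TaggedOutput(tag, element)

with beam.Pipeline() as p:
    outputs = (p
               | beam.Create([1, 2, 3, 4])
               | beam.ParDo(Classify()).with_outputs('even', 'odd'))
    for tag in ['even', 'odd']:
        outputs[tag] | 'print ' + tag >> beam.Map(print)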
My functions:
class ClassifyReq(DoFn):
    """
    A transform that classifies the type of each Pub/Sub message and emits the
    element with the matching tag.
    """
    def process(self, element):
        type = ''
        req_str = str(element)
        req = json.loads(element.decode('utf-8'))
        if 'inbox_labels' in req_str:
            type = 'label'
        elif 'ad_id' in req_str:
            type = 'ads'
        elif ('postback' in req_str or 'referral' in req_str) and 'ref' in req_str:
            type = 'website'
        elif ('feed' in req_str) and ('comment' in req_str):  # only comment, no reaction_type
            data = req['entry'][0]['changes'][0]['value']
            from_id = data['from']['id']
            if from_id not in page_data.keys():  # only updates not from the page admin
                type = 'post'
        yield TaggedOutput(type, element)
class ExtractReq(DoFn):
    """
    A transform that extracts an element and maps its data to a BigQuery table
    row, depending on the tag given by the prior step.
    """
    def __init__(self, type):
        self.type = type

    def process(self, element):
        req = json.loads(element.decode("utf-8"))
        page_id = req['entry'][0]['id']
        if self.type == 'label':
            unix_datetime = req['entry'][0]['time']
            data = req['entry'][0]['changes'][0]['value']
            customer_id = data['user']['id']
            label = data['label']['page_label_name']
            label_action = data['action']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': customer_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': None,
                   'ad_type': None,
                   'ad_title': None,
                   'post_id': None,
                   'post_link': None,
                   'label': label,
                   'action': label_action
                   }
        elif self.type == 'ads':
            data = req['entry'][0]['messaging'][0]
            customer_id = data['sender']['id']
            # timestamp has 13 digits, so cut off the millisecond part
            unix_datetime = int(str(req['entry'][0]['time'])[:-3])
            if isinstance(data, str):
                data = json.loads(data)
            ad_id = data['referral']['ad_id']
            ad_title = data['referral']['ads_context_data']['ad_title']
            ad_type = data['referral']['type']
            post_id = data['referral']['ads_context_data']['post_id']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': customer_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': ad_id,
                   'ad_type': ad_type,
                   'ad_title': ad_title,
                   'post_id': post_id,
                   'post_link': None,
                   'label': None,
                   'action': None
                   }
        elif self.type == 'website':
            data = req['entry'][0]['messaging'][0]
            # timestamp has 13 digits, so cut off the millisecond part
            unix_datetime = int(str(req['entry'][0]['time'])[:-3])
            if isinstance(data, str):
                data = json.loads(data)
            customer_id = data['sender']['id']
            ref = data['referral']['ref']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': customer_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': ref,
                   'ad_type': None,
                   'ad_title': None,
                   'post_id': None,
                   'post_link': None,
                   'label': None,
                   'action': None
                   }
        elif self.type == 'post':
            data = req['entry'][0]['changes'][0]['value']
            from_id = data['from']['id']
            link = data['post']['permalink_url']
            post_id = data['post_id']
            unix_datetime = data['created_time']
            verb = data['verb']
            row = {'page_id': page_id,
                   'type': self.type,
                   'customer_id': from_id,
                   'unix_datetime': unix_datetime,
                   'ad_data': None,
                   'ad_type': None,
                   'ad_title': None,
                   'post_id': post_id,
                   'post_link': link,
                   'label': None,
                   'action': verb
                   }
        return [row]
My errors mostly occurred with the 'website' and 'post' types. For example:
KeyError: "post [while running 'extract req: post-ptransform-217']"
KeyError: "referral [while running 'extract req: website-ptransform-228']"
I manually tested with some requests of each type and it worked fine, so I believe the keys really do exist for each type. Moreover, I've noticed that the error from the 'extract req: website' step sometimes occurs even though the first step produced no website-tagged output at all. I also tried guarding the lookup with

if isinstance(data, str):
    data = json.loads(data)

but it doesn't help.
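Looking back, that isinstance() check cannot catch this failure, because the KeyError comes from a key that is missing in the already-parsed dict, not from data still being a string. A lookup helper that logs the offending message instead of raising would have exposed the bad requests much earlier. This is only a sketch, and safe_get is a name I'm introducing here, not something in the pipeline above:

import logging

def safe_get(req, *keys):
    """Walk nested dict/list keys; return None (with a warning) instead of raising KeyError."""
    value = req
    for key in keys:
        try:
            value = value[key]
        except (KeyError, IndexError, TypeError):
            logging.warning("missing %r while reading request: %s", key, req)
            return None
    return value

# e.g. in the 'website' branch:
# ref = safe_get(req, 'entry', 0, 'messaging', 0, 'referral', 'ref')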
After struggling for many days, I found a way to debug my code: write the raw request string into an extra column in BigQuery. It turned out that, because there are so many kinds of unwanted webhook requests, I had missed some rules in the classification, so those requests passed through it without containing the keys I wanted to extract.
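A simplified sketch of that debugging version of the extract step (it assumes a raw_request STRING column was added to the output table schema, which is not shown above):

import json
from apache_beam import DoFn

class ExtractReqWithRaw(DoFn):
    """Like ExtractReq, but carries the raw request along for inspection in BigQuery."""
    def __init__(self, type):
        self.type = type

    def process(self, element):
        raw = element.decode('utf-8')
        row = {'type': self.type,
               'raw_request': raw}   # debug column: the untouched webhook payload
        # ... fill in the remaining columns as in ExtractReq, guarding each lookup ...
        yield row

That made it easy to spot the requests slipping past the substring rules in ClassifyReq and to add rules for them.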