Have asked a similar question, but decided to breakdown my pipeline into fewer steps to get a deeper understanding of where I am going wrong and to make debugging as easy as possible.
With my first class I am taking a huge csv and separating it to multiple csvs based on the user's current state. I then created another task that then looks at whether or not a given user moved from one state to the next returning 1's and 0's depending on whether or not this happened.
I then have a wrapper class that should dynamically assign parameter values to the previous class. However, my pipeline does not seem to be running and I'm not sure what I am doing wrong
Here is what I have:
separate_csv.py:
import luigi
import pandas as pd
class data_filter(luigi.Task):
file = luigi.Parameter()
def run(self):
for current in actions:
file_pd = pd.read_csv(self.file)
actions = file_pd.state.unique()
filter_file = file_pd.loc[file_pd.state.str.contains(current,na=False)]
filter_file.to_csv('/Users/emm/Documents/AttributionData/Data/'+str(current)+'.csv')
def requires(self):
return []
def output(self):
return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/complete.csv')
state_to_state_transitions.py:
import luigi
import pandas as pd
import separate_csv as sep
class state_to_state(luigi.Task):
first_file = luigi.Parameter()
second_file = luigi.Parameter()
def run(self):
#iterate through states and find probability of anonymous id existing in next state
path = '/Users/emm/Documents/AttributionData/Data/Probabilities/'
first = pd.read_csv(path+self.first_file)
second = pd.read_csv(path+self.second_file)
first['probability'] = first.anonymous_id.isin(second.anonymous_id).astype(int)
#save anonymous id along with probability (1,0) of whether or not it exists in the next state
with self.output().open('w') as out_csv:
out_csv.write(first[['anonymous_id','probability']].to_csv('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file)))
def requires(self):
files_two = [sep.data_filter(file='/Users/emm/Desktop/Attribution/finalcleanattributiondata.csv')]
return files_two
def output(self):
return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file))
wrapper.py
import state_to_state_transitions2 as sst
import pandas as pd
import luigi
class wrapper(luigi.WrapperTask):
def requires(self):
files = ['Session.csv', 'lead.csv', 'opportunity.csv', 'complete.csv']
task_list = []
for i in range(1, len(files)):
task_list.append(sst.state_to_state(first_file=files[i-1],second_file=files[i]))
return task_list
def run(self):
print('Wrapper ran')
pd.DataFrame().to_csv('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')
def output(self):
return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/wrangler1.csv')
if __name__ == '__main__':
luigi.build([wrapper()],workers=8,local_scheduler=True)
Here are parts of my error message:
File "/Users/emm/Documents/GitHub/AttributionModel/Capstone/state_to_state_transitions2.py", line 15, in run
out_csv.write(first[['anonymous_id','probability']].to_csv('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file)))
TypeError: write() argument must be str, not None
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task state_to_state_lead_csv_opportunity_csv_b31ac9d110 has status FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 4 pending tasks possibly being run by other workers
DEBUG: There are 4 pending tasks unique to this worker
DEBUG: There are 4 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=152474850, workers=1, host=Emms-MacBook-Pro.local, username=***, pid=***) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 5 tasks of which:
* 1 present dependencies were encountered:
- 1 data_filter(file=/Users/emm/Desktop/Attribution/finalcleanattributiondata.csv)
* 3 failed:
- 3 state_to_state(first_file=Session.csv, second_file=lead.csv) ...
* 1 were left pending, among these:
* 1 had failed dependencies:
- 1 wrapper()
This progress looks :( because there were failed tasks
Your problem is here:
with self.output().open('w') as out_csv:
out_csv.write(first[['anonymous_id','probability']].to_csv('/Users/emm/Documents/AttributionData/Data/Probabilities/'+str(self.first_file[:-4]+'to'+self.second_file)))
Instead of having first[['anonymous_id','probability']].to_csv(file_name)
which writes to file file_name
, you need to have first[['anonymous_id','probability']].to_csv()
, which returns a string with the csv data.
So overall, you should have:
with self.output().open('w') as out_csv:
out_csv.write(first[['anonymous_id','probability']].to_csv())