I am trying to create a pipeline that takes in 3 files and reads n rows from each file (the row is given by obs_num). For the first file, each value is compared to a random float between 0 and 1, and the task returns the value at obs_num if it is greater than the random number, or False if not. I then append these values to a list (list 1).
I then look at the second file and check the same obs_num position: if that position returned False for the previous file, I return False; otherwise I again check whether the value is greater than a random float. I then do the same for the third file. I also append these values to lists (list 2 and list 3).
I then convert these 3 lists to a DataFrame, with each list representing a column.
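The intended logic described above can be sketched in plain Python, without Luigi or the CSV files. This is only an illustration under assumptions: the function name `run_funnel`, the in-memory lists of probabilities, and the optional `rng` argument are all hypothetical and not part of the actual pipeline.

```python
import random

def run_funnel(stage_probs, n_rows, rng=None):
    """Hypothetical helper: stage_probs holds one list of probabilities per file."""
    rng = rng or random.Random()
    columns = []
    previous = None  # results of the previous stage, if any
    for probs in stage_probs:
        column = []
        for i in range(n_rows):
            if previous is not None and previous[i] is False:
                # The same position failed in the previous file, so it fails here too.
                column.append(False)
            else:
                p = probs[i]
                # Keep the probability if it beats a random float in [0, 1).
                column.append(p if p > rng.random() else False)
        columns.append(column)
        previous = column
    # One tuple per row, one entry per stage (the three DataFrame columns).
    return list(zip(*columns))

# Made-up probabilities for three stages and three rows.
example = [[0.9, 0.2, 0.7], [0.8, 0.6, 0.1], [0.5, 0.5, 0.5]]
for row in run_funnel(example, n_rows=3, rng=random.Random(0)):
    print(row)
```

Each returned tuple corresponds to one row of the expected output, so the result could be passed to `pd.DataFrame` with the three column names.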
The problem, however, is that when I run my pipeline the output is a file with one blank column, rather than a CSV with one row per obs_num.
Here is the code I am using for my wrapper:
import pandas as pd
import luigi
import state_to_state_machine as ssm

class wrapper(luigi.WrapperTask):
    def requires(self):
        file_tag = ['Sessiontolead','leadtoopportunity','opportunitytocomplete']
        size = 10
        for j in range(1,int(size)):
            return[ssm.state_machine(file_tag=i,size=size,obs_nums=j)for i in file_tag]
    def run(self):
        print('The wrapper is complete')
        pd.DataFrame().to_csv('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv') #never returns anything
    def output(self):
        return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv')

if __name__ == '__main__':
    luigi.build([wrapper()],workers=8,local_scheduler=True)
Here is the state machine:
import pandas as pd
import get_samples as gs
import luigi
import random

class state_machine(luigi.Task):
    file_tag = luigi.Parameter()
    obs_nums = luigi.Parameter() #directly get element - don't write to file
    size = luigi.Parameter()
    def run(self):
        path = '/Users/emm/Documents/AttributionData/Data/Probabilities/'
        file = path+self.file_tag+'sampleprobs.csv'
        def generic_state_machine(tag,file=file,obs_nums=self.obs_nums):
            if file.split('/')[7][:4] == tag:
                state_machine = pd.read_csv(file)
                return state_machine.ix[:,1][obs_nums] if s.ix[:,1][obs_nums] > random.uniform(0,1) else False
        session_to_leads = []
        lead_to_opps = []
        opps_to_comp = []
        session_to_leads.append(generic_state_machine(tag='Sessiontoload+sampleprobabs',file=file,obs_nums=self.obs_nums))
        lead_to_opps.append(generic_state_machine(tag='leadtoopportunity+sampleprobabs',file=file,obs_nums=self.obs_nums)) if session_to_leads[self.obs_nums-1] != False else lead_to_opps.append(False)
        opps_to_comp.append(generic_state_machine(tag='opportunitytocomplete+sampleprobabs',file=file,obs_nums=self.obs_nums)) if lead_to_opps[self.obs_nums-1] != False else opps_to_comps.append(False)
        df = pd.DataFrame(zip(session_to_leads,lead_to_opps,opps_to_comp),columns=['session_to_leads','lead_to_opps','oops_to_comp'])
    with self.output().open('w') as out_csv:
        out_csv.write(df.to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv'))
def output(self):
    return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv')
I have asked similar versions of this question before, but the code has changed each time and I have managed to resolve most of the initial issues, so this is not a repetition of previous questions.
From my understanding, in this instance this should produce 3 state machine files, each with 10 rows, one for each observation and the comparison made.
The three input files are literally files with 2 columns, the first being the index and the second being probabilities between 0 and 1.
I'm not sure if this is a problem with the logic of my code or with how I am using Luigi.
Check your formatting. In your state machine file, your with
statement is at the class level for some reason and the output
method is at the namespace level.
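Separately from the indentation, it may be worth looking at `requires` in your wrapper: a `return` inside a `for` loop exits on the first iteration, so only `obs_nums=1` is ever requested, and `range(1, size)` stops at `size - 1`. The blank file itself is probably just what `pd.DataFrame().to_csv(...)` in the wrapper's `run` writes, since that frame is empty. The sketch below shows the loop behaviour in isolation. It uses plain tuples in place of `ssm.state_machine(...)` instances so that it runs without Luigi, and the function names `requires_as_posted` and `requires_all` are made up for illustration. Starting the range at 0 is also an assumption about which rows you want.

```python
file_tags = ['Sessiontolead', 'leadtoopportunity', 'opportunitytocomplete']

def requires_as_posted(size):
    # Mirrors the posted loop: the return fires when j == 1 and the loop never continues.
    for j in range(1, int(size)):
        return [(tag, size, j) for tag in file_tags]

def requires_all(size):
    # Builds every (tag, size, j) combination before returning anything.
    return [(tag, size, j) for j in range(int(size)) for tag in file_tags]

print(len(requires_as_posted(10)))  # 3
print(len(requires_all(10)))        # 30
```

Note also that the `output` path in `state_machine` is built from `file_tag` and `size` only, so tasks that differ only in `obs_nums` would all point at the same target file. If you do want one task per observation, the path would need to include `obs_nums` as well, or each task would need to process all rows of its file in one go.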