Tags: python, pandas, luigi

Problems producing desired luigi output


I am trying to create a pipeline that takes in 3 files and reads n rows from each file (n is represented by obs_num). For the first file, it compares each value to a random float between 0 and 1 and returns the value if it is greater than the random number, or False if not. I then append these values to a list (list 1).

I then look at the second file and check the same position: if the value at that position returned False for the previous file, I return False; otherwise I again check whether the value is greater than a random float. I then do the same for the third file. I also append these values to lists (lists 2 and 3).

I then convert these 3 lists to a dataframe with each list representing a column.
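
The intended logic, with Luigi and the file reads left out, can be sketched roughly as below. The three lists stand in for the probability columns of the files; run_funnel, the sample values, and the seeded generator are illustrative assumptions, not part of the actual pipeline.

```python
import random


def run_funnel(stage_probs, n_rows, rng=None):
    """Return one tuple per row, with one entry per stage.

    An entry is the stage's probability if it beat a fresh random draw,
    otherwise False. Once a stage yields False, every later stage in the
    same row is False as well (no further draw is made for that row).
    """
    rng = rng or random.Random()
    rows = []
    for i in range(n_rows):
        row = []
        passed_so_far = True
        for probs in stage_probs:
            if passed_so_far and probs[i] > rng.uniform(0, 1):
                row.append(probs[i])
            else:
                row.append(False)
                passed_so_far = False
        rows.append(tuple(row))
    return rows


# Hypothetical probabilities standing in for the three CSV files.
session_to_lead = [0.9, 0.2, 0.7]
lead_to_opp = [0.8, 0.9, 0.1]
opp_to_complete = [0.5, 0.5, 0.5]

rows = run_funnel(
    [session_to_lead, lead_to_opp, opp_to_complete],
    n_rows=3,
    rng=random.Random(0),  # seeded only to make runs repeatable
)
for row in rows:
    print(row)
```

Each tuple corresponds to one row of the final dataframe, so passing rows to pd.DataFrame with three column names would give the table described above.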

The problem, however, is that when I run my pipeline the output is a file with one blank column, as opposed to a CSV with obs_num rows.

Here is the code I am using for my wrapper:

import pandas as pd
import luigi
import state_to_state_machine as ssm
class wrapper(luigi.WrapperTask):
    def requires(self):
        file_tag = ['Sessiontolead','leadtoopportunity','opportunitytocomplete']
        size = 10
        for j in range(1,int(size)):
            return[ssm.state_machine(file_tag=i,size=size,obs_nums=j)for i in file_tag]
    def run(self):
        print('The wrapper is complete')
        pd.DataFrame().to_csv('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv') #never returns anything
    def output(self):
        return luigi.LocalTarget('/Users/emm/Documents/AttributionData/Data/datawranglerwrapper3.csv')
if __name__ == '__main__':
    luigi.build([wrapper()],workers=8,local_scheduler=True)

state machine:

import pandas as pd
import get_samples as gs
import luigi
import random
class state_machine(luigi.Task):
    file_tag = luigi.Parameter()
    obs_nums = luigi.Parameter() #directly get element - don't write to file
    size = luigi.Parameter()
    def run(self):
        path = '/Users/emm/Documents/AttributionData/Data/Probabilities/'
        file = path+self.file_tag+'sampleprobs.csv'
        def generic_state_machine(tag,file=file,obs_nums=self.obs_nums):
            if file.split('/')[7][:4] == tag:
                state_machine = pd.read_csv(file)
                return state_machine.ix[:,1][obs_nums] if s.ix[:,1][obs_nums] > random.uniform(0,1) else False
        session_to_leads = []
        lead_to_opps = []
        opps_to_comp = []
        session_to_leads.append(generic_state_machine(tag='Sessiontoload+sampleprobabs',file=file,obs_nums=self.obs_nums))
        lead_to_opps.append(generic_state_machine(tag='leadtoopportunity+sampleprobabs',file=file,obs_nums=self.obs_nums)) if session_to_leads[self.obs_nums-1] != False else lead_to_opps.append(False)
        opps_to_comp.append(generic_state_machine(tag='opportunitytocomplete+sampleprobabs',file=file,obs_nums=self.obs_nums)) if lead_to_opps[self.obs_nums-1] != False else opps_to_comps.append(False)
        df = pd.DataFrame(zip(session_to_leads,lead_to_opps,opps_to_comp),columns=['session_to_leads','lead_to_opps','oops_to_comp'])
    with self.output().open('w') as out_csv:
        out_csv.write(df.to_csv('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv'))
def output(self):
    return luigi.LocalTarget('/Users/emmanuels/Documents/AttributionData/Data/Probabilities/'+str(self.file_tag)+str(self.size)+'statemachine.csv')

I have asked similar versions of this question before, but the problem has changed each time. I have managed to resolve most of the initial issues, so this is not a repetition of previous questions.

From my understanding, this should produce 3 state machine files, each with 10 rows, one per observation, holding the result of the comparison.

The three input files each have 2 columns: the first is the index and the second holds probabilities between 0 and 1.

I'm not sure if this is a problem with the logic of my code or with how I am using Luigi.


Solution

  • Check your indentation. In your state machine file, the with statement is indented at the class level, so it sits outside run(), and the output method is defined at the module (namespace) level, so it is not a method of the class at all.
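
To see why the indentation matters, here is a minimal sketch that reproduces the layout without Luigi. BaseTask is a hypothetical stand-in for luigi.Task, and the string return values are placeholders for the real target and dataframe; only the indentation pattern is taken from the question.

```python
# Hypothetical stand-in for luigi.Task so the sketch runs without Luigi.
class BaseTask:
    def output(self):
        raise NotImplementedError("subclasses must define output()")


# Same layout as the question: the write step sits in the class body and
# output() sits at module level.
MISINDENTED = '''
class StateMachine(BaseTask):
    def run(self):
        df = "a,b"
    with self.output().open("w") as out_csv:  # class level: no self here
        out_csv.write(df)
def output(self):                             # module level: not a method
    return "target"
'''


def define_misindented():
    """Try to define the misindented class; return the error message, if any."""
    try:
        exec(MISINDENTED, {"BaseTask": BaseTask})
    except NameError as err:
        return str(err)
    return None


class StateMachine(BaseTask):
    """Corrected layout: the write step and output() both belong to the class."""

    def output(self):
        return "target"  # placeholder for luigi.LocalTarget(...)

    def run(self):
        df = "a,b"  # placeholder for the dataframe built in run()
        return (self.output(), df)  # placeholder for writing df to the target


error_message = define_misindented()
result = StateMachine().run()
print(error_message)
print(result)
```

With the question's layout, the class body itself fails as soon as it reaches the with statement, because self only exists inside a method. Moving the with block inside run() and indenting output() into the class removes that error.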