I am trying to read a csv file in my local drive using the Luigi package, specifically luigi.Parameter() as fileName and then read that into a pandas dataframe using pd.read_csv and carry out some data wrangling.
This is the code I have written for this task:
import luigi
import pandas as pd
class read_blog(luigi.Task):
fileName = luigi.Parameter()
def run(self):
full_file = pd.read_csv(fileName)
read_blog = full_file[full_file['properties__url'].string.contain$
regex=False)]
blog_readers = read_blog[['anonymous_id','channel',
'context__campaign__content','context__campaign__medium',
'context__campaign__name','context__campaign_source',
'context__campaign__term','timestamp','user_id',
'context__page__url','properties__url',
'properties__search','context__page__title',
'properties__path','context__user_agent',
'properties__referrer','rank']]
blog_readers.to_csv('blog_readers.csv')
if __name__ == '__main__':
luigi.run()
and then run this on terminal using this:
python cleanup.py read_blog --local-scheduler --fileName '/Users/emmanuels/Desktop/attribute.csv'
This should according to my understanding run the read_blog class in cleanup.py and give the fileName variable a parameter that is a link to my csv file.
My code should then read the csv as pandas dataframe, however this is not happening and this is the full error message I am receiving:
===== Luigi Execution Summary =====
/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/configuration.py:54:UserWarning: LUIGI_CONFIG_PATH points to a file which does not exist. Invalid file: /Users/emmanuels/luigi_tutorial/luigi/luigi.conf
warnings.warn("LUIGI_CONFIG_PATH points to a file which does not exist. Invalidfile: {path}".format(path=config_file))
DEBUG: Checking if read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv) is complete
/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py:328: UserWarning: Task read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv) without outputs has no custom complete() method
is_complete = task.complete()
INFO: Informed scheduler that task read_blog__Users_emmanuels_23aa7e1a57 has status PENDING
INFO: Done scheduling tasks
INFO: Running Worker with 1 processes
DEBUG: Asking scheduler for work...
DEBUG: Pending tasks: 1
INFO: [pid 94938] Worker Worker(salt=156803262, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=94938) running read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv)
ERROR: [pid 94938] Worker Worker(salt=156803262, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=94938) failed read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv)
Traceback (most recent call last):
File "/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 191, in run
new_deps = self._run_get_new_deps()
File "/Users/emmanuels/anaconda3/lib/python3.7/site-packages/luigi/worker.py", line 129, in _run_get_new_deps
task_gen = self.task.run()
File "cleanup.py", line 8, in run
full_file = pd.read_csv(fileName)
NameError: name 'fileName' is not defined
DEBUG: 1 running tasks, waiting for next task to finish
INFO: Informed scheduler that task read_blog__Users_emmanuels_23aa7e1a57 has status FAILED
DEBUG: Asking scheduler for work...
DEBUG: Done
DEBUG: There are no more tasks to run at this time
DEBUG: There are 1 pending tasks possibly being run by other workers
DEBUG: There are 1 pending tasks unique to this worker
DEBUG: There are 1 pending tasks last scheduled by this worker
INFO: Worker Worker(salt=156803262, workers=1, host=Emmanuels-MacBook-Pro.local, username=emmanuels, pid=94938) was stopped. Shutting down Keep-Alive thread
INFO:
===== Luigi Execution Summary =====
Scheduled 1 tasks of which:
* 1 failed:
- 1 read_blog(fileName=/Users/emmanuels/Desktop/attributiondata.csv)
This progress looks :( because there were failed tasks
fileName
is an attribute of class read_blog
, thus access fileName
via self
!
full_file = pd.read_csv(self.fileName)