I have a routine task to upload and share a database dump to an S3 bucket. While the code below works, for some reason it does not want to overwrite the file.
From the docs, I need to:
1) define a solution for two parallel executions:
   path = luigi.Parameter(default=glob(DATA_DIR)[-2], batch_method=max)
2) add resources = {'overwrite_resource': 1}
While this works for local files, it does not for S3.
import luigi
from luigi.contrib.s3 import S3Client, S3Target
from glob import glob
import pandas as pd
import sqlalchemy as sqa

# DATA_DIR, upload_tweets and send_S3_report are defined elsewhere in the project.

class report_to_S3(luigi.Task):
    client = S3Client()
    path = luigi.Parameter(default=glob(DATA_DIR)[-2], batch_method=max)
    local_dump_path = '../../../data/local_db_dump.csv'
    resources = {'overwrite_resource': 1}

    def requires(self):
        return upload_tweets(path=self.path)

    def output(self):
        self.s3_path = "s3://qclm-nyc-ct/dump/dump.csv"
        return S3Target(self.s3_path, client=self.client)

    def run(self):
        c = sqa.create_engine('postgresql:///qc_soc_media')
        df = pd.read_sql_query('SELECT id, user_id, timestamp, lat, lon, ct '
                               'FROM tweets WHERE ct IS NOT NULL', c)
        N = len(df)
        df.to_csv(self.local_dump_path, index=None)
        self.client.put(self.local_dump_path, self.output().path,
                        headers={'Content-Type': 'application/csv'})
        send_S3_report(N)

if __name__ == '__main__':
    luigi.run(local_scheduler=True, main_task_cls=report_to_S3)
If the target specified in the output() method already exists, the run() method will not execute: Luigi's default completeness check considers a task done as soon as output().exists() returns True, so the existing s3://qclm-nyc-ct/dump/dump.csv means there is nothing left to do. You may want to work the timestamp into the filename, or create another sentinel/flag that indicates the work is done.
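For example, a date-stamped key could look like the following minimal sketch (the daily granularity and the dump-<date>.csv naming are assumptions for illustration, not part of the original code):

import luigi
from luigi.contrib.s3 import S3Client, S3Target
from datetime import date

class report_to_S3(luigi.Task):
    client = S3Client()

    def output(self):
        # A fresh key every day: output().exists() is False until that
        # day's dump has been uploaded, so Luigi re-runs the task daily.
        self.s3_path = "s3://qclm-nyc-ct/dump/dump-{}.csv".format(
            date.today().isoformat())
        return S3Target(self.s3_path, client=self.client)

The sentinel variant is analogous: return a separate marker target from output() and write it as the last step of run(), after unconditionally overwriting dump.csv itself; the task then re-runs whenever the marker is removed.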