From my read of the docs, luigi is designed to work with text files or raw binaries as Targets. I am trying to build a luigi workflow for an existing processing pipeline that uses HDF5 files (for their many advantages), via h5py on a regular file system. Some tasks in this workflow do not create a whole new file, but rather add new datasets to an existing HDF5 file. Using h5py I would read a dataset with:
hdf = h5py.File('filepath','r')
hdf['internal/path/to/dataset'][...]
write a dataset with:
hdf['internal/path/to/dataset'] = np.array(data)
and test if a dataset in the HDF file exists with this line:
'internal/path/to/dataset' in hdf
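Put together, a minimal round trip of those three operations looks like this (the file name and internal path here are illustrative only):

```python
import numpy as np
import h5py

# Write a dataset into a new HDF5 file (hypothetical paths)
with h5py.File('example.h5', 'w') as hdf:
    hdf['internal/path/to/dataset'] = np.arange(5)

# Reopen read-only: test for existence, then read the data back
with h5py.File('example.h5', 'r') as hdf:
    present = 'internal/path/to/dataset' in hdf
    data = hdf['internal/path/to/dataset'][...]
```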
My question is: is there a way to adapt luigi to work with these types of files?
My read of the luigi docs makes me think I may be able to either subclass luigi.format.Format, or perhaps subclass LocalTarget and give it a custom 'open' method. But I can't find any examples of how to implement this. Many thanks for any suggestions!
d6tflow has an HDF5 pandas implementation and can easily be extended to save data other than pandas DataFrames.
import d6tflow
from d6tflow.tasks.h5 import TaskH5Pandas
import pandas as pd

class Task1(TaskH5Pandas):
    def run(self):
        df = pd.DataFrame({'a': range(10)})
        self.save(df)  # saved to HDF5

class Task2(d6tflow.tasks.TaskCachePandas):
    def requires(self):
        return Task1()

    def run(self):
        df = self.input().load()  # use dataframe from HDF5

d6tflow.run([Task2])
See https://d6tflow.readthedocs.io/en/latest/targets.html#writing-your-own-targets for how to write your own targets.