Tags: python, json, serialization, jupyter, luigi

JSON serialization error when creating a Luigi task graph


I'm trying to batch up the processing of a few Jupyter notebooks using Luigi, and I've run into a problem.

I have two classes. The first lives in transform.py:

import nbformat
import nbconvert

import luigi
from nbconvert.preprocessors.execute import CellExecutionError


class Transform(luigi.Task):
    """Foo."""
    notebook = luigi.Parameter()
    requirements = luigi.ListParameter()

    def requires(self):
        return self.requirements

    def run(self):
        nb = nbformat.read(self.notebook, nbformat.current_nbformat)
        # https://nbconvert.readthedocs.io/en/latest/execute_api.html
        ep = nbconvert.preprocessors.ExecutePreprocessor(timeout=600, kernel_name='python3')
        try:
            ep.preprocess(nb, {'metadata': {'path': "/".join(self.notebook.split("/")[:-1])}})
            with self.output().open('w') as f:
                nbformat.write(nb, f)
        except CellExecutionError:
            pass  # TODO

    def output(self):
        return luigi.LocalTarget(self.notebook)

This defines a Luigi task that takes a notebook path as input (along with any tasks that must run first), executes that notebook, and is meant to report success or failure as its output.

To run Transform tasks I have a tiny Runner class:

import luigi


class Runner(luigi.Task):
    requirements = luigi.ListParameter()

    def requires(self):
        return self.requirements

To run my little job, I do:

from transform import Transform
trans = Transform("../tests/fixtures/empty_valid_errorless_notebook.ipynb", [])
from runner import Runner
run_things = Runner([trans])

But this raises TypeError: Object of type 'Transform' is not JSON serializable!

Is my luigi task format correct? If so, is it obvious what component in run is making the entire class unserializable? If not, how should I go about debugging this?


Solution

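  • requires() is supposed to return a task or tasks, not a parameter. A luigi.ListParameter has to serialize its value to JSON, so putting Transform instances into it is what raises the TypeError. Pass plain values such as notebook paths as parameters, and build the tasks inside requires().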

    e.g.,

    import luigi


    class Runner(luigi.Task):
        notebooks = luigi.ListParameter()

        def requires(self):
            # Build one Transform task per notebook path.
            return [Transform(notebook) for notebook in self.notebooks]


    class Transform(luigi.Task):
        notebook = luigi.Parameter()

        def requires(self):
            # No upstream tasks; run() and output() stay as in the question.
            return []
    
    # then to run it from the command line
    luigi --module YourModule Runner --notebooks '["notebook1.ipynb","notebook2.ipynb"]'
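
To see why the original Runner([trans]) call fails, here is a minimal sketch that does not need Luigi installed. It assumes that a list parameter is turned into JSON when the task is constructed, which matches the error message in the question. FakeTask and serialize_list_param are hypothetical stand-ins for illustration, not part of Luigi's API.

```python
import json


class FakeTask:
    """Hypothetical stand-in for a task instance (not a real Luigi class)."""

    def __init__(self, notebook):
        self.notebook = notebook


def serialize_list_param(value):
    """Serialize a list value to JSON, roughly as a list parameter is assumed to."""
    return json.dumps(list(value))


# Plain strings (notebook paths) serialize without trouble.
paths_json = serialize_list_param(["notebook1.ipynb", "notebook2.ipynb"])

# A task-like object does not: json has no encoding for arbitrary instances.
try:
    serialize_list_param([FakeTask("notebook1.ipynb")])
    error_message = None
except TypeError as exc:
    error_message = str(exc)

print(paths_json)
print(error_message)
```

The second call fails with the same kind of TypeError as in the question, which is why the fix is to pass paths as parameters and let requires() construct the tasks.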