MySQL Targets in Luigi workflow


My TaskB requires TaskA. On completion, TaskA writes to a MySQL table, and TaskB should then take that table as its input.

I cannot figure out how to do this in Luigi. Can someone point me to an example or give me a quick one here?


Solution

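  • Luigi's existing MySqlTarget (in luigi.contrib.mysqldb) records completion in a separate marker table instead of inspecting the data table itself: touch() inserts a row for the target's update_id, and exists() checks for that row. Here's the rough approach I would take, though your question is quite abstract, so the real implementation will likely be more involved.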

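    import luigi
    from datetime import date
    from luigi.contrib.mysqldb import MySqlTarget
    
    
    class TaskA(luigi.Task):
        rundate = luigi.DateParameter(default=date.today())
        target_table = "table_to_update"
        host = "localhost:3306"
        db = "db_to_use"
        user = "user_to_use"
        pw = "pw_to_use"
    
        def get_target(self):
            # Completion is tracked in luigi's marker table under this update_id
            return MySqlTarget(host=self.host, database=self.db, user=self.user, password=self.pw, table=self.target_table,
                               update_id=str(self.rundate))
    
        def requires(self):
            return []
    
        def output(self):
            return self.get_target()
    
        def run(self):
            target = self.get_target()
            connection = target.connect()
            # ... update target_table here, e.g. connection.cursor().execute("INSERT ...")
            # Mark the update complete on the same connection, then commit both together
            target.touch(connection)
            connection.commit()
            connection.close()
    
    
    class TaskB(luigi.Task):
        rundate = luigi.DateParameter(default=date.today())
    
        def requires(self):
            # Pass rundate through so TaskB depends on the matching TaskA run
            return [TaskA(rundate=self.rundate)]
    
        def run(self):
            # self.input() mirrors requires(): a list holding TaskA's MySqlTarget
            source = self.input()[0]
            connection = source.connect()
            cursor = connection.cursor()
            cursor.execute("SELECT * FROM {}".format(source.table))
            rows = cursor.fetchall()
            connection.close()
            # ... process rows ...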
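If you want to see the marker-table idea in action without a MySQL server or luigi installed, here is a minimal standard-library sketch. It is only an analogy, not luigi's code: it swaps MySQL for sqlite3, and MarkerTableTarget, run_task_a and run_task_b are hypothetical names. The marker table name table_updates is chosen to echo what I understand to be luigi's default, but the schema here is illustrative. It shows TaskA writing rows plus a marker row, a rerun being skipped because the marker exists, and TaskB reading only once the marker is present.

```python
import sqlite3
from datetime import datetime


class MarkerTableTarget:
    """Hypothetical sqlite3 stand-in for luigi's MySqlTarget.

    Completion is recorded as a row in a separate marker table keyed by
    (update_id, target_table), not inferred from the data table itself.
    """

    MARKER_TABLE = "table_updates"  # name echoes luigi's default; schema is illustrative

    def __init__(self, conn, table, update_id):
        self.conn = conn
        self.table = table
        self.update_id = update_id

    def _ensure_marker_table(self):
        self.conn.execute(
            f"CREATE TABLE IF NOT EXISTS {self.MARKER_TABLE} ("
            " update_id TEXT NOT NULL,"
            " target_table TEXT NOT NULL,"
            " inserted TEXT,"
            " PRIMARY KEY (update_id, target_table))"
        )

    def exists(self):
        """True once touch() has recorded this update_id for this table."""
        self._ensure_marker_table()
        row = self.conn.execute(
            f"SELECT 1 FROM {self.MARKER_TABLE} WHERE update_id = ? AND target_table = ?",
            (self.update_id, self.table),
        ).fetchone()
        return row is not None

    def touch(self):
        """Record completion; the caller decides when to commit."""
        self._ensure_marker_table()
        self.conn.execute(
            f"INSERT OR REPLACE INTO {self.MARKER_TABLE} VALUES (?, ?, ?)",
            (self.update_id, self.table, datetime.now().isoformat()),
        )


def run_task_a(conn, rundate):
    """Hypothetical TaskA: write rows, then mark the update complete."""
    target = MarkerTableTarget(conn, "table_to_update", update_id=rundate)
    if target.exists():
        return False  # already complete for this rundate, so skip the work
    with conn:  # commits the data rows and the marker row together
        conn.execute(
            "CREATE TABLE IF NOT EXISTS table_to_update (rundate TEXT, value INTEGER)"
        )
        conn.executemany(
            "INSERT INTO table_to_update VALUES (?, ?)",
            [(rundate, value) for value in (1, 2, 3)],
        )
        target.touch()
    return True


def run_task_b(conn, rundate):
    """Hypothetical TaskB: read TaskA's rows only after TaskA's marker exists."""
    upstream = MarkerTableTarget(conn, "table_to_update", update_id=rundate)
    if not upstream.exists():
        raise RuntimeError(f"TaskA has not completed for {rundate}")
    # Table names cannot be bound as parameters, so the name is formatted in
    rows = conn.execute(
        f"SELECT value FROM {upstream.table} WHERE rundate = ? ORDER BY value",
        (rundate,),
    ).fetchall()
    return [value for (value,) in rows]


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    print(run_task_a(conn, "2024-01-01"))  # True: rows written, marker recorded
    print(run_task_a(conn, "2024-01-01"))  # False: marker already present
    print(run_task_b(conn, "2024-01-01"))  # [1, 2, 3]
```

The part worth copying is that TaskA writes its data rows and its marker row in one transaction, so a downstream task never sees a "complete" marker without the data behind it.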