I have a workflow that I'll describe as follows:
[ Dump(query) ] ----+
                    |
                    +---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
                    |
[ Schema(query) ] --+
Where:
- query is a query to an RDBMS
- Dump dumps the result of query to a CSV file dump
- Schema runs query and xcoms its schema schema
- Parquet reads the CSV dump and uses schema to create a Parquet file parquet
- Hive creates a Hive table based on the Parquet file parquet
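To make the structure concrete, here is a minimal sketch of the happy-path DAG, assuming Airflow 2.x; the helper module and callables (my_etl_helpers, dump_query, infer_schema, build_parquet, create_hive_table) are hypothetical placeholders for the actual RDBMS/Parquet/Hive logic:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical module wrapping the actual extraction/conversion logic.
from my_etl_helpers import dump_query, infer_schema, build_parquet, create_hive_table

with DAG("dump_to_hive", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:

    dump = PythonOperator(
        task_id="dump",
        python_callable=dump_query,     # writes the CSV file "dump"
    )

    schema = PythonOperator(
        task_id="schema",
        python_callable=infer_schema,   # return value is pushed to XCom
    )

    def parquet_callable(ti, **_):
        # Pull the schema that the "schema" task pushed to XCom.
        return build_parquet(schema=ti.xcom_pull(task_ids="schema"))

    parquet = PythonOperator(task_id="parquet", python_callable=parquet_callable)

    hive = PythonOperator(task_id="hive", python_callable=create_hive_table)

    [dump, schema] >> parquet >> hive
```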
The reasons behind this somewhat convoluted workflow are constraints that cannot be removed and lie outside the scope of the question (but yes, ideally it would be much simpler than this).
My question is about rolling back the effects of a pipeline in case of failure.
These are the rollbacks that I would like to see happen under different conditions:
- dump should always be deleted, regardless of the end result of the pipeline
- parquet should be deleted if, for whatever reason, the Hive table creation fails
Representing this in a workflow, I'd probably put it down like this:
[ Dump(query) ] ----+
                    |
                    +---> [ Parquet(dump, schema) ] ---> [ Hive(parquet) ]
                    |                |                           |
[ Schema(query) ] --+                |                           |
                                     v                           v
                           [ DeleteParquetOutput ] --> [ DeleteDumpOutput ]
Where the transition from Parquet to DeleteParquetOutput is performed only if an error occurs, and the transitions going into DeleteDumpOutput fire regardless of any failure in their upstream tasks.
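Continuing the sketch above, this kind of wiring can be expressed with Airflow trigger rules. The cleanup callables and file paths below are hypothetical; the point is ONE_FAILED for the conditional cleanup and ALL_DONE for the unconditional one:

```python
import os

from airflow.operators.python import PythonOperator
from airflow.utils.trigger_rule import TriggerRule

def delete_dump_output():
    # Hypothetical path; in the real DAG it would come from config or XCom.
    if os.path.exists("/tmp/dump.csv"):
        os.remove("/tmp/dump.csv")

def delete_parquet_output():
    if os.path.exists("/tmp/dump.parquet"):
        os.remove("/tmp/dump.parquet")

delete_parquet = PythonOperator(
    task_id="delete_parquet_output",
    python_callable=delete_parquet_output,
    trigger_rule=TriggerRule.ONE_FAILED,  # run only if an upstream task failed
)

delete_dump = PythonOperator(
    task_id="delete_dump_output",
    python_callable=delete_dump_output,
    trigger_rule=TriggerRule.ALL_DONE,    # run once upstreams finish, whatever their state
)

hive >> delete_parquet                 # parquet is removed only when Hive fails
[hive, delete_parquet] >> delete_dump  # dump is removed at the end of every run
```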
This should solve it, but I believe that more complex pipelines could suffer greatly from the increased complexity of this error-handling logic.
Before moving on to more details, my question: could this be considered a good practice when it comes to dealing with errors in an Airflow pipeline? What could be a different (and possibly more sustainable) approach?
If you are further interested in how I would like to solve this, read on, otherwise feel free to answer and/or comment.
Ideally, what I'd like to do would be to attach a rollback procedure to each task and have the rollbacks run by traversing the DAG in reverse.
Let's make a couple of examples with the given pipeline. We reverse the DAG and annotate each task with its mandatory rollback procedure (if any), getting this:
                                         +---> [ Dump: UNDO ]
                                         |
[ Hive: None ] ---> [ Parquet: None ] ---+
       ^                                 |
       |                                 +---> [ Schema: None ]
       +--- Start here
That covers the case where everything succeeds: only the dump has to be undone. If an error occurs in Hive instead, the Parquet output has to be undone as well:
                                                 +---> [ Dump: UNDO ]
                                                 |
[ Hive: None ] ---> [ Parquet: UNDO (error) ] ---+
       ^                                         |
       |                                         +---> [ Schema: None ]
       +--- Start here
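One way this idea could be approximated in Airflow 2.x (not the only one) is a registry of rollbacks walked in reverse order by a single final task. This is a rough sketch on top of the first DAG sketch; undo_dump, undo_parquet and their module are hypothetical:

```python
from airflow.operators.python import PythonOperator
from airflow.utils.state import State
from airflow.utils.trigger_rule import TriggerRule

from my_etl_helpers import undo_dump, undo_parquet  # hypothetical rollback callables

ROLLBACKS = {                        # declaration order mirrors the forward DAG
    "dump": (undo_dump, "always"),
    "parquet": (undo_parquet, "on_error"),
    # schema and hive have nothing to undo
}

def rollback_pipeline(dag_run, **_):
    states = {ti.task_id: ti.state for ti in dag_run.get_task_instances()}
    something_failed = State.FAILED in states.values()
    # Walk the reversed DAG: undo every task that actually produced output,
    # applying "on_error" rollbacks only when the run failed somewhere.
    for task_id, (undo, when) in reversed(list(ROLLBACKS.items())):
        if states.get(task_id) == State.SUCCESS and (when == "always" or something_failed):
            undo()

rollback = PythonOperator(
    task_id="rollback",
    python_callable=rollback_pipeline,
    trigger_rule=TriggerRule.ALL_DONE,  # evaluate rollbacks at the end of every run
)

[dump, schema, parquet, hive] >> rollback
```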
Is there any way to represent something like this in Airflow? I would also be open to evaluating different workflow automation solutions, should they enable this kind of approach.
Seems like a complicated way to handle errors. I think it's better to treat errors as simply stopping the current run of a DAG, so that you can fix any issues and restart it from where it left off. Sure, you can clean up partially created files from a particular task, but I wouldn't wind back the entire pipeline just because of some downstream issue.
Take, for example, what we do where I work; admittedly it uses different technologies, but I think it's the same kind of workflow:
1. Dump the data from the source database to a CSV file
2. Upload the CSV file to an S3 bucket
3. Copy the file from S3 into a Snowflake table
With our current setup, if someone accidentally changes the structure of the Snowflake table that we load S3 files into, the only task that will fail is the last one (step 3), since the table structure no longer matches the CSV structure. To fix this we simply need to revert the table structure to what it was and re-run the failed task. Airflow would then re-copy the file from S3 into Snowflake and succeed.
With the setup that you propose, what would happen? If the last task failed, it would roll back the entire pipeline and remove the CSV file from the S3 bucket; we would have to download the file from the source database again. It would be better to simply re-run the task that copies from S3 into Snowflake, saving the hassle of running the entire DAG.
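For illustration, the workflow described above could look something like the sketch below (Airflow 2.x, with hypothetical callables standing in for the real dump/upload/load logic). Because each step is its own task, a failure in the last step is fixed by clearing and re-running only that task, for example from the Airflow UI, rather than rolling back the whole pipeline:

```python
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# Hypothetical module with the actual dump/upload/load implementations.
from my_loaders import dump_to_csv, upload_to_s3, copy_into_snowflake

with DAG("db_to_snowflake", start_date=datetime(2024, 1, 1), schedule_interval=None) as dag:
    dump = PythonOperator(task_id="dump_to_csv", python_callable=dump_to_csv)
    upload = PythonOperator(task_id="upload_to_s3", python_callable=upload_to_s3)
    load = PythonOperator(
        task_id="copy_into_snowflake",
        python_callable=copy_into_snowflake,
        retries=2,  # transient failures retry automatically; others are fixed and re-run manually
    )

    dump >> upload >> load
```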