We are considering using Airflow to replace our current custom RQ-based workflow, but I am unsure of the best way to design it, or whether it even makes sense to use Airflow. The use case is:
e.g. after a data upload we put an item on a queue:
upload:
  user: 'a'
  data:
    - type: datatype1
      start: 1
      end: 3
    - type: datatype2
      start: 2
      end: 3
And this would trigger the corresponding jobs (one per datatype), and then maybe job1 would have some clean-up job that runs after it. (Also, it would be good to be able to restrict jobs to only run if there are no other jobs running for the same user.)
Approaches I have considered:
1.
Trigger a DAG when data upload arrives on message queue.
Then this DAG determines which dependent jobs to run and passes the user and the time range as arguments (or via XCom).
2.
Trigger a DAG when data upload arrives on message queue.
Then this DAG dynamically creates DAGs for the jobs based on the datatypes and templates in the user and time frame.
So you get a dynamic DAG for each user, job, and time-range combo (roughly what I have in mind is sketched below).
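Something like this is what I imagine for approach 2; it is only a sketch, the job logic and all the names are made up, and I'm assuming Airflow 1.x here:

```python
# Sketch of approach 2: a factory that stamps out a DAG per (user, datatype, time range)
# combo from the upload payload. All names here are illustrative.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def make_dag(user, datatype, start, end):
    dag_id = 'job_{}_{}_{}_{}'.format(user, datatype, start, end)
    dag = DAG(dag_id, start_date=datetime(2018, 1, 1), schedule_interval=None)

    def run_job(**context):
        # hypothetical worker call for this datatype/user/time range
        print('running {} for user {} from {} to {}'.format(datatype, user, start, end))

    PythonOperator(task_id='run_job', python_callable=run_job,
                   provide_context=True, dag=dag)
    return dag


# e.g. generated from the upload payload above
for item in [{'type': 'datatype1', 'start': 1, 'end': 3},
             {'type': 'datatype2', 'start': 2, 'end': 3}]:
    new_dag = make_dag('a', item['type'], item['start'], item['end'])
    globals()[new_dag.dag_id] = new_dag  # Airflow discovers DAG objects at module level
```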
I'm not even sure how to trigger DAGs from a message queue, and I'm finding it hard to find examples similar to this use case. Maybe that is because Airflow is not suited?
Any help/thoughts/advice would be greatly appreciated.
Thanks.
Airflow is built around time-based schedules. It is not built to trigger runs based on the landing of data. There are other systems designed to do this instead; I've heard of things like pachyderm.io or maybe dvs.org. Even repurposing a CI tool or customizing a Jenkins setup could trigger based on file-change events or a message queue.
However, you can try to make it work with Airflow by having an external queue listener make REST API calls to Airflow to trigger DAGs. E.g. if the messages arrive via AWS SQS or SNS, you could have an AWS Lambda listener written in simple Python do this.
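A minimal sketch of that listener, assuming Airflow 1.x's experimental REST API is enabled and reachable; the URL, DAG id, and message shape are placeholders, and the exact `conf` encoding varies a bit between Airflow versions:

```python
# Hypothetical Lambda handler that turns a queue message into an Airflow DAG run
# via the experimental REST API (POST /api/experimental/dags/<dag_id>/dag_runs).
import json
import urllib.request

AIRFLOW_URL = 'http://airflow.example.com:8080'  # placeholder
DAG_ID = 'process_upload'                        # placeholder


def handler(event, context):
    for record in event.get('Records', []):
        upload = json.loads(record['body'])  # SQS-style record; for SNS use record['Sns']['Message']
        req = urllib.request.Request(
            '{}/api/experimental/dags/{}/dag_runs'.format(AIRFLOW_URL, DAG_ID),
            data=json.dumps({'conf': upload}).encode('utf-8'),
            headers={'Content-Type': 'application/json'},
        )
        urllib.request.urlopen(req)  # Airflow hands `conf` to the triggered DAG run
```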
I would recommend one DAG per job type (or per user, whichever there are fewer of), and the trigger logic picks the right one based on the queue message. If there are common clean-up tasks and the like, the DAG might use a TriggerDagRunOperator to start those, or you might just have a common library of clean-up tasks that each DAG imports. I think the latter is cleaner.
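For example, something along these lines, assuming the trigger passes the queue message in `dag_run.conf` (DAG/task names and the shared clean-up function are made up):

```python
# Sketch of a per-job-type DAG that reads its parameters from dag_run.conf and
# ends with a clean-up task that could come from a small shared library.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def process_datatype1(**context):
    conf = context['dag_run'].conf or {}
    user, start, end = conf.get('user'), conf.get('start'), conf.get('end')
    print('processing datatype1 for user {} from {} to {}'.format(user, start, end))


def cleanup(**context):
    # shared clean-up logic; import this from a common module in every job DAG
    print('cleaning up')


with DAG('datatype1_job', start_date=datetime(2018, 1, 1),
         schedule_interval=None) as dag:
    process = PythonOperator(task_id='process', python_callable=process_datatype1,
                             provide_context=True)
    clean = PythonOperator(task_id='cleanup', python_callable=cleanup,
                           provide_context=True)
    process >> clean
```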
DAGs can have their tasks limited to certain pools. You could make a pool per user to limit concurrent jobs per user. Alternatively, if you have a DAG per user, you could set that DAG's max concurrent runs (max_active_runs) to something reasonable.
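A sketch of both options; the pool would be created ahead of time (e.g. via the UI or the `airflow pool` CLI), and the names are placeholders:

```python
# Option 1: give every task for a given user the same pool with one slot.
# Option 2: with a DAG per user, cap concurrent runs of that DAG via max_active_runs.
from datetime import datetime

from airflow import DAG
from airflow.operators.python_operator import PythonOperator


def job1(**context):
    print('job1 for user a')


with DAG('user_a_jobs', start_date=datetime(2018, 1, 1),
         schedule_interval=None,
         max_active_runs=1) as dag:  # option 2: only one active run of this user's DAG
    run_job1 = PythonOperator(
        task_id='job1',
        python_callable=job1,
        provide_context=True,
        pool='user_a_pool',  # option 1: pool created beforehand with 1 slot
    )
```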