I am new to Prefect, having worked mostly with Airflow. I have put together a workflow that executes fine, but the tasks don't execute in the order I expect. The flow is here:
with Flow(name='4chan_extract') as flow:
    board_param = Parameter(name='board_name', required=True, default='pol')
    getData(board=board_param)
    checkDB(url='postgresql://postgres:user@localhost:5434/postgres')
    upload_raw(url="postgresql://postgres:user@localhost:5434/postgres",
               board=board_param)
    remove_dupes(board=board_param)
However, when I call flow.visualize() on this flow, the DAG looks really odd.
My understanding was that the with context manager sets the order, but that does not seem to be the case. Using up_stream in each task didn't help either.
Any help is appreciated.
If you want your tasks to be called sequentially, one after the other, you can add upstream_tasks to each of your task calls. Additionally, to pass state dependencies easily, you can assign the result of a task call to a name, as in data = get_data(board=board_param). That named reference can then be passed to downstream tasks as a dependency.
I can only guess what you want this flow to look like, but assuming you want it to run sequentially, here is a full example and a DAG visualization:
from prefect import task, Flow, Parameter


@task
def get_data(board):
    pass


@task
def check_db(url):
    pass


@task
def upload_raw(url, board):
    pass


@task
def remove_duplicates(board):
    pass


with Flow(name="4chan_extract") as flow:
    board_param = Parameter(name="board_name", required=True, default="pol")
    # Keep a reference to each task call so it can be passed downstream.
    data = get_data(board=board_param)
    check = check_db(
        url="postgresql://postgres:user@localhost:5434/postgres", upstream_tasks=[data]
    )
    upload = upload_raw(
        url="postgresql://postgres:user@localhost:5434/postgres",
        board=board_param,
        upstream_tasks=[check],
    )
    remove_duplicates(board=board_param, upstream_tasks=[upload])

if __name__ == "__main__":
    flow.visualize()
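
To see why the explicit upstream_tasks matter, here is a rough sketch that uses only Python's standard graphlib module, not Prefect itself. It assumes the flow can be modelled as a plain dependency graph; the execution_order helper and the TASKS list are hypothetical names made up for this illustration. With no dependencies declared, any ordering of the tasks is valid, which is roughly why calling tasks one after another inside the with block does not by itself fix their order.

```python
from graphlib import TopologicalSorter

# Hypothetical stand-ins for the four tasks in the flow above.
TASKS = ["get_data", "check_db", "upload_raw", "remove_duplicates"]


def execution_order(upstream):
    """Return one valid run order given a {task: [upstream tasks]} mapping."""
    graph = {name: upstream.get(name, []) for name in TASKS}
    return list(TopologicalSorter(graph).static_order())


# No dependencies declared: every permutation is a valid order,
# so nothing guarantees the order in which the tasks were written.
unordered = execution_order({})

# Each task names the previous one as upstream, mirroring upstream_tasks=[...].
chained = execution_order({
    "check_db": ["get_data"],
    "upload_raw": ["check_db"],
    "remove_duplicates": ["upload_raw"],
})

print(chained)
# ['get_data', 'check_db', 'upload_raw', 'remove_duplicates']
```

Only the chained version has a single valid order. The unordered version contains the same four tasks, but their relative position is left to the scheduler.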