Prefect Task Scheduling


I am new to Prefect, having worked mostly with Airflow. I have put together a workflow that executes fine, but the tasks don't execute in the order I expect. Here is the flow:

with Flow(name='4chan_extract') as flow:
    board_param = Parameter(name='board_name', required = True, default='pol')
    getData(board= board_param)
    checkDB(url= 'postgresql://postgres:user@localhost:5434/postgres')
    upload_raw(url="postgresql://postgres:user@localhost:5434/postgres", 
    board=board_param)
    remove_dupes(board=board_param)

However, when I call flow.visualize() on this flow, the DAG looks really odd.

My understanding was that the with context manager sets the order of the tasks? Setting upstream dependencies on each task didn't help.

Any help is appreciated.


Solution

  • The with Flow(...) block only registers the tasks with the flow; it does not impose an order on them. Order comes from the dependencies between tasks. If you want your tasks to run sequentially, one after the other, you can pass upstream_tasks to each task call. Additionally, to pass state dependencies easily, you can assign the result of a task call to a name (data = get_data(board=board_param)); that named reference can then be passed to downstream tasks.

    I can only guess what you want this flow to look like, but assuming you want it to run sequentially, here is a full example and a DAG visualization:

    from prefect import task, Flow, Parameter
    
    
    @task
    def get_data(board):
        pass
    
    
    @task
    def check_db(url):
        pass
    
    
    @task
    def upload_raw(url, board):
        pass
    
    
    @task
    def remove_duplicates(board):
        pass
    
    
    with Flow(name="4chan_extract") as flow:
        board_param = Parameter(name="board_name", required=True, default="pol")
        data = get_data(board=board_param)
        check = check_db(
            url="postgresql://postgres:user@localhost:5434/postgres", upstream_tasks=[data]
        )
        upload = upload_raw(
            url="postgresql://postgres:user@localhost:5434/postgres",
            board=board_param,
            upstream_tasks=[check],
        )
        remove_duplicates(board=board_param, upstream_tasks=[upload])
    
    if __name__ == "__main__":
        flow.visualize()
    

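    To see why the explicit dependencies matter, here is a minimal sketch that uses only Python's standard-library graphlib module, not Prefect itself. It is an illustration of dependency ordering in general, not Prefect's scheduler. The dictionaries and the first_ready helper are hypothetical names made up for this example; the task names mirror the flow above.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical model of the original flow: the only edges come from
# passing board_param into a task. check_db has no dependency at all.
data_only = {
    "get_data": {"board_name"},
    "check_db": set(),
    "upload_raw": {"board_name"},
    "remove_duplicates": {"board_name"},
}

# Hypothetical model of the fixed flow: each task also lists the previous
# task as upstream, mirroring the upstream_tasks arguments above.
with_upstream = {
    "get_data": {"board_name"},
    "check_db": {"get_data"},
    "upload_raw": {"board_name", "check_db"},
    "remove_duplicates": {"board_name", "upload_raw"},
}


def first_ready(graph):
    """Return the set of nodes that can run before anything else has finished."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()
    return set(sorter.get_ready())


# With data-only edges, check_db is free to start right away,
# so nothing forces it to wait for get_data.
print(first_ready(data_only))

# With the chain of upstream dependencies there is exactly one valid order.
run_order = list(TopologicalSorter(with_upstream).static_order())
print(run_order)
```

    In the data-only graph, check_db can start at the same time as the parameter, and the remaining tasks are only constrained to run after board_name. In the chained graph, every task waits for the one before it, which is the sequential behaviour that upstream_tasks expresses in the Prefect example.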