Tags: python, apache-spark, garbage-collection, dask, dask-distributed

Dask: each iteration of a sequential algorithm takes longer than the previous one


I am currently implementing a variation of Dijkstra's algorithm in Dask and Spark (for comparison purposes) for airline departures, which involves a sequential computation over the nodes of a graph. Additionally, at every step I filter out some records (nodes) as they become infeasible due to their departure time. However, each new iteration takes longer than the previous one, even though the dataframes become smaller. I solved this problem in Spark by writing intermediate results to parquet, but I have not been able to solve it in Dask.

I suspect that the whole task graph is being re-executed at every step, but I have been unable to prevent this from happening.
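
One way to see this (a minimal sketch with toy data, not my actual pipeline) is to watch the size of the lazy task graph across iterations: without anything truncating the graph, every filter stacks another layer of tasks, and each compute re-runs all previous steps:

import pandas as pd
import dask.dataframe as dd

# toy stand-in for the flights dataframe
df = dd.from_pandas(pd.DataFrame({'x': range(1000)}), npartitions=10)

for i in range(5):
    df = df[df['x'] % (i + 2) != 0]  # stand-in for the per-node filtering
    # the key count grows on every pass, so later iterations do more work
    print(i, len(df.__dask_graph__()))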

So far I have tried the following approaches:

  1. Use persist (this has been the fastest). However, the number of tasks to complete shown in the UI increases with every iteration; for example, iteration 8 shows x/800 and iteration 9 shows x/900 (I am using 100 partitions). A blocking variant of this loop is sketched after this list.
while i < n_nodes:
    i += 1
    # some computations over df
    df = client.persist(df)
    # add new records from df to explored
    # some computations over explored
    explored = client.persist(explored)
  2. Write the current df to disk and read it back immediately afterwards (this worked marvellously in Spark but was not so effective in Dask, because to_parquet appends to an existing directory and fails if the directory is deleted). In this case I used both del df and client.cancel(df) (not simultaneously) with little effect on computation time, so I decided to comment them out. A cleaned-up round-trip is sketched after this list.
while i < n_nodes:
    i += 1
    # some computations over df
    os.system('rm -r temp_dir/df_vuelos_dask')
    df.to_parquet('temp_dir/df_vuelos_dask')
    # del df
    # client.cancel(df)
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    os.system('rm -r temp_dir/explored')
    explored.to_parquet('temp_dir/explored')
    # del explored
    # client.cancel(explored)
    explored = dd.read_parquet('temp_dir/explored')

  3. Use client.restart(). This was not good because it wiped the contents of df and explored, which became problematic. A checkpoint-then-restart variant is sketched after this list.
while i < n_nodes:
    i += 1
    # some computations over df
    os.system('rm -r temp_dir/df_vuelos_dask')
    df.to_parquet('temp_dir/df_vuelos_dask')
    client.restart()
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    os.system('rm -r temp_dir/explored')
    explored.to_parquet('temp_dir/explored')
    client.restart()
    explored = dd.read_parquet('temp_dir/explored')
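
For reference, a blocking variant of the persist loop (a sketch, not code I have benchmarked). persist on a distributed client returns immediately, so without an explicit wait the next iteration starts building on data that is still being computed:

from dask.distributed import wait

while i < n_nodes:
    i += 1
    # some computations over df
    df = client.persist(df)
    wait(df)  # block until the persisted partitions are actually in memory
    # after persist, the lazy graph should shrink back to roughly one key
    # per partition; if it keeps growing, something still holds the old graph
    # add new records from df to explored
    # some computations over explored
    explored = client.persist(explored)
    wait(explored)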
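
And a cleaned-up version of the parquet round-trip (a sketch; shutil.rmtree replaces the rm shell call, and the result of read_parquet is assigned back):

import shutil

while i < n_nodes:
    i += 1
    # some computations over df
    shutil.rmtree('temp_dir/df_vuelos_dask', ignore_errors=True)
    df.to_parquet('temp_dir/df_vuelos_dask')  # newer Dask versions also accept overwrite=True
    # re-reading from disk rebuilds df on a fresh, flat graph
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    shutil.rmtree('temp_dir/explored', ignore_errors=True)
    explored.to_parquet('temp_dir/explored')
    explored = dd.read_parquet('temp_dir/explored')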
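
Finally, the restart variant with the checkpoint completed before restarting (a sketch; restart wipes all worker memory, which I assume is why df and explored disappeared):

# write *both* frames to disk before restarting
df.to_parquet('temp_dir/df_vuelos_dask')
explored.to_parquet('temp_dir/explored')
client.restart()
# read_parquet is lazy and holds no cluster state, so these are safe
df = dd.read_parquet('temp_dir/df_vuelos_dask')
explored = dd.read_parquet('temp_dir/explored')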

The following is the elapsed time (in seconds) printed to the console:

 Iteration 2 / 280. Elapsed time: 407.85055565834045
 Iteration 3 / 280. Elapsed time: 434.58717703819275
 Iteration 4 / 280. Elapsed time: 436.2463436126709
 Iteration 5 / 280. Elapsed time: 437.9837713241577
 Iteration 6 / 280. Elapsed time: 440.2417469024658
 Iteration 7 / 280. Elapsed time: 442.7933940887451
 Iteration 8 / 280. Elapsed time: 445.7904782295227
 Iteration 9 / 280. Elapsed time: 449.1104226112366
 Iteration 10 / 280. Elapsed time: 452.3273584842682
 Iteration 11 / 280. Elapsed time: 456.3567247390747
 Iteration 12 / 280. Elapsed time: 460.65562629699707
 Iteration 13 / 280. Elapsed time: 464.7628743648529
 Iteration 14 / 280. Elapsed time: 469.59177350997925
 Iteration 15 / 280. Elapsed time: 474.6557366847992
 Iteration 16 / 280. Elapsed time: 479.7272925376892
 Iteration 17 / 280. Elapsed time: 485.53346991539
 Iteration 18 / 280. Elapsed time: 491.11691975593567
 Iteration 19 / 280. Elapsed time: 497.39954662323
 Iteration 20 / 280. Elapsed time: 504.03624844551086
 Iteration 21 / 280. Elapsed time: 510.45858550071716
 Iteration 22 / 280. Elapsed time: 517.7796952724457
 Iteration 23 / 280. Elapsed time: 525.3149480819702
 Iteration 24 / 280. Elapsed time: 532.6355893611908
 Iteration 25 / 280. Elapsed time: 541.2597570419312
 Iteration 26 / 280. Elapsed time: 549.2841284275055
 Iteration 27 / 280. Elapsed time: 558.8050730228424
 Iteration 28 / 280. Elapsed time: 567.617687702179
 Iteration 29 / 280. Elapsed time: 577.8864963054657
 Iteration 30 / 280. Elapsed time: 587.5171909332275
 Iteration 31 / 280. Elapsed time: 598.4596126079559
 Iteration 32 / 280. Elapsed time: 608.7272901535034
 Iteration 33 / 280. Elapsed time: 620.6863214969635
 Iteration 34 / 280. Elapsed time: 631.9231634140015
 Iteration 35 / 280. Elapsed time: 643.090336561203
 Iteration 36 / 280. Elapsed time: 656.1529128551483
 Iteration 37 / 280. Elapsed time: 667.9437139034271
 Iteration 38 / 280. Elapsed time: 681.2613704204559
 Iteration 39 / 280. Elapsed time: 695.7434968948364
 Iteration 40 / 280. Elapsed time: 709.1406977176666
 Iteration 41 / 280. Elapsed time: 723.0397245883942
 Iteration 42 / 280. Elapsed time: 737.5559349060059
 Iteration 43 / 280. Elapsed time: 753.8705065250397
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 44 / 280. Elapsed time: 768.2957532405853
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 45 / 280. Elapsed time: 783.177583694458
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 46 / 280. Elapsed time: 798.720709323883
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 47 / 280. Elapsed time: 814.6071207523346
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 48 / 280. Elapsed time: 830.2278523445129
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 49 / 280. Elapsed time: 846.3982262611389
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 50 / 280. Elapsed time: 865.5728619098663
 Iteration 51 / 280. Elapsed time: 882.612627029419
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
 Iteration 52 / 280. Elapsed time: 900.9131906032562
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 53 / 280. Elapsed time: 919.1079332828522
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 54 / 280. Elapsed time: 937.6077470779419
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
 Iteration 55 / 280. Elapsed time: 957.1775703430176

I am running this locally on a laptop with 16 GB of RAM and 12 cores. The dataset is approximately 7 GB, stored as parquet.
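
Given the garbage-collection warnings above, I wonder whether this is memory pressure as much as recomputation. For completeness, this is how I could constrain the local cluster (a sketch; the worker count and memory limit are guesses for a 16 GB machine, not values I have validated):

import gc
from dask.distributed import Client

# explicit limits instead of the default LocalCluster sizing
client = Client(n_workers=4, threads_per_worker=3, memory_limit='3GB')

# a full collection can be forced on every worker if the GC warnings persist
client.run(gc.collect)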

I would appreciate guidance on what I am doing wrong, or on a way to discard the parts of the graph that have already been computed.

Thanks!


Solution

  • Your first solution (with persist) seems reasonable. The number of tasks in the UI is cumulative, so it will keep growing; it does not mean earlier results are recalculated from scratch each time. With 100 partitions, the counter increases in multiples of 100.

    Here's an example I was working with:

    import dask.dataframe as dd
    from dask.distributed import Client
    import pandas as pd
    import numpy as np
    import time
    
    client = Client()
    
    max_number_of_nodes = 35
    number_of_ties = 1_000
    network = pd.DataFrame(np.random.randint(max_number_of_nodes, size=(number_of_ties,2)), columns=['source', 'target'])
    ddf = dd.from_pandas(network, npartitions=10)
    
    for i in range(1, 11):
        # filter out some rows each pass, mimicking the pruning in the question
        # (starting at 1 avoids a floor division by zero on the first pass)
        ddf = ddf[ddf['source'] // i != 5]
        # persist truncates the lazy graph, so earlier filters are not re-run
        ddf = client.persist(ddf)
        time.sleep(1)
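
    One caveat when timing a loop like this: persist on a distributed client is asynchronous, so the loop above mostly measures scheduling time. To check that the per-iteration cost really stays flat, block on each persist (a sketch using distributed.wait):

    from dask.distributed import wait

    for i in range(1, 11):
        ddf = ddf[ddf['source'] // i != 5]
        ddf = client.persist(ddf)
        wait(ddf)  # block until this iteration's partitions are in memory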