I am currently implementing a variation of Dijkstra's algorithm in Dask and Spark (for comparison purposes) for airline departures, which involves a sequential computation over the nodes of the graph. Additionally, on every step I filter out some records in the graph (nodes) as they become unfeasible due to their departure time. However, new iterations take longer than the previous ones, even though the dataframes become smaller. I solved this problem in Spark by writing intermediate results to parquet, but I cannot solve it for Dask.
I suspect that the whole task graph is being re-executed from scratch on every step, but I have been unable to prevent this from happening.
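For reference, the Spark workaround looked roughly like the following (a simplified sketch; checkpoint_path and the spark session variable are just illustrative names):
# Sketch: materialize the intermediate DataFrame to parquet and read it back,
# which truncates the lineage accumulated over the previous iterations.
# (checkpoint_path / spark are illustrative names, not necessarily the exact ones I use.)
checkpoint_path = 'temp_dir/df_vuelos_spark'
df.write.mode('overwrite').parquet(checkpoint_path)
df = spark.read.parquet(checkpoint_path)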
So far I have tried the following approaches:
persist (this one has been the fastest). However, the number of tasks to complete increases in the UI with every iteration. For example, iteration 8 shows x/800 and iteration 9 shows x/900 (I am using 100 partitions).

while i < n_nodes:
    i += 1
    # some computations over df
    df = client.persist(df)
    # add new records from df to explored
    # some computations over explored
    explored = client.persist(explored)
del df and client.cancel(df), with little effect on computation time, so I decided to comment them out.

while i < n_nodes:
    i += 1
    # some computations over df
    os.system('rm -r temp_dir/df_vuelos_dask')
    df.to_parquet('temp_dir/df_vuelos_dask')
    # del df
    # client.cancel(df)
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    os.system('rm -r temp_dir/explored')
    explored.to_parquet('temp_dir/explored')
    # del explored
    # client.cancel(explored)
    explored = dd.read_parquet('temp_dir/explored')
client.restart(). This one was not good because it deleted the contents of df and explored, which became problematic.

while i < n_nodes:
    i += 1
    # some computations over df
    os.system('rm -r temp_dir/df_vuelos_dask')
    df.to_parquet('temp_dir/df_vuelos_dask')
    client.restart()
    df = dd.read_parquet('temp_dir/df_vuelos_dask')
    # add new records from df to explored
    # some computations over explored
    os.system('rm -r temp_dir/explored')
    explored.to_parquet('temp_dir/explored')
    client.restart()
    explored = dd.read_parquet('temp_dir/explored')
The following is the output of the elapsed time (seconds) printed in the console:
Iteration 2 / 280. Elapsed time: 407.85055565834045
Iteration 3 / 280. Elapsed time: 434.58717703819275
Iteration 4 / 280. Elapsed time: 436.2463436126709
Iteration 5 / 280. Elapsed time: 437.9837713241577
Iteration 6 / 280. Elapsed time: 440.2417469024658
Iteration 7 / 280. Elapsed time: 442.7933940887451
Iteration 8 / 280. Elapsed time: 445.7904782295227
Iteration 9 / 280. Elapsed time: 449.1104226112366
Iteration 10 / 280. Elapsed time: 452.3273584842682
Iteration 11 / 280. Elapsed time: 456.3567247390747
Iteration 12 / 280. Elapsed time: 460.65562629699707
Iteration 13 / 280. Elapsed time: 464.7628743648529
Iteration 14 / 280. Elapsed time: 469.59177350997925
Iteration 15 / 280. Elapsed time: 474.6557366847992
Iteration 16 / 280. Elapsed time: 479.7272925376892
Iteration 17 / 280. Elapsed time: 485.53346991539
Iteration 18 / 280. Elapsed time: 491.11691975593567
Iteration 19 / 280. Elapsed time: 497.39954662323
Iteration 20 / 280. Elapsed time: 504.03624844551086
Iteration 21 / 280. Elapsed time: 510.45858550071716
Iteration 22 / 280. Elapsed time: 517.7796952724457
Iteration 23 / 280. Elapsed time: 525.3149480819702
Iteration 24 / 280. Elapsed time: 532.6355893611908
Iteration 25 / 280. Elapsed time: 541.2597570419312
Iteration 26 / 280. Elapsed time: 549.2841284275055
Iteration 27 / 280. Elapsed time: 558.8050730228424
Iteration 28 / 280. Elapsed time: 567.617687702179
Iteration 29 / 280. Elapsed time: 577.8864963054657
Iteration 30 / 280. Elapsed time: 587.5171909332275
Iteration 31 / 280. Elapsed time: 598.4596126079559
Iteration 32 / 280. Elapsed time: 608.7272901535034
Iteration 33 / 280. Elapsed time: 620.6863214969635
Iteration 34 / 280. Elapsed time: 631.9231634140015
Iteration 35 / 280. Elapsed time: 643.090336561203
Iteration 36 / 280. Elapsed time: 656.1529128551483
Iteration 37 / 280. Elapsed time: 667.9437139034271
Iteration 38 / 280. Elapsed time: 681.2613704204559
Iteration 39 / 280. Elapsed time: 695.7434968948364
Iteration 40 / 280. Elapsed time: 709.1406977176666
Iteration 41 / 280. Elapsed time: 723.0397245883942
Iteration 42 / 280. Elapsed time: 737.5559349060059
Iteration 43 / 280. Elapsed time: 753.8705065250397
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 44 / 280. Elapsed time: 768.2957532405853
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 45 / 280. Elapsed time: 783.177583694458
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 46 / 280. Elapsed time: 798.720709323883
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 47 / 280. Elapsed time: 814.6071207523346
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 48 / 280. Elapsed time: 830.2278523445129
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 49 / 280. Elapsed time: 846.3982262611389
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 50 / 280. Elapsed time: 865.5728619098663
Iteration 51 / 280. Elapsed time: 882.612627029419
distributed.utils_perf - WARNING - full garbage collections took 10% CPU time recently (threshold: 10%)
Iteration 52 / 280. Elapsed time: 900.9131906032562
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
Iteration 53 / 280. Elapsed time: 919.1079332828522
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
Iteration 54 / 280. Elapsed time: 937.6077470779419
distributed.utils_perf - WARNING - full garbage collections took 11% CPU time recently (threshold: 10%)
Iteration 55 / 280. Elapsed time: 957.1775703430176
I am executing it locally on a laptop with 16GB RAM and 12 cores. The dataset is approximately 7GB, stored as parquet.
I would appreciate guidance on what I am doing wrong, or on a way to discard the parts of the graph that have already been computed.
Thanks!
Your first solution (with persist) seems to be reasonable. The number of tasks in the UI is cumulative (so they should not be calculated from scratch every time, and since you have 100 partitions, the count will increase in multiples of 100).
Here's an example I was working with:
import dask.dataframe as dd
from dask.distributed import Client
import pandas as pd
import numpy as np
import time
client = Client()
client
# build a random edge list: number_of_ties edges among max_number_of_nodes nodes
max_number_of_nodes = 35
number_of_ties = 1_000
network = pd.DataFrame(np.random.randint(max_number_of_nodes, size=(number_of_ties, 2)), columns=['source', 'target'])
ddf = dd.from_pandas(network, npartitions=10)

for i in range(1, 11):
    # filter out some rows, then persist so each iteration builds on the
    # in-memory result rather than recomputing from the original dataframe
    ddf = ddf[ddf['source'] // i != 5]
    ddf = client.persist(ddf)
    time.sleep(1)
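If you want to sanity-check that the persisted partitions are actually being reused rather than recomputed, one option (just a sketch on top of the example above) is to wait for the persisted collection and time a cheap operation on it; the timing should stay roughly flat across iterations:
import time
from dask.distributed import wait

ddf = client.persist(ddf)
wait(ddf)                              # block until the persisted partitions are in worker memory
start = time.time()
print(len(ddf), time.time() - start)   # should stay fast if nothing is recomputed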