
@distributed seems to work, function return is wonky


I'm just learning how to do parallel computing in Julia. I'm using @sync @distributed at the start of a triple-nested for loop to parallelize the work (see the code at the bottom). The line println(errCmp[row, col]) lets me watch every element of the array errCmp being printed, e.g.:

From worker 3:    2.351134946074191e9
From worker 4:    2.3500830193505473e9
From worker 5:    2.3502416529551845e9
From worker 2:    2.3509105625656652e9
From worker 3:    2.3508352842971106e9
From worker 4:    2.3497049296121807e9
From worker 5:    2.35048428351797e9
From worker 2:    2.350742582031195e9
From worker 3:    2.350616273660934e9
From worker 4:    2.349709546599313e9

However, when the function returns, errCmp is still the array of zeros I pre-allocated at the beginning.

Am I missing some closing term to collect everything?

function optimizeDragCalc(df::DataFrame)
    paramGrid = [cd*AoM for cd = range(1e-3, stop = 0.01, length = 50), AoM = range(2e-4, stop = 0.0015, length = 50)]
    errCmp    = zeros(size(paramGrid))
    # totalSize = size(paramGrid, 1) * size(paramGrid, 2) * size(df.time, 1)
    @sync @distributed for row = 1:size(paramGrid, 1)
        for col = 1:size(paramGrid, 2)
            # Run the propagation here
            BC = 1/paramGrid[row, col]
            slns, _ = propWholeTraj(df, BC)
            for time = 1:size(df.time, 1)
                errDF = propError(slns[time], df, time)
                errCmp[row, col] += sum(errDF.totalErr)
            end # time
            # println("row: ", row, " of ",size(paramGrid, 1),"   col: ", col, " of ", size(paramGrid, 2))
            println(errCmp[row, col])
        end # col
    end # row
    # plot(heatmap(z = errCmp))
    return errCmp, paramGrid
end
errCmp, paramGrid = @time optimizeDragCalc(df)

Solution

  • You did not provide a minimal working example (MWE), which I guess might be hard here, so here is mine. Let us assume that we want to use Distributed to calculate the means of an Array's columns:

    using Distributed
    addprocs(2)
    @everywhere using StatsBase
    data = rand(1000,2000)
    res = zeros(2000)
    @sync @distributed for col = 1:size(data, 2)
        res[col] = StatsBase.mean(data[:,col])
        # does not work!
        # ... because each worker writes to its own copy of res, which is never sent back!
    end
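
To make the failure concrete, here is a minimal sketch (the 4-element array and the loop body are illustrative, not taken from the question). It assumes two local workers and shows that writes made inside the loop land in each worker's private copy, so the array on the calling process is left untouched:

```julia
using Distributed
addprocs(2)   # assumption: two local worker processes

res = zeros(4)

# The loop body runs on the workers. `res` is copied to each of them,
# so the assignments below change the workers' copies only.
@sync @distributed for i = 1:4
    res[i] = i
end

# Back on the calling process nothing has changed.
println(res)
```

This is the same situation as errCmp in the question: the println inside the loop shows the worker-side values, while the caller's array keeps its initial zeros.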
    

    To correct the code above, pass a reducer (aggregator) function to @distributed; it combines the values produced by the iterations into the result returned to the caller. (I keep the example intentionally simple; further optimization is possible.)

    using Distributed
    addprocs(2)
    @everywhere using Distributed,StatsBase
    data = rand(1000,2000)    
    @everywhere function t2(d1,d2)
        append!(d1,d2)
        d1
    end
    res = @distributed (t2) for col = 1:size(data, 2)
        [(myid(),col, StatsBase.mean(data[:,col]))]
    end
    

    Now let us see the output. We can see that some of the values have been calculated on worker 2 while others on worker 3:

    julia> res
    2000-element Array{Tuple{Int64,Int64,Float64},1}:
     (2, 1, 0.49703681326230276)
     (2, 2, 0.5035341367791002)
     (2, 3, 0.5050607022354537)
     ⋮
     (3, 1998, 0.4975699181976122)
     (3, 1999, 0.5009498778934444)
     (3, 2000, 0.499671315490524)
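
If only the values are needed, ordered by column, the reduced result can be scattered back into a dense vector on the calling process. The following is a sketch under assumptions: it uses Base's vcat as the reducer and the Statistics standard library instead of StatsBase, and the names tagged and res are illustrative.

```julia
using Distributed
addprocs(2)   # assumption: two local worker processes
@everywhere using Statistics

data = rand(1000, 200)

# Each iteration yields a one-element vector holding (col, mean);
# vcat concatenates these pieces into one vector on the caller.
tagged = @distributed (vcat) for col = 1:size(data, 2)
    [(col, mean(data[:, col]))]
end

# Scatter the (col, value) pairs into a vector indexed by column,
# so the final layout does not depend on the order of the pieces.
res = zeros(size(data, 2))
for (col, m) in tagged
    res[col] = m
end
```

Carrying the column index along with each value keeps the reassembly independent of which worker processed which range.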
    

    Further possible improvements/modifications:

    • use @spawnat to generate the values on the remote processes (instead of generating them on the master process and sending them over)
    • use SharedArray, which lets all workers on the same machine read and write one array in shared memory. In my experience it requires very careful programming.
    • use ParallelDataTransfer.jl to send data among workers. It is very easy to use but not efficient for a huge number of messages.
    • always consider Julia's threading mechanism (in some scenarios it makes life easier; again, it depends on the problem)
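
As an illustration of the SharedArray option, here is a minimal sketch of a 2D grid filled in place by the workers, similar in shape to errCmp in the question. It assumes all workers run on the same machine; the function cost is a hypothetical stand-in for the expensive per-cell computation, and the grid size is arbitrary.

```julia
using Distributed
addprocs(2)   # assumption: local workers, so shared memory is available
@everywhere using SharedArrays

# Hypothetical stand-in for the real per-cell computation.
@everywhere cost(row, col) = row + 10.0 * col

nrows, ncols = 4, 5
errCmp = SharedArray{Float64}(nrows, ncols)
fill!(errCmp, 0.0)   # initialise explicitly rather than rely on defaults

# Every worker writes into the same shared memory segment, so the
# results are visible on the calling process once @sync returns.
@sync @distributed for row = 1:nrows
    for col = 1:ncols
        errCmp[row, col] = cost(row, col)
    end
end

result = copy(sdata(errCmp))   # plain Array copy of the shared data
```

Each cell is written by exactly one iteration here, which avoids concurrent writes to the same element; that is the kind of care this approach requires.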