With Julia 1.5.3, I want to pass a list of parameters to the distributed workers.
I first tried it in a non-distributed way:
using Distributed
@everywhere begin
using SharedArrays
solve(a,b,c) = return (1,2,3)
d_rates = LinRange(0.01, 0.33, 5)
m_rates = LinRange(0.01, 0.25, 5)
population_size = 10^3
max_iterations_perloop = 10^3
nb_repeats = 2
nb_params = length(d_rates)*length(m_rates)*nb_repeats
para = enumerate(Base.product(d_rates, m_rates, population_size, max_iterations_perloop, 1:nb_repeats))
results = SharedArray{Tuple{Int, Int, Int}}(nb_params)
end
for (y , x) in para
results[y] = solve(x[1], x[2], x[3])
end
This worked fine. I then changed the final loop to:
@sync @distributed for (y , x) in para
results[y] = solve(x[1], x[2], x[3])
end
I then got an error (truncated):
ERROR: LoadError: TaskFailedException:
MethodError: no method matching firstindex(::Base.Iterators.Enumerate{Base.Iterators.ProductIterator{Tuple{LinRange{Float64},LinRange{Float64},Int64,Int64,UnitRange{Int64}}}})
Closest candidates are:
firstindex(::Cmd) at process.jl:638
firstindex(::Core.SimpleVector) at essentials.jl:599
firstindex(::Base64.Buffer) at /buildworker/worker/package_linux64/build/usr/share/julia/stdlib/v1.5/Base64/src/buffer.jl:18
...
Stacktrace:
[1] (::Distributed.var"#159#161"{var"#271#272",Base.Iterators.Enumerate{Base.Iterators.ProductIterator{Tuple{LinRange{Float64},LinRange{Float64},Int64,Int64,UnitRange{Int64}}}}})() at ./task.jl:332
Stacktrace:
[1] sync_end(::Channel{Any}) at ./task.jl:314
[2] top-level scope at task.jl:333
[3] include_string(::Function, ::Module, ::String, ::String) at ./loading.jl:1088
[4] include_string(::Module, ::String, ::String) at ./loading.jl:1096
[5] invokelatest(::Any, ::Any, ::Vararg{Any,N} where N; kwargs::Base.Iterators.Pairs{Union{},Union{},Tuple{},NamedTuple{(),Tuple{}}}) at ./essentials.jl:710
[6] invokelatest(::Any, ::Any, ::Vararg{Any,N} where N) at ./essentials.jl:709
Is it possible to pass such a list, and if so, how?
I assume that all your workers are on a single server and that you have actually added some workers using the addprocs command.
The first problem with your code is that you create the SharedArray on every worker. That is not how it is meant to be used; the SharedArray constructor is documented as follows:
help?> SharedArray
SharedArray{T}(dims::NTuple; init=false, pids=Int[])
SharedArray{T,N}(...)
Construct a SharedArray of a bits type T and size dims across the processes specified by pids - all of which have to be on the same host. (...)
This means that you create the SharedArray only once, from the master process, and you can specify which workers are aware of it using the pids argument (if you do not specify pids, the array is mapped across all processes on the current host, including the master).
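As a minimal sketch of that pattern (the choice of two local workers and the array name S are assumptions made for illustration, not part of the question), the array is constructed once on the master and each worker writes into it:

```julia
using Distributed, SharedArrays

# Assumption: two local worker processes are enough for the illustration.
addprocs(2)
@everywhere using SharedArrays

# Created once, on the master. With pids left unspecified, the array is
# mapped into every process on this host.
S = SharedArray{Int}(nworkers())

# Each iteration runs on some worker and records that worker's process id.
@sync @distributed for i in 1:length(S)
    S[i] = myid()
end

# The master sees the values written by the workers.
println(S)
```

The same shared memory backs S on the master and on the workers, so no result has to be sent back explicitly.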
Hence your code will look like this:
using Distributed, SharedArrays
addprocs(4)
@everywhere using SharedArrays
@everywhere solve(a,b,c) = return (1,2,3)
#(...) # your setup code without @everywhere
results = SharedArray{Tuple{Int, Int, Int}}(nb_params)
@sync @distributed for (y , x) in collect(para)
results[y] = solve(x[1], x[2], x[3])
end
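Note that you need collect because the @distributed macro splits the loop range among the workers by index, so it needs an indexable collection with a known length (this is the missing firstindex method reported in your error).
A lazy iterator such as the one returned by enumerate does not support indexing, so it does not work with @distributed directly.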
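The difference can be checked without any workers. The sketch below rebuilds a parameter grid shaped like the one in the question (the two constant arguments are left out for brevity, which is my simplification) and compares the lazy iterator with the collected array:

```julia
# Parameter grid shaped like the one in the question.
d_rates = LinRange(0.01, 0.33, 5)
m_rates = LinRange(0.01, 0.25, 5)
nb_repeats = 2
para = enumerate(Base.product(d_rates, m_rates, 1:nb_repeats))

# The lazy iterator can be looped over, but the error above shows that
# firstindex has no method for it; applicable reports that without throwing.
println(applicable(firstindex, para))

# collect materializes the iterator into an Array, which supports
# firstindex, lastindex and indexing, as @distributed requires.
items = collect(para)
println(applicable(firstindex, items))
println(length(items))

# Each element is still a (counter, parameter tuple) pair.
idx, params = first(items)
println(idx, " => ", params)
```

Since collect stores every parameter combination in memory, this is fine for a grid of this size but worth keeping in mind for much larger sweeps.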