
F# Array.Parallel.map does not provide parallel processing


I have to simulate a discrete environment in F#, to be called from Python, for a reinforcement learning problem. I have a function that takes primitive types (mainly float) to make the exchange of data smoother. I now need to run this function many times with different data, so running it in parallel seems a good idea.

I have the following code:

type AscentStrategy = | Strategy of seq<float>

let simulateAscent env ascentLimiter initState (sequenceOfDepths: seq<float>) =
    // let infinitSeqOfConstantValues = (fun _ -> constantDepth) |> Seq.initInfinite
    sequenceOfDepths
    |> Seq.scan
        (fun (nextState, rew, isTerminal, _) depth ->
            getNextEnvResponseAndBoundForNextAction (env, nextState, depth, ascentLimiter))
        (initState, 0.0, false, 0.0)
    |> SeqExtension.takeWhileWithLast (fun (_, _, isTerminalState, _) -> not isTerminalState)
    |> Seq.toArray

and then:

let simulateStrategy ({ MaxPDCS = maxPDCS; MaxSimTime = maximumSimulationTime;
                        PenaltyForExceedingRisk = penaltyForExceedingRisk;
                        RewardForDelivering = rewardForDelivering;
                        PenaltyForExceedingTime = penaltyForExceedingTime;
                        IntegrationTime = integrationTime;
                        ControlToIntegrationTimeRatio = controlToIntegrationTimeRatio;
                        DescentRate = descentRate; MaximumDepth = maximumDepth;
                        BottomTime = bottomTime; LegDiscreteTime = legDiscreteTime } : SimulationParameters)
                     (Strategy ascentStrategy : AscentStrategy) =
    // Build the environment, its initial state and the ascent limiter from the parameters
    let env, initState, ascentLimiter, _ =
        getEnvInitStateAndAscentLimiter (maxPDCS, maximumSimulationTime,
                                         penaltyForExceedingRisk, rewardForDelivering, penaltyForExceedingTime,
                                         integrationTime, controlToIntegrationTimeRatio,
                                         descentRate, maximumDepth, bottomTime, legDiscreteTime)
    ascentStrategy
    |> simulateAscent env ascentLimiter initState

Finally, I call the function for testing:

open System

let commonSimulationParameters =
    { MaxPDCS = 0.32; MaxSimTime = 2000.0; PenaltyForExceedingRisk = 1.0; RewardForDelivering = 10.0
      PenaltyForExceedingTime = 0.5; IntegrationTime = 0.1; ControlToIntegrationTimeRatio = 10
      DescentRate = 60.0; MaximumDepth = 20.0; BottomTime = 10.0; LegDiscreteTime = 0.1 }

printfn "insert number of elements"
let maxInputsString = Console.ReadLine()
let maxInputs = maxInputsString |> Double.Parse
// One infinite constant-depth sequence per input value
let inputsStrategies = [| 0.0 .. maxInputs |] |> Array.map (fun x -> Seq.initInfinite (fun _ -> x))
let testParallel =
    inputsStrategies
    |> Array.Parallel.map (fun x -> simulateStrategy commonSimulationParameters (Strategy x))

I have compared this with Array.map and, while it is faster and uses 70% of the CPU on my laptop, it still does not seem to use the whole processing power. I have run it on a machine with many more cores (~50) and it barely increases the CPU usage (it reaches 3-4% of total usage with about 50 independent inputs). I think there must be a deadlock generated somewhere, but how can I detect it and get rid of it?

Also, why does this happen? One of the advantages of functional programming, as I see it, is that code can be parallelized easily.

PS: SeqExtension.takeWhileWithLast is a function I found on SO, kindly provided by Tomas Petricek in one of his brilliant answers; I can post it if needed.

PPS: env is the environment, whose type is defined as:

type Environment<'S, 'A, 'I> = | Environment of (State<'S> -> Action<'A> -> EnvironmentOutput<'S, 'I>)

I have tried the same with Async.Parallel and ParallelSeq, and both show the same problem.

Would a message-based solution solve the problem? I am looking into it, although I am not at all familiar with the approach. Would MailboxProcessor be a good way of making the code parallel?


Following up on my question, I have also tried this great library for parallel code, based on streams of data: https://nessos.github.io/Streams/

I have added the following code:

let nessosResult = inputsStrategies
                    |> ParStream.ofArray
                    |> ParStream.map simulateStrategy
                    |> ParStream.toArray

I have defined an ad hoc type for inputStrategy (basically the old tuple I had) so that simulateStrategy accepts only one input. Unfortunately, the problem seems to be very well hidden somewhere. I attach a graph of CPU usage. Time spent on my machine for the different cases is: ~8.8 sec (sequential); ~6.2 sec (Array.Parallel.map); ~6.1 sec (Nessos.Streams).

[Figure: CPU consumption]


Solution

  • I have found that server garbage collection is necessary to get the best parallel performance on .NET. Something like this in your app.config:

    <configuration>
      <runtime>
        <gcServer enabled="true" />
      </runtime>
    </configuration>
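
    To check that the setting has actually taken effect, it may help to print the GC mode from inside the running process. The following is a minimal sketch, not part of the original code: describeGc is a hypothetical helper name, and it relies only on System.Runtime.GCSettings and System.Environment.

    ```fsharp
    open System
    open System.Runtime

    // Hypothetical helper (not from the original post): summarises the GC
    // settings the current process actually started with.
    let describeGc () =
        sprintf "Server GC: %b, latency mode: %A, logical processors: %d"
            GCSettings.IsServerGC GCSettings.LatencyMode Environment.ProcessorCount

    // Call this once at startup, before running the parallel simulation.
    printfn "%s" (describeGc ())
    ```

    If this reports "Server GC: false" after the change, the configuration file is probably not being picked up. Assuming the project targets .NET Core or later rather than .NET Framework (the question does not say), app.config is not read for this setting; the equivalent is <ServerGarbageCollector>true</ServerGarbageCollector> in the project file.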