
Return results to the caller with a throttling queue


Building on an earlier snippet and answer, is it possible to return results to the caller from the throttling queue? I tried PostAndAsyncReply to receive the reply on a channel, but it throws an error when I pipe it into Enqueue. The code is below.

I'd appreciate a solution based on vanilla FSharp.Core, built around the Queue or Mailbox design patterns.

Question

The goal is to call functions asynchronously under a throttle (at most 3 at a time), passing in each item of the array, wait until the whole queue/array has been processed while collecting all the results, and then return those results to the caller. (Returning the results to the caller is the part that is still missing.)

Callee Code

// Message type used by the agent - contains queueing
// of work items and notification of completion
type ThrottlingAgentMessage =
  | Completed
  | Enqueue of Async<unit>

/// Represents an agent that runs operations concurrently. When the number
/// of concurrent operations exceeds 'limit', they are queued and processed later
let throttlingAgent limit =
    MailboxProcessor.Start(fun inbox ->
    async {
      // The agent body is not executing in parallel,
      // so we can safely use mutable queue & counter
      let queue = System.Collections.Generic.Queue<Async<unit>>()
      let running = ref 0

      while true do

        // Enqueue new work items or decrement the counter
        // of how many tasks are running in the background
        let! msg = inbox.Receive()
        match msg with
        | Completed -> running.Value <- running.Value - 1
        | Enqueue w -> queue.Enqueue(w)

        // If we have less than limit & there is some work to
        // do, then start the work in the background!
        while running.Value < limit && queue.Count > 0 do
          let work = queue.Dequeue()
          running.Value <- running.Value + 1
          do! // When the work completes, send 'Completed'
              // back to the agent to free a slot
              async {
                do! work
                inbox.Post(Completed)
              }
              |> Async.StartChild
              |> Async.Ignore
    })


let requestDetailAsync (url: string) : Async<Result<string, Error>> =
     async {
       Console.WriteLine ("Simulating request " + url)
       try
           do! Async.Sleep(1000) // let's say each request takes about a second
           return Ok (url + ":body...")
       with :? WebException as e ->
           return Error {Code = "500"; Message = "Internal Server Error"; Status = HttpStatusCode.InternalServerError}
     }

let requestMasterAsync() : Async<Result<System.Collections.Concurrent.ConcurrentBag<_>, Error>> =
    async {
        let urls = [|
                    "http://www.example.com/1";
                    "http://www.example.com/2";
                    "http://www.example.com/3";
                    "http://www.example.com/4";
                    "http://www.example.com/5";
                    "http://www.example.com/6";
                    "http://www.example.com/7";
                    "http://www.example.com/8";
                    "http://www.example.com/9";
                    "http://www.example.com/10";
                |]

        let results = System.Collections.Concurrent.ConcurrentBag<_>()
        let agent = throttlingAgent 3

        for url in urls do
            async {
                let! res = requestDetailAsync url
                results.Add res
            }
            |> Enqueue
            |> agent.Post

        return Ok results
    }

Caller Code

[<TestMethod>]
member this.TestRequestMasterAsync() =
    match Entity.requestMasterAsync() |> Async.RunSynchronously with
    | Ok result -> Console.WriteLine result
    | Error error -> Console.WriteLine error

Solution

  • You could use Hopac.Streams for that. With such a tool it is pretty trivial:

    open Hopac
    open Hopac.Stream
    open System
    
    let requestDetailAsync url = async {
       Console.WriteLine ("Simulating request " + url)
       try
           do! Async.Sleep(1000) // let's say each request takes about a second
           return Ok (url + ":body...")
       with e ->
           return Error e
     }
    
    let requestMasterAsync() : Stream<Result<string,exn>> =
        [| "http://www.example.com/1"
           "http://www.example.com/2"
           "http://www.example.com/3"
           "http://www.example.com/4"
           "http://www.example.com/5"
           "http://www.example.com/6"
           "http://www.example.com/7"
           "http://www.example.com/8"
           "http://www.example.com/9"
           "http://www.example.com/10" |]
        |> Stream.ofSeq
        |> Stream.mapPipelinedJob 3 (requestDetailAsync >> Job.fromAsync)
    
    requestMasterAsync()
    |> Stream.iterFun (printfn "%A")
    |> queue //prints all results asynchronously
    
    let allResults : Result<string,exn> list = 
        requestMasterAsync()
        |> Stream.foldFun (fun results cur -> cur::results ) []
        |> run //fold stream into list synchronously
    

    ADDED: in case you want to use only vanilla FSharp.Core with mailboxes, try this:

    type ThrottlingAgentMessage =
      | Completed
      | Enqueue of Async<unit>
    
    let inline (>>=) x f = async.Bind(x, f)
    let inline (>>-) x f = async.Bind(x, f >> async.Return)
    
    let throttlingAgent limit =
        let agent = MailboxProcessor.Start(fun inbox ->
            let queue = System.Collections.Generic.Queue<Async<unit>>()
    
            let startWork work = 
                work
                >>- fun _ -> inbox.Post Completed
                |> Async.StartChild |> Async.Ignore
    
            let rec loop curWorkers =
                inbox.Receive()
                >>= function
                | Completed when queue.Count > 0 -> 
                    queue.Dequeue() |> startWork
                    >>= fun _ -> loop curWorkers
                | Completed -> 
                    loop (curWorkers - 1)
                | Enqueue w when curWorkers < limit ->
                    w |> startWork
                    >>= fun _ -> loop (curWorkers + 1)
                | Enqueue w ->
                    queue.Enqueue w
                    loop curWorkers
    
            loop 0)
        Enqueue >> agent.Post
    

    It is pretty much the same logic, but slightly optimized so that the queue is not used when there is free worker capacity (the job is started directly, without an enqueue/dequeue round trip).

    throttlingAgent is a function of type int -> Async<unit> -> unit, because we don't want the client to bother with our internal ThrottlingAgentMessage type.

    Use it like this:

    let throttler = throttlingAgent 3
    
    for url in urls do
        async {
            let! res = requestDetailAsync url
            results.Add res
        }
        |> throttler
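
The usage above posts the work but does not wait for it, so the caller still cannot get the results back. One possible way to close that gap is sketched below. It is only a sketch under stated assumptions: `throttled` is a hypothetical helper that is not part of the answer, it relies on `TaskCompletionSource` from the .NET base class library rather than on a reply channel, `requestDetailAsync` is a stand-in with a shorter delay and no real HTTP call, and the agent is restated with `async { }` blocks only so that the snippet is self-contained.

```fsharp
open System.Threading.Tasks

type ThrottlingAgentMessage =
    | Completed
    | Enqueue of Async<unit>

// Same throttling logic as the agent above, restated with async { } blocks.
let throttlingAgent limit =
    let agent =
        MailboxProcessor.Start(fun inbox ->
            let queue = System.Collections.Generic.Queue<Async<unit>>()
            // Start a work item in the background; post Completed when it ends.
            let startWork work =
                async {
                    do! work
                    inbox.Post Completed
                }
                |> Async.Start
            let rec loop running =
                async {
                    let! msg = inbox.Receive()
                    match msg with
                    | Completed when queue.Count > 0 ->
                        startWork (queue.Dequeue())
                        return! loop running
                    | Completed -> return! loop (running - 1)
                    | Enqueue w when running < limit ->
                        startWork w
                        return! loop (running + 1)
                    | Enqueue w ->
                        queue.Enqueue w
                        return! loop running
                }
            loop 0)
    Enqueue >> agent.Post

// Hypothetical helper (an assumption, not from the answer): sends 'work'
// through the throttler and returns an Async that yields its result.
let throttled (throttler: Async<unit> -> unit) (work: Async<'T>) : Async<'T> =
    async {
        let tcs =
            TaskCompletionSource<'T>(TaskCreationOptions.RunContinuationsAsynchronously)
        throttler (async {
            try
                let! result = work
                tcs.SetResult result
            with e ->
                // Route failures to the waiting caller so the slot is still freed.
                tcs.SetException e
        })
        return! Async.AwaitTask tcs.Task
    }

// Stand-in for requestDetailAsync from the question (no real HTTP call).
let requestDetailAsync (url: string) : Async<Result<string, exn>> =
    async {
        try
            do! Async.Sleep 100
            return Ok (url + ":body...")
        with e ->
            return Error e
    }

// Completes only after every URL has been processed, at most 3 at a time.
let requestMasterAsync (urls: string[]) : Async<Result<string, exn>[]> =
    let throttler = throttlingAgent 3
    urls
    |> Array.map (fun url -> throttled throttler (requestDetailAsync url))
    |> Async.Parallel

let results =
    [| for i in 1 .. 10 -> sprintf "http://www.example.com/%d" i |]
    |> requestMasterAsync
    |> Async.RunSynchronously

results |> Array.iter (printfn "%A")
```

Here `Async.Parallel` starts all ten wrappers at once, but each wrapper only enqueues its work and then waits without blocking a thread, so the agent still decides how many requests actually run concurrently. The results come back in the same order as the input array. If FSharp.Core 4.7 or later is available, `Async.Parallel(computations, maxDegreeOfParallelism = 3)` may be enough on its own, without a custom agent.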