multithreading, clojure, core.async

Upper limit for number of jobs in a go block?


Here is the code:

(ns typedclj.async
  (:require [clojure.core.async
             :as a
             :refer [>! <! >!! <!!
                     go chan buffer
                     close! thread
                     alts! alts!! timeout]]
            [clj-http.client :as -cc]))


(time (dorun
        (let [c (chan)]
          (doseq [i (range 10 1e4)]
            (go (>! c i))))))

And I got an error:

Exception in thread "async-dispatch-12" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)
    at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:150)
    at clojure.core.async.impl.ioc_macros$put_BANG_.invoke(ioc_macros.clj:959)
    at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817$fn__11819.invoke(async.clj:19)
    at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817.invoke(async.clj:19)
    at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:940)
    at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:944)
    at typedclj.async$eval11807$fn__11816.invoke(async.clj:19)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)...

According to http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/

... This will break the 1 job = 1 thread knot, thus this thread parking will allow us to scale the number of jobs way beyond any thread limit on the platform (usually around 1000 on the JVM).

core.async gives (blocking) channels and a new (unbounded) thread pool when using 'thread'. This (in effect) is just some sugar over using java threads (or clojure futures) and BlockingQueues from java.util.concurrent. The main feature is go blocks in which threads can be parked and resumed on the (potentially) blocking calls dealing with core.async's channels...

Is 1e4 jobs already too many? What is the upper limit then?


Solution

  • I don't usually rant like this so I hope you will forgive me this one transgression:

    In a more perfect world every programmer would repeat to themselves "there is no such thing as an unbounded queue" five times before sleeping and first thing upon waking. This mode of thinking requires figuring out how backpressure will be handled in your system, so that when there is a slowdown somewhere in the process, the parts before it have a way to find out about it and slow themselves down in response. In core.async the default backpressure is immediate because the default buffer size is zero: no go block succeeds in putting something into a chan until someone is ready to consume it.
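
    As a rough sketch of what that backpressure looks like in practice (not the only way to structure this), the producer below issues all of its puts from a single go block on an unbuffered channel. Each `>!` parks until the consumer takes the value, so at most one put is ever pending. The helper name `produce-all` and the summing consumer are illustrative assumptions, not part of the question's code.

    ```clojure
    (require '[clojure.core.async :as a :refer [chan go-loop >! <!! close!]])

    (defn produce-all
      "Hypothetical helper: puts each item of coll onto ch from ONE go block,
      then closes ch. Each >! parks until a consumer takes the value."
      [ch coll]
      (go-loop [xs (seq coll)]
        (if xs
          (do (>! ch (first xs))
              (recur (next xs)))
          (close! ch))))

    (def total
      (let [c (chan)]                    ; unbuffered: every put waits for a take
        (produce-all c (range 10 10000))
        (<!! (a/reduce + 0 c))))         ; consumer: sums values until c is closed
    ```

    Because the producer can only move as fast as the consumer, the pending-puts queue never grows, regardless of how many items are in the collection.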

    chans look basically like this:

    "queue of pending puts" --> buffer --> "queue of pending takes"
    

    The putter and taker queues are intended to allow time for the two processes that are communicating via this pipe to schedule themselves so progress can be made. Without them there would be no room for threads to schedule, and deadlocks would happen. They are NOT intended to be used as the buffer, which is why the pending-puts queue is capped at the 1024 entries named in your error message. That's what the buffer in the middle is for, and this was the design behind making it the only one of the three that has an explicit size. Explicitly set the buffer size for your system by setting the size of the buffer in the chan:

    user> (time (dorun
            (let [c (chan 1e6)]
              (doseq [i (range 10 1e4)]
                (go (>! c i))))))
    "Elapsed time: 83.526679 msecs"
    nil
    

    In this case I have "calculated" that my system as a whole will be in a good state if there are up to a million waiting jobs. Of course your real-world experiences will be different, and very much unique to your situation.
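
    The error message also suggests a windowed buffer. As a minimal sketch, assuming it is acceptable for your system to lose values under load, `sliding-buffer` and `dropping-buffer` keep the channel bounded without ever parking or blocking the putter. The helper `fill-and-drain` is a hypothetical name used only for this illustration.

    ```clojure
    (require '[clojure.core.async :as a])

    (defn fill-and-drain
      "Hypothetical helper: puts 1..5 onto a channel backed by buf, closes it,
      and returns whatever the buffer kept."
      [buf]
      (let [c (a/chan buf)]
        (doseq [i (range 1 6)]
          (a/>!! c i))                   ; a windowed buffer always accepts the put
        (a/close! c)
        (a/<!! (a/into [] c))))          ; buffered values can still be taken after close!

    ;; sliding-buffer discards the OLDEST value when full
    (def kept-newest (fill-and-drain (a/sliding-buffer 3)))   ; => [3 4 5]

    ;; dropping-buffer discards the NEW value when full
    (def kept-oldest (fill-and-drain (a/dropping-buffer 3)))  ; => [1 2 3]
    ```

    Which of the two fits, if either, depends on whether stale or fresh data is the cheaper thing to lose; a fixed buffer with real backpressure is the choice when nothing may be dropped.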

    Thanks for your patience,