Search code examples
concurrencyclojurechannelcore.async

core.async loop blocked waiting to read from channel


let's say I've got a channel out (chan). I need to take values that are put into the channel and add them. The number of values is undetermined (thus cannot use traditional loop with an end case with (<! out)) and comes from an external IO. I'm using a fixed timeout with alts! but that doesn't seem like the best way to approach the problem. So far, I've got the following (which I got from https://gist.github.com/schaueho/5726a96641693dce3e47)

(go-loop
      [[v ch] (alts! [out (timeout 1000)])
       acc 0]
      (if-not v
        (do (close! out)
            (deliver p acc))
        (do
          (>! task-ch (as/progress-tick))
          (recur (alts! [out (timeout 1000)]) (+ acc v)))))

The problem I've got is that a timeout of 1000 is sometimes not enough and causes the go-loop to exit prematurely (as it may take more than 1000ms for the IO operation to complete and put the val in the out channel). I do not think increasing the timeout value is such a good idea as it may cause me to wait longer than necessary.

What is the best way to guarantee all reads from the out channel and exit out correctly from the loop?

Update:

Why am I using timeout? Because the number of values being put in the channel is not fixed; which means, I cannot create an exit case. W/o the exit case, the go-loop will park indefinely waiting ((<! out)) for values to be put in the channel out. If you have a solve without the timeout, that'd be really awesome.

How do i know I've read the last value? I dont. That's the problem. That's why I'm using timeout and alts!! to exit the go-loop.

What do you want to do w/ the result? Simple addition for now. However, that's not the important bit.

Update Final:

I figured out a way to get the number of values I'd be dealing with. So I modified my logic to make use of that. I'm still going to use the timeout and alts! to prevent any locking.

(go-loop
     [[v _] (alts! [out (timeout 1000)])
      i 0
      acc 0]
      (if (and v (not= n i))
        (do
          (>! task-ch (as/progress-tick))
          (recur (alts! [out (timeout 1000)]) (inc i) (+ acc v)))
        (do (close! out)
            (deliver p* (if (= n i) acc nil)))))

Solution

  • I think your problem is a bit higher-up in your design, not a core-async specific one:

    On one hand, you have an undetermined amount of values coming in a channel — there could be 0, there could be 10, there could be 1,000,000.

    On the other hand, you want to read all of them, do some calculation and then return. This is impossible to do — unless there is some other signal that you can use to say "I think I'm done now".

    If that signal is the timing of the values, then your approach of using alts! is the correct one, albeit I believe the code can be simplified a bit.

    Update: Do you have access to the "upstream" IO? Can you put a sentinel value (e.g. something like ::closed) to the channel when the IO operation finishes?