Search code examples
clojurerabbitmqquartz-schedulerlangohr

Can I do `blocking-subscribe` in a scheduled task which runs repeatedly?


Recently I've found my Clojure/Ring/Jetty server repeatedly go into NoClassDefFoundError when I cider-connect into it and do something. I guess that is because the thread pool being exhausted by some dead threads.

Then I've found this function in the server which runs one time every day by a definition clojurewerkz.quartzite.scheduler job:

(defn consumer-msgs-announcement
  [rabbitmq queue-name & args]
  (with-open [conn (lc/connect rabbitmq)]
    (let [ch  (lch/open conn)]
      (lq/declare ch queue-name {:durable false :auto-delete false})
      (println " [*] Waiting for messages. To exit press CTRL+C")
      ;; (lcons/blocking-subscribe ch queue-name msg-queue/post-wxmsg-handle-delivery {:auto-ack true})
      (lcons/blocking-subscribe ch queue-name handle-delivery-announcement {:auto-ack true})
      )))

in which included packages are defined as:

   [langohr.core :as lc]
   [langohr.channel :as lch]
   [clojure.string :as str]
   [langohr.queue :as lq]
   [langohr.consumers :as lcons]

I doubt the blocking-subscribe will cause the thread which it used to be blocked forever which exhausts the thread pool of JVM and finally causes the NoClassDefFoundError error.

I am not sure about this, but can I do blocking-subscribe in a scheduled task which runs repeatedly?

Is the thing I described above possible? Or are there any wrong with my deduction?

Thanks.


Solution

  • In my experience blocking-subscribe does block the thread indefinitely. You can quickly check the source for blocking-subscribe to see that it goes into a do-seq on a potentially endless lazy-seq.

    It looks like you just want to have your job collect any existing messages that have backed up in the queue since the last run. Here are a couple of options that stick out to me for this style of access pattern.

    1. You can use langohr.basic/get in order to process messages on the queue until you receive a nil.

    2. You can use langohr.consumers/subscribe to create a simple non-blocking consumer

      • Start the consumer
      • Keep the consumer tag you get
      • Sleep for a predetermined amount of time
      • Cancel the consumer using langohr.basic/cancel

    P.S. I am unable to post the link to the subscribe/cancel documentation, but it's easy to find in the source and in the langohr API reference.