Recently I've found my Clojure/Ring/Jetty server repeatedly go into NoClassDefFoundError
when I cider-connect
into it and do something. I guess that is because the thread pool being exhausted by some dead threads.
Then I've found this function in the server which runs one time every day by a definition clojurewerkz.quartzite.scheduler
job:
(defn consumer-msgs-announcement
[rabbitmq queue-name & args]
(with-open [conn (lc/connect rabbitmq)]
(let [ch (lch/open conn)]
(lq/declare ch queue-name {:durable false :auto-delete false})
(println " [*] Waiting for messages. To exit press CTRL+C")
;; (lcons/blocking-subscribe ch queue-name msg-queue/post-wxmsg-handle-delivery {:auto-ack true})
(lcons/blocking-subscribe ch queue-name handle-delivery-announcement {:auto-ack true})
)))
in which included packages are defined as:
[langohr.core :as lc]
[langohr.channel :as lch]
[clojure.string :as str]
[langohr.queue :as lq]
[langohr.consumers :as lcons]
I doubt the blocking-subscribe
will cause the thread which it used to be blocked forever which exhausts the thread pool of JVM and finally causes the NoClassDefFoundError
error.
I am not sure about this, but can I do blocking-subscribe
in a scheduled task which runs repeatedly?
Is the thing I described above possible? Or are there any wrong with my deduction?
Thanks.
In my experience blocking-subscribe
does block the thread indefinitely. You can quickly check the source for blocking-subscribe
to see that it goes into a do-seq on a potentially endless lazy-seq.
It looks like you just want to have your job collect any existing messages that have backed up in the queue since the last run. Here are a couple of options that stick out to me for this style of access pattern.
You can use langohr.basic/get in order to process messages on the queue until you receive a nil
.
You can use langohr.consumers/subscribe
to create a simple non-blocking consumer
P.S. I am unable to post the link to the subscribe/cancel
documentation, but it's easy to find in the source and in the langohr API reference.