Context
I have a database connection pool of size 20. I am writing a function that makes a series of database queries recursively:
Node
  - db query: get ids
  - for each id:
      - db query: get attributes
      - recurse if there are subqueries
(I am aware this is an N+1 query pattern, but please take it as a given that it is required here.)
Goal
I want to achieve as much parallelism as I can.
Plan
I initially thought I would use a fixed-size thread pool with the same number of threads as the connection pool (since the parallelism here is bounded by the JDBC connection pool).
Then I could write a function like so:
;; cp is assumed to be an alias for com.climate.claypoole
(defn- query
  [{:keys [pool] :as ctx} form]
  (let [[where-view eids] (resolve-eids ctx form)
        [obj-nodes child-form-nodes]
        (cp/pvalues pool
          (cp/upmap pool (partial obj-node ctx) eids)
          (cp/upmap pool (partial query-one ctx) (child-forms form eids)))]
    (-> (make-node where-view)
        (add-children obj-nodes)
        (add-children child-form-nodes))))
Problem
The recursion here could starve the thread-pool, and cause it to hang.
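To make the starvation concrete, here is a minimal sketch that uses only java.util.concurrent. The helper nested-submit is hypothetical and not part of the code above. A parent task submits a child task to the same fixed pool and blocks on its result, which is what the recursive query does at every level. If every thread is held by a waiting parent, the child never gets to run.

```clojure
(import '(java.util.concurrent Callable Executors ExecutorService
                               TimeUnit TimeoutException))

(defn nested-submit
  "Runs a parent task that submits a child task to the SAME fixed pool and
  blocks on the child's result. Returns :ok when the child could run, or
  :starved when the parent is still waiting after 500 ms."
  [pool-size]
  (let [^ExecutorService pool (Executors/newFixedThreadPool pool-size)
        ^Callable child  (fn [] :ok)
        ;; The parent holds its thread while it waits for the child.
        ^Callable parent (fn [] (.get (.submit pool child)))]
    (try
      (.get (.submit pool parent) 500 TimeUnit/MILLISECONDS)
      (catch TimeoutException _ :starved)
      (finally
        ;; Interrupt the stuck parent so the pool can terminate.
        (.shutdownNow pool)))))
```

With a pool of one thread, (nested-submit 1) times out because the parent occupies the only thread; with two threads, (nested-submit 2) completes. A deeper recursion exhausts a pool of 20 in the same way.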
Potential solutions
From my research, I thought I could use Fork/Join pools. But according to the question "Is Java's fork-and-join thread pool is good for executing IO bound task?", these are not really meant for IO-bound tasks.
What would be the best way for me to achieve this?
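The problem with the approach in the question is that you are using the same thread pool for two different kinds of tasks:
1. running the blocking database queries
2. coordinating the recursion, i.e. waiting for the results of the child queries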
Since you have recursion, the lifecycle of 2. is long and these tasks will eat up all the threads in your executor, until 1. can no longer be executed, thus starving 1. indefinitely.
However, 2. does not actually need to be blocking a thread. It can be implemented with lightweight processes using core.async so waiting for the query results would be just parking instead. Rewriting the code in the original question would look something like the following:
(defn resolve-eids [ctx form]
  (clojure.core.async/thread ....))

(defn obj-node [ctx eid]
  (clojure.core.async/thread (DO-BLOCKING-SQL-QUERY-HERE)))

(defn query-one [ctx form]
  (clojure.core.async/thread ...))
Remember that you should not use go blocks for blocking operations (IO), as these will bring you back to the original problem. Instead, just use clojure.core.async/thread or something similar to use a dedicated thread pool that can be blocked safely.
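As one possible reading of "something similar", the sketch below runs a blocking function on a dedicated fixed-size pool and delivers its result on a channel. The names io-pool and blocking->chan are hypothetical, and the size of 20 is an assumption taken from the connection pool size in the question. Tasks on this pool only ever block on IO, never on other tasks, so they cannot starve each other.

```clojure
(require '[clojure.core.async :as async])
(import '(java.util.concurrent Executors ExecutorService))

;; Assumption: 20 threads, matching the connection pool size in the question.
(def ^ExecutorService io-pool (Executors/newFixedThreadPool 20))

(defn blocking->chan
  "Runs the zero-argument function f on io-pool and returns a channel that
  yields its result and then closes. A nil result only closes the channel,
  because nil cannot be put on a core.async channel."
  [f]
  (let [ch (async/chan 1)
        ^Runnable task (fn []
                         (try
                           (when-some [v (f)]
                             ;; Buffer of 1, so this put never blocks.
                             (async/>!! ch v))
                           (finally
                             (async/close! ch))))]
    (.execute io-pool task)
    ch))
```

A call such as (async/<!! (blocking->chan #(do (Thread/sleep 10) 42))) blocks the calling thread until the value is available; inside a go block you would take from the channel with <! instead. The pool's threads are not daemon threads, so a real program would call (.shutdown io-pool) when it is done.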
The query function is now wrapped in a go block and uses (<!) to park the process while waiting for the queries to complete. (Go blocks are relatively cheap, so the bottleneck is now the thread pool used for the actual blocking IO operations.)
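;; assumes (:require [clojure.core.async :refer [go <! <!!]])
(defn- query [ctx form]
  (go
    (let [[where-view eids] (<! (resolve-eids ctx form))
          ;; Both calls return channels right away; the results are only
          ;; awaited further down, so the two branches run concurrently.
          ;; NOTE: obj-node above takes a single eid; the fan-out over
          ;; eids is left out of this sketch.
          obj-nodes (obj-node ctx eids)
          child-form-nodes (query-one ctx (child-forms form eids))]
      (-> (make-node where-view)
          (add-children (<! obj-nodes))
          (add-children (<! child-form-nodes))))))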
And since query now returns a channel, you have to call it like this to actually get the result: (<!! (query ... ...))
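For a version that can be run end to end, here is a small self-contained sketch of the same pattern. Everything in it is a hypothetical stand-in: tree replaces the database, fake-db-query simulates a blocking JDBC call, and fetch-children and fetch-attrs play the roles of resolve-eids and obj-node. The fan-out over children, which the snippet above glosses over, is done by starting one go block per child and then parking on each result in turn.

```clojure
(require '[clojure.core.async :as async :refer [go <! <!!]])

;; Hypothetical data: each id maps to the ids of its children.
(def tree {:root [:a :b], :a [:a1 :a2], :b [], :a1 [], :a2 []})

(defn fake-db-query
  "Stand-in for a blocking JDBC call."
  [result]
  (Thread/sleep 10)
  result)

;; Blocking work goes through async/thread, never directly into a go block.
(defn fetch-children [id]
  (async/thread (fake-db-query (get tree id []))))

(defn fetch-attrs [id]
  (async/thread (fake-db-query {:id id})))

(defn query
  "Returns a channel that yields the node for id with all its descendants."
  [id]
  (go
    (let [attrs-ch   (fetch-attrs id)              ; starts immediately
          child-ids  (<! (fetch-children id))      ; park, do not block
          child-chs  (mapv query child-ids)        ; start every child query
          attrs      (<! attrs-ch)]
      ;; Park on each child in order; the children already run concurrently.
      (loop [chs child-chs, children []]
        (if (seq chs)
          (recur (rest chs) (conj children (<! (first chs))))
          (assoc attrs :children children))))))
```

Calling (<!! (query :root)) from a regular thread returns the nested map for the whole tree. Note that async/thread draws from an unbounded pool, so in this sketch the only limit on concurrent queries would be the connection pool itself.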