
Thread pool for recursive calls to JDBC in Clojure


Context

I have a database connection pool of size 20. I am writing a function that makes a bunch of queries to the database recursively:

  Node
    - db query: get ids
      - for each id: 
         - db query: get attributes
         - recurse if there are subqueries

(I am aware this is an N+1 query, but please assume that it is required here.)

Goal

I want to achieve as much parallelism as I can.

Plan

I initially thought I would use a fixed-size thread pool with the same number of threads as the connection pool has connections, since the parallelism here is bounded by the JDBC connection pool.

Then I could write a function like so:

(defn- query
  [{:keys [pool] :as ctx} form]
  (let [[where-view eids] (resolve-eids ctx form)
        ;; cp is presumably com.climate.claypoole; both unordered maps
        ;; run in parallel on the same pool
        [obj-nodes child-form-nodes]
        (cp/pvalues pool
                    (cp/upmap pool (partial obj-node ctx) eids)
                    (cp/upmap pool (partial query-one ctx) (child-forms form eids)))]
    (-> (make-node where-view)
        (add-children obj-nodes)
        (add-children child-form-nodes))))

Problem

The recursion here could starve the thread-pool, and cause it to hang.

Potential solutions

From research I thought maybe I could use Fork/Join pools. But according to the question "Is Java's fork-and-join thread pool is good for executing IO bound task?", these are not really meant for IO-bound tasks.

What would be the best way for me to achieve this?


Solution

  • The problem with the approach in the question is that you are using the same thread pool for two different kinds of tasks:

    1. querying the database with a blocking operation, and then
    2. waiting for 1. to finish and then combining the responses

    Since you have recursion, the lifecycle of 2. is long and these tasks will eat up all the threads in your executor, until 1. can no longer be executed, thus starving 1. indefinitely.
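The starvation described here can be reproduced without a database. The following is a minimal sketch using only java.util.concurrent; the function name nested-submit and the timeout values are made up for illustration. An outer task submits an inner task to the same fixed-size pool and blocks on its result:

```clojure
(import '(java.util.concurrent Callable Executors ExecutorService
                               Future TimeUnit TimeoutException))

(defn nested-submit
  "Hypothetical demo: an outer task submits an inner task to the same pool
  and blocks on it. Returns :done, or :starved if the outer task does not
  finish within timeout-ms."
  [pool-size timeout-ms]
  (let [^ExecutorService pool (Executors/newFixedThreadPool pool-size)
        inner  (fn [] :done)
        ;; the outer task holds a pool thread while it waits for the inner one
        outer  (fn [] (.get ^Future (.submit pool ^Callable inner)))
        ^Future result (.submit pool ^Callable outer)]
    (try
      (.get result timeout-ms TimeUnit/MILLISECONDS)
      (catch TimeoutException _ :starved)
      (finally
        ;; interrupt any task that is still blocked and release the threads
        (.shutdownNow pool)))))

(nested-submit 1 200) ;; => :starved (the only thread is waiting on queued work)
(nested-submit 2 200) ;; => :done    (a spare thread can run the inner task)
```

With recursion the nesting depth is not fixed, so no pool size is guaranteed to leave a spare thread.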

    However, 2. does not actually need to block a thread. It can be implemented with core.async's lightweight go processes, so that waiting for the query results parks the process instead of tying up a thread. Rewriting the code from the original question would look something like the following:

    1. First, make the functions that are using the database return a channel instead of lazy sequences.
    (defn resolve-eids [ctx form]
      (clojure.core.async/thread ....))
    
    (defn obj-node [ctx eid]
      (clojure.core.async/thread (DO-BLOCKING-SQL-QUERY-HERE)))
    
    (defn query-one [ctx form]
      (clojure.core.async/thread ...))
    

    Remember that you should not use go blocks for blocking operations (IO) as these will bring you back to the original problem. Instead, just use clojure.core.async/thread or something similar to use a dedicated thread pool that can be blocked safely.
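clojure.core.async/thread does not let you choose how many threads it uses. If you want to cap the number of concurrent blocking queries at the size of the connection pool, "something similar" could be a small helper around a fixed-size executor. This is only a sketch: db-pool and blocking-chan are hypothetical names, and the size 20 is taken from the question.

```clojure
(require '[clojure.core.async :as async])
(import '(java.util.concurrent Executors ExecutorService))

;; Hypothetical pool, sized to match the 20 connections from the question.
(defonce ^ExecutorService db-pool (Executors/newFixedThreadPool 20))

(defn blocking-chan
  "Runs the zero-argument function f on db-pool and returns a channel that
  receives its result. The channel closes without a value if f returns nil
  or throws."
  [f]
  (let [out (async/chan 1)]
    (.execute db-pool
              (fn []
                (try
                  (when-some [v (f)]
                    ;; buffer of 1, so this put never blocks the pool thread
                    (async/>!! out v))
                  (finally
                    (async/close! out)))))
    out))

;; Usage: the function passed in stands in for a blocking SQL query.
(async/<!! (blocking-chan #(do (Thread/sleep 10) 42))) ;; => 42
```

This only stays deadlock-free if the functions passed to blocking-chan do nothing but the blocking query itself. If one of them waited on another blocking-chan result, the original starvation problem would return. The pool threads are not daemon threads, so call (.shutdown db-pool) when the application stops.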

    2. Then your query function should return a channel created by a go block, using <! to park the process while it waits for the queries to complete.

    (Go blocks are relatively cheap, so the bottleneck is now the thread pool used for the actual blocking IO operations.)

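    ;; assumes (require '[clojure.core.async :as async :refer [go <!]])
    (defn- query [ctx form]
      (go
        (let [[where-view eids] (<! (resolve-eids ctx form))
              ;; start every query first so that they all run concurrently
              obj-chans   (mapv #(obj-node ctx %) eids)
              child-chans (mapv #(query-one ctx %) (child-forms form eids))
              ;; park until each group is complete; like upmap, order is not preserved
              obj-nodes        (<! (async/into [] (async/merge obj-chans)))
              child-form-nodes (<! (async/into [] (async/merge child-chans)))]
          (-> (make-node where-view)
              (add-children obj-nodes)
              (add-children child-form-nodes)))))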
    

    Since query now returns a channel, you have to take from that channel to get the result. From ordinary (non-go) code, use the blocking take: (<!! (query ... ...))
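To see the whole pattern run end to end, here is a small self-contained sketch that replaces the database with an in-memory map. The names children, fetch-children and query-tree are made up for this example, and Thread/sleep stands in for the blocking SQL call.

```clojure
(require '[clojure.core.async :as async :refer [go <! <!! thread]])

;; Hypothetical in-memory "database": parent id -> child ids.
(def children {1 [2 3], 2 [4], 3 [], 4 []})

(defn fetch-children
  "Stand-in for a blocking SQL query. Returns a channel with the child ids."
  [id]
  (thread
    (Thread/sleep 20)            ; simulated blocking IO on a real thread
    (get children id [])))

(defn query-tree
  "Returns a channel that yields the subtree rooted at id."
  [id]
  (go
    (let [child-ids   (<! (fetch-children id))   ; parks, does not block
          ;; start all recursive queries before waiting on any of them
          child-chans (mapv query-tree child-ids)
          ;; merge delivers results as they complete, so sort afterwards
          subtrees    (<! (async/into [] (async/merge child-chans)))]
      {:id id :children (vec (sort-by :id subtrees))})))

(<!! (query-tree 1))
;; => {:id 1, :children [{:id 2, :children [{:id 4, :children []}]}
;;                       {:id 3, :children []}]}
```

Only fetch-children occupies a real thread, and only for the duration of the simulated query. The recursion itself lives in go blocks, so a deep tree does not hold threads while it waits for its children.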