Search code examples
multithreadingclojuredatalog

datalevin, concurrency, transactions


I am using latest Datalevin version 0.7.8 and wrote the following small program:

(ns datalevintest.core
  (:require [datalevin.core :as dc]))

(def store (System/getenv "DBSTORE"))

(def conn (datalevin.core/get-conn store {} {:auto-entity-time? true :validate-data? true}))

(defn -main [& _]
  (dotimes [i 5]
    (future
      (locking ::println (println "Starting thread"))
      (try
        (dotimes [j 100]
          (dc/transact! conn [{:i+j (+ i j)}])
          (dc/with-transaction [tx-conn conn]
            (dc/transact! tx-conn [{:i*j (* i j)}]))
          (dc/q '[:find (pull ?e [*]) :in $ ?id :where [?e :db/id ?id]]
                (dc/db conn) 2345))
        (catch Throwable t (.printStackTrace t))
        (finally (println "Thread" i "done")))))
  (println "END"))

Nondeterministically, sometimes I get the following:

clojure.lang.ExceptionInfo: Fail to transact to LMDB: "Transaction is not in ready state" {}
    at datalevin.binding.java.LMDB.transact_kv(java.clj:484)
    at datalevin.storage.Store.load_datoms(storage.cljc:376)
    at datalevin.db$local_transact_tx_data.invokeStatic(db.cljc:1236)
    at datalevin.db$local_transact_tx_data.invoke(db.cljc:963)
    at datalevin.db$transact_tx_data.invokeStatic(db.cljc:1274)
    at datalevin.db$transact_tx_data.invoke(db.cljc:1250)
    at datalevin.core$with.invokeStatic(core.cljc:291)
    at datalevin.core$with.invoke(core.cljc:285)
    at datalevin.core$with.invokeStatic(core.cljc:288)
    at datalevin.core$with.invoke(core.cljc:285)
    at datalevin.core$_transact_BANG_$fn__13128$fn__13129.invoke(core.cljc:550)
    at clojure.lang.Atom.swap(Atom.java:37)
    at clojure.core$swap_BANG_.invokeStatic(core.clj:2356)
    at clojure.core$swap_BANG_.invoke(core.clj:2349)
    at datalevin.core$_transact_BANG_$fn__13128.invoke(core.cljc:549)
    at datalevin.core$_transact_BANG_.invokeStatic(core.cljc:548)
    at datalevin.core$_transact_BANG_.invoke(core.cljc:545)
    at datalevin.core$transact_BANG_.invokeStatic(core.cljc:643)
    at datalevin.core$transact_BANG_.invoke(core.cljc:555)
    at datalevin.core$transact_BANG_.invokeStatic(core.cljc:640)
    at datalevin.core$transact_BANG_.invoke(core.cljc:555)
    at datalevintest.core$save_BANG_.invokeStatic(core.clj:10)
    at datalevintest.core$save_BANG_.doInvoke(core.clj:9)
    at clojure.lang.RestFn.invoke(RestFn.java:408)
    at datalevintest.core$_main$fn__13261$fn__13265.invoke(core.clj:32)
    at datalevintest.core$_main$fn__13261.invoke(core.clj:27)
    at clojure.core$binding_conveyor_fn$fn__5772.invoke(core.clj:2034)
    at clojure.lang.AFn.call(AFn.java:18)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

(You may need to run the program multiple times to get the error.)

Less often I get the following:

clojure.lang.ExceptionInfo: Fail to get-first: nil {:dbi "datalevin/eav", :k-range [:all-back], :k-type :eav, :v-type :id}
    at datalevin.scan$get_first.invokeStatic(scan.cljc:233)
    at datalevin.scan$get_first.invoke(scan.cljc:229)
    at datalevin.binding.java.LMDB.get_first(java.clj:502)
    at datalevin.binding.java.LMDB.get_first(java.clj:500)
    at datalevin.storage.Store.init_max_eid(storage.cljc:300)
    at datalevin.db$new_db.invokeStatic(db.cljc:387)
    at datalevin.db$new_db.invoke(db.cljc:379)
    at datalevintest.core$_main$fn__13261$fn__13265$fn__13276.invoke(core.clj:30)
    at datalevintest.core$_main$fn__13261$fn__13265.invoke(core.clj:30)
    at datalevintest.core$_main$fn__13261.invoke(core.clj:27)
    at clojure.core$binding_conveyor_fn$fn__5772.invoke(core.clj:2034)
    at clojure.lang.AFn.call(AFn.java:18)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

If I move the creation of connection into the future wiht create-conn, I get another exception:

java.lang.NullPointerException: Cannot read field "e"
    at datalevin.storage.Store.init_max_eid(storage.cljc:302)
    at datalevin.db$new_db.invokeStatic(db.cljc:387)
    at datalevin.db$new_db.invoke(db.cljc:379)
    at datalevin.db$empty_db.invokeStatic(db.cljc:399)
    at datalevin.db$empty_db.invoke(db.cljc:392)
    at datalevin.core$create_conn.invokeStatic(core.cljc:529)
    at datalevin.core$create_conn.invoke(core.cljc:488)
    at datalevintest.core$_main$fn__13252.invoke(core.clj:14)
    at clojure.core$binding_conveyor_fn$fn__5772.invoke(core.clj:2034)
    at clojure.lang.AFn.call(AFn.java:18)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)

(This one also breaks the database file so the application will not start up the next time.)

The issue comes up in a multithreaded environment and sounds like a concurrency problem. My first idea was that the same connection should'nt be used across different threads, however, the code of get-conn says the same connection will be reused when it alrady exists for a directory. The documentation does not mention multithreading.

What is the error in my code causing the problem and how can I make it safer?


Solution

  • This bug was fixed by version 0.7.9 released half an hour after the question was posted.