Search code examples
clojureatomiccompojurering

Clojure atom PersistentQueue in a Ring Application behaviour


I have a ring application using compojure. I created an atom with PersistentQueue to store IDs of process execution and block duplicate executions with other request to my API with same ID.

But during my tests, the Atom works very well, but only at same endpoint at my API. If I call other endpoint the behaviour is different.

My atom:

(def queue (atom clojure.lang.PersistentQueue/EMPTY))

(defn add-to-queue [number]
  (swap! queue conj number))

(defn remove-from-queue [number] 
  (swap! queue (fn [q] (remove #{number} q))))

(defn queue-has-id? [number]
  (let [pr-queue (seq @queue)]
    (if-not (nil? pr-queue)
      (> (.indexOf pr-queue number) -1)
      false)))

To illustrate, when I call my endpoint http://localhost:3000/run, the function add-to-queue is called and my Atom content is swap to a queue with one id.

Atom statuses:

value behaviour

[]     INIT

[1]    CALL RUN WITH ID 1

And during my process execution if I call the endpoint 'RUN' again, I called the function queue-has-id? to block if the id is present, in this scenario, the id '1' is present then the execution is blocked.

But if I called other ENDPOINT 'retrieve', my atom queue value is [1] but the indexOf id returning false.

Someone knows what is problem in this implementation? What I know is the atom is shared to concurrency process during my application life cyle, why this problem occur?


Solution

  • First of all, you do not use queue in its idiomatic way. A queue is a kind of abstract data type that provides ordered container for items with the next operations:

    • enqueue, conj for clojure.lang.PersistentQueue
    • dequeue, peek for getting head item and pop to return queue without head item
    • optional, emptiness checking, empty?

    As I see you want a collection that provides storing unique numbers (IDs) and remove them when needed. I can suggest to use set data structure. In this case your code should look like

    (def numbers (atom #{}))
    
    (defn add-number! [n]
      (swap! numbers conj n))
    
    (defn contains-number? [n]
      (contains? @numbers n))
    
    (defn remove-number! [n]
      (swap! numbers disj n))
    

    But if you for some reason still want to use PersistentQueue your code should look like

    (def queue (ref clojure.lang.PersistentQueue/EMPTY))
    
    (defn enqueue!
      "It doesn't check uniqueness."
      [item]
      (dosync (alter queue conj item)))
    
    (defn has-item?
      "Be careful, it is O(n) operation."
      [item]
      (some #(= item %) @queue))
    
    (defn dequeue!
      "Pop element off the queue and returns it"
      []
      (dosync
       (let [v (peek @queue)]
         (alter queue pop)
         v)))
    
    (defn remove!
      "Because you want to remove all repetition of an item you should
       convert current a queue to a sequence, remove items and convert
       it back to a queue. It creates intermediate collection and it is
       WRONG way to use queue. But it is still achievable."
      [item]
      (dosync
       (alter queue #(apply conj
                            clojure.lang.PersistentQueue/EMPTY
                            (remove #{item} %)))))
    

    As you see I use ref instead of atom because of nature of PersistentQueue type that provides two functions peek and pop to dequeue item. The dosync macro ensures that all expressions passed to it will be applied synchronously. So that it ensures that two threads do not peek the same item and pop twice.