
Manipulating an atom containing a collection of refs in Clojure


I have an application that is supposed to book flights for customers within their specified budget. I have customer data and data on the available flights, and I have implemented the solution in Clojure as follows.

First, I create a flights atom:

(def flights
   (atom []))

I then create a function that initializes the atom with a collection of refs, one per flight. I pass it the flights data, which is included further down this post.

(defn initialize-flights [initial-flights]
   (reset! flights (map ref initial-flights)))
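Note that initialize-flights uses map, which returns a lazy sequence rather than a vector, and assoc cannot be used on a lazy sequence. Since process-customers later calls assoc with a flight's :id as the key, an indexed vector is probably what is wanted. A minimal sketch using mapv, assuming each flight's :id equals its position in the vector (true for the sample data below):

```clojure
(def flights (atom []))

(defn initialize-flights [initial-flights]
  ;; mapv is eager and returns a vector of refs, one per flight
  (reset! flights (mapv ref initial-flights)))

(initialize-flights [{:id 0 :from "BRU" :to "ATL"}
                     {:id 1 :from "BRU" :to "LON"}])

;; The vector index equals :id here, so a flight ref can be fetched with nth
(:to @(nth @flights 1)) ; => "LON"

;; A lazy seq (what map returns) is not associative, so assoc fails on it
(try
  (assoc (map ref [{:id 0}]) 0 :x)
  (catch ClassCastException _ :not-associative)) ; => :not-associative
```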

I then process the customers with the process-customers function below. This is where it gets really confusing.

(defn process-customers [customers]
  (doseq [customer1 (partitionCustomerInput N-THREADS customers)]
    (doseq [customer2 customer1]
      (swap! flights
             (fn [flights_collection]
               (if-let [updated-flight (process-customer flights_collection customer2)]
                 (assoc flights (:id updated-flight) updated-flight)
                 flights_collection)))))
  (reset! finished-processing? true))
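As written, process-customers handles the partitions one after another on a single thread: both loops are doseq, and the one future in main only moves that sequential work off the main thread. A minimal sketch of giving each partition its own future; run-partitions-in-parallel is a hypothetical name, and the atom counter only stands in for process-customer:

```clojure
(defn run-partitions-in-parallel
  "Hypothetical helper: runs `handle-customer` on every customer, giving each
  partition of customers its own future, then waits for all of them."
  [partitions handle-customer]
  (let [workers (mapv (fn [part]
                        ;; one future (thread) per partition
                        (future (doseq [c part] (handle-customer c))))
                      partitions)]
    ;; block until every partition has been processed
    (run! deref workers)))

;; Usage: a thread-safe counter standing in for process-customer
(def processed (atom 0))

(run-partitions-in-parallel (partition-all 4 (range 14))
                            (fn [_] (swap! processed inc)))

@processed ; => 14
```

mapv is used instead of map so that all futures are started eagerly before any of them is dereferenced.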

Inside process-customers I pass flights_collection to process-customer (note that process-customer is a helper function for process-customers; they are not the same function). At this point flights_collection is a collection of flight refs. process-customer is supposed to search through it and, if the customer qualifies for one of the flights, use the book function to update that flight. How should I pass flights_collection to process-customer? As written, process-customer does not search through the flight refs, and it does not alter them either.
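If process-customer were changed to alter the flight refs directly, calling it from the function passed to swap! would not work: alter needs a running transaction, and swap! does not start one. A small sketch with hypothetical names (flight-ref, flights-atom, alter-inside-swap):

```clojure
(def flight-ref (ref {:id 0 :pricing [[600 150 0]]}))
(def flights-atom (atom [flight-ref]))

(defn alter-inside-swap []
  (try
    (swap! flights-atom
           (fn [fs]
             ;; side effect inside swap!: there is no transaction here
             (alter (first fs) assoc :touched true)
             fs))
    :ok
    (catch IllegalStateException e
      (.getMessage e))))

(alter-inside-swap) ; => "No transaction running"

;; The same alter succeeds inside dosync
(dosync (alter flight-ref assoc :touched true))
(:touched @flight-ref) ; => true
```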

Below is the process-customer function followed by its helper functions.

(defn- process-customer
  "Try to book a flight from `flights` for `customer`, returning the updated
  flight if found, or nil if no suitable flight was found."
  [flights customer]
  (if-let [{:keys [flight price]} (find-flight flights customer)]
    (let [updated-flight (book flight price (:seats customer))]
      (log "Customer" (:id customer) "booked" (:seats customer)
        "seats on flight" (:id updated-flight) "at $" price " (< budget of $"
        (:budget customer) ").")
      updated-flight)
    (do
      (log "Customer" (:id customer) "did not find a flight.")
      nil)))


(defn filter-pricing-with-n-seats
  "Get the entries of `pricing` that have at least `seats` empty seats available."
  [pricing seats]
  (filter #(>= (second %) seats) pricing))

(defn lowest-available-price
  "Returns the lowest price in `flight` for which at least `seats` empty seats
  are available, or nil if none found."
  [flight seats]
  (-> (:pricing flight)                 ; [[price available taken]]
    (filter-pricing-with-n-seats seats)
    (sort-pricing)
    (first)                             ; [price available taken]
    (first)))                           ; price

(defn- find-flight
  "Find a flight in `flights` that is on the route and within the budget of
  `customer`. If a flight was found, returns {:flight flight :price price},
  else returns nil."
  [flights customer]
  (let [{:keys [_id from to seats budget]}
          customer
        flights-and-prices
          ; flights that are on the route and within budget, and their price
          (for [f flights
                :when (and (= (:from f) from) (= (:to f) to))
                :let [lowest-price (lowest-available-price f seats)]
                :when (and (some? lowest-price) (<= lowest-price budget))]
            {:flight f :price lowest-price})
        cheapest-flight-and-price
          (first (sort-by :price flights-and-prices))]
    cheapest-flight-and-price))
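A likely reason process-customer never finds a flight: find-flight reads (:from f) and (:to f) from each element, but when flights_collection holds refs, each f is a Ref rather than a map. Keyword lookup on a Ref returns nil, so the route check never matches. Dereferencing gives the map back, although booking still needs the ref itself so that it can be passed to alter. A minimal illustration (flight-ref and on-route? are hypothetical names):

```clojure
(def flight-ref (ref {:id 0 :from "BRU" :to "ATL"}))

;; A Ref is not a map, so keyword lookup on it finds nothing
(:from flight-ref)  ; => nil

;; Dereferencing first yields the flight map stored in the ref
(:from @flight-ref) ; => "BRU"

;; The same route check find-flight performs, with and without deref
(defn on-route? [f from to]
  (and (= (:from f) from) (= (:to f) to)))

(on-route? flight-ref "BRU" "ATL")  ; => false
(on-route? @flight-ref "BRU" "ATL") ; => true
```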

(defn- book
  "Returns `flight` with `seats` booked at `price`."
  [flight price seats]
  (update flight :pricing
    (fn [pricing]
      (for [[p a t] pricing]
        (if (= p price)
          [p (- a seats) (+ t seats)]
          [p a t])))))
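book is a pure function of the flight map: it returns a new map rather than changing anything. That suits refs well, because (alter r f & args) calls (f current-value args...) inside a transaction and stores the result, so book can be handed to alter directly. A small sketch, assuming a standalone flight ref named flight-3 (hypothetical):

```clojure
(defn book
  "Returns `flight` with `seats` booked at `price` (same logic as above)."
  [flight price seats]
  (update flight :pricing
          (fn [pricing]
            (for [[p a t] pricing]
              (if (= p price)
                [p (- a seats) (+ t seats)]
                [p a t])))))

(def flight-3 (ref {:id 3 :pricing [[200 150 0] [250 50 0]]}))

;; alter calls (book current-value 200 5) and stores the result in the ref
(dosync (alter flight-3 book 200 5))

(:pricing @flight-3) ; => ([200 145 5] [250 50 0])
```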



(def finished-processing?
  "Set to true once all customers have been processed, so that sales process
  can end."
  (atom false))

(defn partitionCustomerInput
  [threads customers]
  (partition-all (Math/ceil (/ (count customers) threads)) customers))

Below is the main function. It initializes the flights and kicks off customer processing.

(defn main []
  (initialize-flights input/flights)
  (let [f1 (future (time (process-customers input/customers)))]
    @f1)
  (println "Flights:")
  (print-flights (map deref @flights)))

(main)
(shutdown-agents)

Below are the customers and flights collections (referenced as input/customers and input/flights in main).

(def flights
      [{:id 0
        :from "BRU" :to "ATL"
        :carrier "Delta"
        :pricing [[600 150 0] ; price; # seats available at that price; # seats taken at that price
                  [650  50 0]
                  [700  50 0]
                  [800  50 0]]}
       {:id 1
        :from "BRU" :to "LON"
        :carrier "Brussels Airlines"
        :pricing [[300 150 0]
                  [350  50 0]
                  [370  20 0]
                  [380  30 0]]}
       {:id 2
        :from "BRU" :to "LON"
        :carrier "Brussels Airlines"
        :pricing [[250 100 0]
                  [300  50 0]]}
       {:id 3
        :from "BRU" :to "MAD"
        :carrier "Brussels Airlines"
        :pricing [[200 150 0]
                  [250  50 0]
                  [300 100 0]]}
       {:id 4
        :from "BRU" :to "MAD"
        :carrier "Iberia"
        :pricing [[250 150 0]
                  [300  50 0]]}])

(def customers
  [{:id  0 :from "BRU" :to "ATL" :seats 5 :budget 700}
   {:id  1 :from "BRU" :to "ATL" :seats 5 :budget 550}
   {:id  2 :from "BRU" :to "LON" :seats 6 :budget 270}
   {:id  3 :from "BRU" :to "ATL" :seats 4 :budget 600}
   {:id  4 :from "BRU" :to "LON" :seats 3 :budget 270}
   {:id  5 :from "BRU" :to "LON" :seats 9 :budget 250}
   {:id  6 :from "BRU" :to "MAD" :seats 5 :budget 200}
   {:id  7 :from "BRU" :to "MAD" :seats 9 :budget 150}
   {:id  8 :from "BRU" :to "LON" :seats 5 :budget 250}
   {:id  9 :from "BRU" :to "ATL" :seats 4 :budget 500}
   {:id 10 :from "BRU" :to "MAD" :seats 1 :budget 180}
   {:id 11 :from "BRU" :to "LON" :seats 2 :budget 320}
   {:id 12 :from "BRU" :to "ATL" :seats 3 :budget 850}
   {:id 13 :from "BRU" :to "ATL" :seats 4 :budget 200}])

Also note that I want to use refs to alter the flights, because refs support coordinated reads and writes, so the flights can be changed atomically. I aim for a highly parallel solution, and conflicting updates cannot be tolerated.
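To illustrate the kind of coordinated write that refs allow: changes to several refs made inside one dosync commit together or not at all. A minimal sketch, assuming a top-level ref holding the flight refs; the "cancel" operation and the :cancelled key are hypothetical and not part of the original data:

```clojure
(def flight-0 (ref {:id 0 :from "BRU" :to "ATL"}))
(def flight-1 (ref {:id 1 :from "BRU" :to "LON"}))
(def flights (ref [flight-0 flight-1]))

;; Mark flight 0 and drop it from the list in one transaction, so no other
;; transaction can observe one of these changes without the other.
(dosync
  (alter flight-0 assoc :cancelled true)
  (alter flights (fn [fs] (filterv #(not= flight-0 %) fs))))

(count @flights)       ; => 1
(:cancelled @flight-0) ; => true
```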


Solution

  • I think you need a ref instead of an atom at the top level. It seems that you will need to coordinate changes to individual flights with changes to the list of flights. What if one thread is modifying a flight while another thread removes it from the list? All of process-customer's side effects must be done within a (dosync).

    Performance-wise it should be fine: a transaction that only reads the list-of-flights ref, without modifying it, will not cause other transactions that alter that ref to retry.

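    Another reason is that you are breaking a very important rule for swap!: the function passed to swap! must be free of side effects, because swap! may call it more than once. Atoms are updated with compare-and-set, so if another thread changes the atom first, swap! retries the function (this is the atom's own retry loop, not the STM). Altering a ref is a side effect and can cause difficult-to-understand bugs; in fact, calling alter outside a (dosync) throws an IllegalStateException.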

    So I would do something like this:

    (def flights
      (ref [(ref {:id "flight-1"})
            (ref {:id "flight-2"})]))

    ;; Run a bunch of these in their own threads...
    (doseq [customer partitioned-customers]
      (dosync (process-customer flights customer)))
    

    Then you can fine-tune process-customer with alter, commute, and ensure to maximize concurrency and minimize retries.

    Hope this helps and good luck!
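A fuller sketch along the lines of this answer, under several assumptions: the top-level ref holds a vector of flight refs, each customer is handled in its own dosync, and partitions run in futures. The names initialize-flights!, lowest-price, and process-customer! are hypothetical; lowest-price stands in for lowest-available-price because the question's sort-pricing is not shown, and logging is left out because a transaction body may be retried.

```clojure
;; Hypothetical sketch: a top-level ref holding a vector of flight refs
(def flights (ref []))

(defn initialize-flights! [initial-flights]
  (dosync (ref-set flights (mapv ref initial-flights))))

(defn lowest-price
  "Cheapest price tier of `flight` with at least `seats` seats available."
  [flight seats]
  (->> (:pricing flight)
       (filter (fn [[_ available _]] (>= available seats)))
       (map first)
       (sort)
       (first)))

(defn book [flight price seats]
  (update flight :pricing
          (fn [pricing]
            (mapv (fn [[p a t]]
                    (if (= p price) [p (- a seats) (+ t seats)] [p a t]))
                  pricing))))

(defn process-customer!
  "Books the cheapest suitable flight for `customer` in one transaction.
  Returns the updated flight map, or nil if nothing fits."
  [{:keys [from to seats budget]}]
  (dosync
    (let [candidates (for [r @flights
                           :let [f @r] ; in-transaction value of the flight
                           :when (and (= from (:from f)) (= to (:to f)))
                           :let [price (lowest-price f seats)]
                           :when (and price (<= price budget))]
                       {:flight-ref r :price price})]
      (when-let [{r :flight-ref price :price} (first (sort-by :price candidates))]
        ;; If another transaction commits a change to r first, the STM retries
        ;; this whole transaction, so seats cannot be double-booked.
        ;; Flights that are only read are not protected; `ensure` them if the
        ;; choice between flights must stay consistent until commit.
        (alter r book price seats)))))

;; Usage: one future per partition of customers
(initialize-flights! [{:id 0 :from "BRU" :to "ATL"
                       :pricing [[600 150 0] [650 50 0]]}])

(let [customers [{:id 0 :from "BRU" :to "ATL" :seats 5 :budget 700}
                 {:id 1 :from "BRU" :to "ATL" :seats 5 :budget 550}]
      workers (mapv (fn [part] (future (doseq [c part] (process-customer! c))))
                    (partition-all 1 customers))]
  (run! deref workers))

(:pricing @(first @flights)) ; => [[600 145 5] [650 50 0]]
```

Customer 1's budget of 550 is below the cheapest tier of 600, so only customer 0 is booked, whichever future runs first.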