kubernetes clojure lazy-evaluation server-sent-events horizontal-scaling

stateful service with lazy sequences and SSE -- how to distribute with fault tolerance?


I wrote a web service that generates estimates of Pi, using lazy sequences in Clojure and various infinite series formulae (Euler, Leibniz). The Clojure service sends these estimates over a Server-Sent Events (SSE) channel. Currently an HTML/JS view uses Vue.js to consume the SSE stream and display the estimates.

It works pretty well as a service with a single node, as long as the connection for the SSE channel isn't closed. But as of now it doesn't persist or back up the state of the reductions (the position in the infinite series) to recover from a failure if the connection is closed or the service dies. Also, since the state is contained in local memory in the service (in the Clojure sequence value), there is no horizontal scalability, as there would be if the long-term memory state lived in Redis for example. In this case, just adding new nodes won't offer a way to actually divide the work -- it would just duplicate the same series. Using Redis to offload long-term memory state is the kind of setup I'm used to with stateless web services, to streamline a horizontal-scaling and fault tolerance strategy.
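To make the "state of the reductions" concrete: for these series the state is only the next term index and the running sum, so it can be captured as a small map that could be written to something like Redis and reloaded later. The following is a minimal sketch under that assumption. The names `reductions-from` and `start` are hypothetical, and `leibniz-term` is restated here only so the snippet stands alone.

```clojure
(defn leibniz-term
  "n-th term (zero-based) of the Leibniz series for pi."
  [n]
  (/ (* 4.0 (if (even? n) 1 -1)) (+ (* 2.0 n) 1.0)))

(defn reductions-from
  "Lazy seq of checkpoints {:n next-index :sum running-sum}, continuing
  from the given checkpoint. term-fn maps an index to a series term."
  [term-fn checkpoint]
  (rest (iterate (fn [{:keys [n sum]}]
                   {:n (inc n) :sum (+ sum (term-fn n))})
                 checkpoint)))

;; Checkpoint for a series that has not produced any term yet.
(def start {:n 0 :sum 0.0})

;; Resuming from a saved checkpoint continues exactly where a single
;; uninterrupted run would have been.
(def saved (nth (reductions-from leibniz-term start) 4))   ; after 5 terms
(def resumed (nth (reductions-from leibniz-term saved) 4)) ; after 10 terms
```

Each element carries its own position, so whichever checkpoint was last persisted is enough to restart the stream after a crash or a dropped connection.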

In this stateful case, I'm at a loss as to how to scale the Clojure service into a distributed, multi-node solution that could process series terms in parallel. Maybe there could be a dispatching "master" service that delegates sequence ranges to different nodes, receives the results from the nodes concurrently (via Redis pub/sub), aggregates them mathematically and yields a resulting SSE stream for the view? And in that case, the master service would use an infinite sequence of numbers spaced about a thousand apart to yield the range bounds, which the parallel nodes could use to initialize finite Clojure sequences (likely still lazy)? Surely in this case I'd need to mark which sequence ranges are complete as they come in, with a retry strategy in case a node fails while processing a range.
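The range-splitting idea can be sketched in plain Clojure, with the per-range work done locally instead of on remote nodes. This is only an illustration under assumptions: `batch-size`, `range-bounds`, `partial-sum` and `estimates` are hypothetical names, and `leibniz-term` is restated so the snippet stands alone.

```clojure
(defn leibniz-term
  "n-th term (zero-based) of the Leibniz series for pi."
  [n]
  (/ (* 4.0 (if (even? n) 1 -1)) (+ (* 2.0 n) 1.0)))

;; "spaced about a thousand apart", as described above
(def batch-size 1000)

(defn range-bounds
  "Infinite lazy seq of [start end) index pairs, batch-size apart."
  []
  (map (fn [start] [start (+ start batch-size)])
       (iterate #(+ % batch-size) 0)))

(defn partial-sum
  "Work for one node: sum the terms of one finite range."
  [term-fn [start end]]
  (reduce + 0.0 (map term-fn (range start end))))

(defn estimates
  "Master side: running totals over the per-range partial sums,
  taken in range order. Here each partial sum is computed locally."
  [term-fn]
  (reductions + (map #(partial-sum term-fn %) (range-bounds))))
```

For example, `(take 3 (estimates leibniz-term))` yields the estimates after 1000, 2000 and 3000 terms. For the Euler series the square root would have to be applied to the aggregated total, not to each range's partial sum.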

I am studying Kubernetes StatefulSets to become familiar with deployment patterns for stateful services, though I haven't yet encountered a pattern or solution that fits this specific problem. Were this a stateless service, the Kubernetes solution would be fairly obvious, but a stateful approach leaves me with a blank slate in the Kubernetes environment.

Can anyone point me in a good direction for the architecture here? Assuming I do want to keep the state of series terms encapsulated in Clojure lazy sequences (that is, in local service memory), am I on the right track in my strategy to divide the work?

Here is the relevant code for the single-node Clojure service:

(ns server-sent-events.service
  (:require [io.pedestal.http :as http]
            [io.pedestal.http.sse :as sse]
            [io.pedestal.http.route :as route]
            [io.pedestal.http.route.definition :refer [defroutes]]
            [ring.util.response :as ring-resp]
            [clojure.core.async :as async]))

(defn seq-of-terms
  [func]
  (map func (iterate inc 0)))

(defn euler-term [n]
  (let [current (+ n 1)]
    (/ 6.0 (* current current))))

; The following returns a lazy list representing iterable sums that estimate pi
; according to the Euler series for increasing amounts of terms in the series.
; Sample usage: (take 100 euler-reductions)
(def euler-reductions
  (map (fn [sum] (Math/sqrt sum)) (reductions + (seq-of-terms euler-term))))

(defn leibniz-term [n] ; starts at zero
  (let [oddnum     (+ (* 2.0 n) 1.0)
        signfactor (- 1 (* 2 (mod n 2)))]
    (/ (* 4.0 signfactor) oddnum)))

; The following returns a lazy list representing iterable sums that estimate pi
; according to the Leibniz series for increasing amounts of terms in the series.
; Sample usage: (take 100 leibniz-reductions)
(def leibniz-reductions (reductions + (seq-of-terms leibniz-term)))

(defn send-result
  [event-ch count-num rdcts]
  (doseq [item rdcts]
    (Thread/sleep 150) ; we must use a naive throttle here to prevent an overflow on the core.async CSP channel, event-ch
    (async/put! event-ch (str item))))

(defn sse-euler-stream-ready
  "Start to send estimates to the client according to the Euler series"
  [event-ch ctx]
  ;; The context is passed into this function.
  (let [{:keys [request response-channel]} ctx
        lazy-list euler-reductions]
    (send-result event-ch 10 lazy-list)))

(defn sse-leibniz-stream-ready
  "Start to send estimates to the client according to the Leibniz series"
  [event-ch ctx]
  (let [{:keys [request response-channel]} ctx
        lazy-list leibniz-reductions]
    (send-result event-ch 10 lazy-list)))


;; Wire root URL to sse event stream
;; with custom event-id setting
(defroutes routes
  [[["/" {:get [::send-result-euler (sse/start-event-stream sse-euler-stream-ready)]}
    ["/euler" {:get [::send-result
                    (sse/start-event-stream sse-euler-stream-ready)]}]
    ["/leibniz" {:get [::send-result-leibniz
                      (sse/start-event-stream sse-leibniz-stream-ready)]}]
    ]]])

(def url-for (route/url-for-routes routes))

(def service {:env :prod
              ::http/routes routes
              ;; Root for resource interceptor that is available by default.
              ::http/resource-path "/public"
              ;; Either :jetty or :tomcat (see comments in project.clj
              ;; to enable Tomcat)
              ::http/type :jetty
              ::http/port 8080
              ;;::http/allowed-origins ["http://127.0.0.1:8081"]
              })

Full code is at https://github.com/wclark-aburra-code/pi-service. It includes the inline Vue.js code that consumes the SSE stream.


Solution

  • If it is just for scaling, I don't think you need to persist anything. All you need is a dispatching "master" (which can potentially be the client itself) to request the chunked sequences from multiple backends and reassemble them to deliver in the right order.

    Using core.async, a dispatching master can be implemented like this (`async` is `clojure.core.async`; `log` is whatever logging namespace you use):

    (let [batch-ch (async/chan)
          out-ch   (async/chan)]
    
      ;; request 100 batches (this could be an infinite sequence instead)
      (async/onto-chan batch-ch (range 100))
      ;; consume the results in order; this sketch only logs them, but the
      ;; real service would push each one onto the SSE channel
      (async/go-loop []
        (when-let [res (async/<! out-ch)]
          (log/info ::result res)
          (recur)))
    
      ;;
      ;; take each batch number from batch-ch and dispatch it to the backend
      ;; in parallel. You would also add an exception handler in here.
      ;;
      (async/pipeline-async
       ;; parallelism
       32
       ;; output
       out-ch
       ;; invoke backend service, this should return immediately
       (fn [batch ch]
         (let [batch-sz 1000]
           (async/go
             (let [start (* batch batch-sz)
                   end   (-> batch inc (* batch-sz))]
               ;; log the bounds as a single key/value pair
               (log/info ::fetching-from-service [start end])
               ;; simulate a slow service
               (async/<! (async/timeout 1000))
               ;; push the result back to the pipeline and close the channel
               ;; (this stand-in just returns the indices start..end-1)
               (async/onto-chan ch (range start end))))))
       ;; input
       batch-ch))
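
The dispatcher above mentions adding an exception handler around the backend call. One possible shape for it, sketched here as an assumption and not as part of the original answer, is a small retry wrapper in plain Clojure. The name `with-retries` is hypothetical.

```clojure
(defn with-retries
  "Calls (f) up to max-attempts times and returns the first successful
  result. Rethrows the last exception if every attempt fails."
  [max-attempts f]
  (loop [attempt 1]
    ;; recur is not allowed inside try/catch, so the catch branch
    ;; returns a marker and the loop decides whether to go around again.
    (let [result (try
                   {:ok (f)}
                   (catch Exception e
                     (if (< attempt max-attempts)
                       ::retry
                       (throw e))))]
      (if (= result ::retry)
        (recur (inc attempt))
        (:ok result)))))
```

Inside the function passed to `pipeline-async`, a call to the real backend could then be wrapped as `(with-retries 3 #(fetch-batch start end))`, where `fetch-batch` is a hypothetical function that requests one range from a node. If that call blocks, it would be better run in `async/thread` than in a `go` block.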