Tags: clojure, stream, jvm, core.async

Streaming data to the caller in JVM


I have a function that fetches data periodically and eventually stops. It has to return the data it fetches to its caller in one of two ways:

  1. Incrementally, as each piece of data arrives
  2. All at once, after the last fetch

The second is easy to implement: you block the caller, fetch all the data, and then return it in one go.

But I want to implement the first one, and I want to avoid callbacks. Are streams the right tool here? If so, how would I use them? If not, how do I return something that the caller can query for data and that also signals when there is no more data?

Note: I am in the JVM ecosystem, Clojure to be specific. I have looked at the Clojure library core.async, which solves this kind of problem with channels. But I was wondering whether there is another way, one that looks something like this (assuming streams can be used here).
Java snippet

//Function which will periodically fetch MyData until there is no data
public Stream<MyData> myFunction() {
...
}

myFunction().filter(myData -> myData.text.equals("foo"));


Solution

  • You could simply return a lazy seq. The sequences produced by map, take-while and similar functions are lazy (much like a Java Stream), so the caller decides when to pull the data. When there is no more data, myFunction simply ends the sequence. This also lets you encapsulate optimisations inside myFunction, e.g. fetching data in batches to minimise round trips, or fetching periodically as in your original requirement.

    Here is one naive implementation:

    (defn my-function []
      (let [batch 100]
        (->> (range)
             (map #(let [from (* batch %)
                         to   (+ from batch)]
                      (db-get from to)))
             ;; take while we have data from db-get
             (take-while identity)
             ;; returns as one single seq/Stream
             (apply concat))))
    
    ;; use it as a normal seq/Stream
    (->> (my-function)
         (filter odd?))
    

    where db-get would be something like:

    (defn db-get [from to]
      ;; serve only the first 1000 records; after that, return nil to signal completion
      (when (< from 1000)
        ;; returns a range of records
        (range from to)))
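
Since the question sketches a Java `Stream<MyData>` signature, the same pull-based batching idea can be roughly expressed with the standard Stream API. This is only a sketch under stated assumptions: `BatchedStream`, `dbGet` and `BATCH` are hypothetical names, `Integer` stands in for `MyData`, and an empty list (rather than nil) signals that the source is exhausted. It needs Java 9 or later for `takeWhile`.

```java
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;

final class BatchedStream {
    // Hypothetical batch size, mirroring the Clojure example.
    static final int BATCH = 100;

    // Hypothetical stand-in for the real data source: serves records 0..999,
    // then returns an empty list to signal that there is no more data.
    static List<Integer> dbGet(int from, int to) {
        if (from >= 1000) {
            return Collections.emptyList();
        }
        return IntStream.range(from, to).boxed().collect(Collectors.toList());
    }

    // Builds a lazy stream: no batch is fetched until a terminal operation
    // starts pulling elements, and fetching stops at the first empty batch.
    static Stream<Integer> myFunction() {
        return Stream.iterate(0, from -> from + BATCH)       // batch offsets 0, 100, 200, ...
                .map(from -> dbGet(from, from + BATCH))      // fetch one batch per offset
                .takeWhile(batch -> !batch.isEmpty())        // stop once the source is exhausted
                .flatMap(List::stream);                      // flatten batches into one stream
    }

    public static void main(String[] args) {
        // The caller uses it like any other Stream.
        long oddCount = myFunction().filter(n -> n % 2 != 0).count();
        System.out.println(oddCount);
    }
}
```

As with the seq version, the caller never sees the batching: it filters and consumes a single stream, and the stream ends when the source reports no more data. A periodic fetch could be hidden inside `dbGet` in the same way, at the cost of blocking the consuming thread while it waits.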