node.js · bacon.js · kefir.js

Stream processing in Kefir/Bacon.js


I have been working on a personal project involving real-time data using Kefir (or Bacon.js, pick your favorite) and have gotten to a point where I need to log the data in a database to append an id, then pass the object with the id down the chain. Actually inserting the data into the database (NeDB) is not the issue; the issue is its use of callbacks, which let execution continue while the record is still being inserted, and how to work around this behavior.

Overly simplified example:

Suppose we have several devices dumping parsed data into a bus/pool:

function Position(data) {
    this.id = null;
    this.longitude = data.longitude;
    this.latitude = data.latitude;
}

self.positionDataPool.map(function(position) {  // is this even what really needs to be done?
    // unsure what to do here
    self.db.insert({
        longitude: position.longitude
        , latitude: position.latitude
    }, function(e, newRecord) {
        if(e) { ... }
        else {
            position.id = newRecord._id;
            return position;  // only returns from the callback, not from map
        }
    });
})
.filter(function(position) {
    // the position without an id is passed here
    ...
});

I suspect this is an incorrect or inappropriate use of the map function, but I am out of ideas after having tried several things. Any thoughts, suggestions, or help would be greatly appreciated.

My Solution

After doing a ton more reading and experimenting (on top of what I had already done) and going back to my days of working with stream processing on a daily basis, I came up with the following solution. While it may not be the most efficient, it takes input from multiple sources by plugging several event sources into a data pool (not shown). A completely new stream is created to perform a single operation on the object/data. While extensibility was not the objective here, this allows multiple observers to watch for data coming off the stream rather than dumping it straight into the filter. Finally, the data coming from the processed stream is filtered to only show the results we want.

self.savedPositionDataStream = Kefir.stream(function(emitter) {
    self.positionDataPool.onValue(function(val) {
        self.db.insert({
            longitude: val.longitude
            , latitude: val.latitude
        }, function(e, newRecord) {
            if(e) { return emitter.error(e); }
            val.id = newRecord._id;
            emitter.emit(val);
        });
    });
});

self.filteredPositionData = self.savedPositionDataStream.filter(...);
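The wiring above can be exercised without Kefir or NeDB by substituting minimal stand-ins. Everything below (pool, fakeDb, results) is hypothetical scaffolding for illustration, not part of either library:

```javascript
// Minimal stand-ins to exercise the callback-to-stream pattern above.
// `pool` mimics the data pool's onValue/emit behavior; `fakeDb` mocks NeDB.
const subscribers = [];
const pool = {
    onValue(fn) { subscribers.push(fn); },
    push(val) { subscribers.forEach(fn => fn(val)); }
};

let nextId = 1;
const fakeDb = {
    // Mimics NeDB's insert(doc, cb): assigns an _id and calls back with the new record.
    insert(doc, cb) { cb(null, Object.assign({ _id: 'rec' + nextId++ }, doc)); }
};

const results = [];
// Equivalent of the Kefir.stream body: insert, attach the id, emit downstream.
pool.onValue(function(val) {
    fakeDb.insert({ longitude: val.longitude, latitude: val.latitude }, function(e, newRecord) {
        if (e) return;
        val.id = newRecord._id;
        results.push(val); // stands in for emitter.emit(val)
    });
});

pool.push({ id: null, longitude: 13.4, latitude: 52.5 });
```

With a real NeDB instance the callback fires asynchronously, but the flow is the same: the id is attached before the value is emitted, so the downstream filter always sees it.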

Solution

  • At least with Bacon.js, you can use Bacon.fromNodeCallback to wrap the insert call result into a stream. Like

    Bacon.fromNodeCallback(self.db, "insert", dataToBeInserted)
    

    You can of course do this with Bacon.fromBinder or the similar Kefir.stream, but the fromNodeCallback helper makes this easier, as it automatically handles the success/error values and converts them to stream events appropriately.
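As a rough sketch of what such a helper does under the hood: this fromNodeCallback is a hand-rolled stand-in, not the real Bacon.js implementation, and db is a mock of NeDB's insert:

```javascript
// Simplified stand-in for Bacon.fromNodeCallback: call obj[method](...args, cb)
// once and route the node-style (err, result) outcome to the right handler.
function fromNodeCallback(obj, method, ...args) {
    return {
        subscribe(onValue, onError) {
            obj[method](...args, (err, result) => {
                if (err) onError(err);   // becomes an error event
                else onValue(result);    // becomes a value event
            });
        }
    };
}

// Mock of NeDB's insert: assigns an _id and calls back with the new record.
const db = { insert(doc, cb) { cb(null, Object.assign({ _id: 'id1' }, doc)); } };

const events = [];
fromNodeCallback(db, 'insert', { longitude: 1 })
    .subscribe(rec => events.push(['value', rec._id]),
               err => events.push(['error', err]));
```

The real helper additionally handles stream lifecycle (single-value stream that ends after the event), but the error/value routing shown here is the core of it.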

    And then flatMap instead of map, to do the insertion and provide results as a stream:

    let insertionResultE = self.positionDataPool.flatMap(val => 
      Bacon.fromNodeCallback(self.db, "insert", val).map("._id")
    )
    insertionResultE.log("insertion result")
    

    A similar approach applies to Kefir as well. The point is that you cannot do asynchronous and possibly failing computations in map, but you can do that in flatMap.
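To see why, here is a toy contrast between the two. streamOf, map, and flatMap are hand-rolled stand-ins, not the library functions, and db is a mock:

```javascript
// Toy stand-ins -- a "stream" here is just { subscribe(fn) }.
function streamOf(values) {
    return { subscribe(fn) { values.forEach(fn); } };
}

// map passes f's return value through synchronously; an async insert
// inside f returns undefined before its callback has fired.
function map(stream, f) {
    return { subscribe(fn) { stream.subscribe(v => fn(f(v))); } };
}

// flatMap expects f to return a stream and subscribes to it, so values
// produced later by a callback still flow downstream.
function flatMap(stream, f) {
    return { subscribe(fn) { stream.subscribe(v => f(v).subscribe(fn)); } };
}

// Mock insert that calls back with a generated _id.
const db = { insert(doc, cb) { cb(null, { _id: doc.n * 10 }); } };

const mapped = [];
map(streamOf([{ n: 1 }]), doc => {
    db.insert(doc, () => {});           // the result is lost here...
}).subscribe(v => mapped.push(v));      // ...so map emits undefined

const ids = [];
flatMap(streamOf([{ n: 1 }, { n: 2 }]), doc =>
    ({ subscribe(fn) { db.insert(doc, (e, rec) => fn(rec._id)); } })
).subscribe(id => ids.push(id));
```

The real flatMap also merges concurrent inner streams and propagates errors, but the essential difference is the same: map transforms values, flatMap flattens streams of values.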

    The only gotcha here is that you need to add at least one subscriber for insertionResultE to activate it. In the above example, log does that.
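The activation behavior can be illustrated with a hand-rolled lazy stream. lazyStream and the variables below are illustrative stand-ins, not Kefir/Bacon API:

```javascript
// Hand-rolled lazy stream: the producer runs only once the first
// subscriber attaches, mirroring Bacon/Kefir activation semantics.
function lazyStream(producer) {
    let started = false;
    const subs = [];
    return {
        onValue(fn) {
            subs.push(fn);
            if (!started) {               // first subscriber activates the stream
                started = true;
                producer(v => subs.forEach(s => s(v)));
            }
            return this;
        }
    };
}

let insertions = 0;                       // stands in for the db.insert side effect
const insertionResultE = lazyStream(emit => {
    insertions += 1;
    emit('someId');
});

// At this point insertions is still 0: nothing runs until someone subscribes.
const seen = [];
insertionResultE.onValue(v => seen.push(v));   // activation happens here
```

This is why a pipeline built from fromNodeCallback does no database work at all until something like log or onValue is attached at the end.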