node.js highland.js

Circular data flow in highland.js


I'm just learning highland.js after being inspired by NoFlo.js, and I want streams to be able to operate recursively. In this contrived example I provide a number that gets multiplied by two, and only results <= 512 are kept. Each number that passes the filter is fed back into the system. The code below works, but if I take the doto function out of the pipeline it doesn't process any numbers. I suspect that I'm sending the data back into returnPipe incorrectly. Is there a better way to pipe data back into a system? What am I missing?

###
  input>--m--->multiplyBy2>---+
          |                   |
          |                   |
          +---<returnPipe<----+
###

H = require('highland')

input = H([1])
returnPipe = H.pipeline(
  H.doto((v)->console.log(v))
)
H.merge([input,returnPipe])
 .map((v)-> return v * 2)
 .filter((v)-> return v <= 512)
 .pipe(returnPipe)

Solution

  • From the documentation: doto applies a function to each value from the source stream and re-emits the source value. As far as the pipeline is concerned, it is therefore still a function that passes the stream through. If you take doto out, the values don't make it back through returnPipe on the next iteration.

    If you are going to use pipeline, you have to pass it a function that takes a stream and returns a stream. For example, you could replace doto with something like H.map((v)=>{console.log(v); return v;}) in the call to H.pipeline. Since that function consumes a stream and emits a stream, data will continue to flow when it is piped back in via .pipe(returnPipe).

    EDIT: To answer your question, when you declare let input = H([1]) you are actually creating a stream right there. You can remove any reference to the pipeline and returnPipe and produce the same output with the following code:

    let input = H([1]);
    
    input.map((v)=> {
      return v * 2;
    })
    .filter((v)=> {
      if (v <= 512) {
        console.log(v);
      }
      return v <= 512;
    })
    .pipe(input);
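
To pin down which values the loop from the question should produce, here is a minimal sketch of the same feedback cycle in plain JavaScript. It deliberately does not use Highland: a simple array queue stands in for the merged input and returnPipe, so it only models the data flow, not the streaming behaviour. The names runLoop, seed and limit are hypothetical and exist only for this illustration.

```javascript
// Plain-JavaScript model of the loop in the diagram; no Highland involved.
// runLoop, seed and limit are hypothetical names used only for this sketch.
function runLoop(seed, limit) {
  // Guard: a zero or negative seed would never exceed the limit,
  // so the feedback loop would never terminate.
  if (!(seed > 0)) {
    throw new RangeError('seed must be a positive number');
  }

  const queue = [seed];   // stands in for the merged input + returnPipe
  const emitted = [];     // values that pass the filter, in order

  while (queue.length > 0) {
    const doubled = queue.shift() * 2;   // the multiplyBy2 step
    if (doubled <= limit) {              // the filter step
      emitted.push(doubled);
      queue.push(doubled);               // feed the value back into the loop
    }
  }
  return emitted;
}

console.log(runLoop(1, 512));
```

With a seed of 1 and a limit of 512 this yields the powers of two from 2 up to 512, which is what a working stream version should log. The loop stops on its own because the first value above the limit is dropped by the filter and never fed back, leaving the queue empty.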