
How to implement receiveAvailable transducer in scalaz-stream


Short Version:

I would like to implement a function that returns a transducer that waits for a block of values to be "emitted".

The function I have in mind would have the following signature:

/**
 * The `Process1` which awaits the next "effect" to occur and passes all values emitted by
 * this effect to `rcv` to determine the next state.
 */
def receiveBlock[I, O](rcv: Vector[I] => Process1[I,O]): Process1[I,O] = ???

Details:

My understanding is that I could then use this function to implement the following function which I think would be quite useful:

/**
  * Groups inputs into chunks of dynamic size based on the various effects
  * that back emitted values.
  *
  * @example {{{
  * val numberTask = Task.delay(1)
  * val listOfNumbersTask = Task.delay(List(5,6,7))
  * val sample = Process.eval(numberTask) ++ Process(2,3,4) ++ Process.await(listOfNumbersTask)(xs => Process.emitAll(xs))
  * sample.chunkByEffect.runLog.run should be List(Vector(1), Vector(2,3,4), Vector(5,6,7))
  * }}}
  */
  def chunkByEffect[I]: Process1[I, Vector[I]] = {
    receiveBlock(vec => emit(vec) ++ chunkByEffect)
  }

[Update] More Details

My ultimate objective (slightly simplified) is to implement the following function:

/**
 * Transforms a stream of audio into a stream of text.
 */
def voiceRecognition(audio: Process[Task, Byte]): Process[Task, String]

The function makes an external call to a voice recognition service, so it is unreasonable to make a network call for every single Byte in the stream; I need to chunk bytes together before making a network call. I could make audio a Process[Task, ByteVector], but that would require the testing code to know the maximum chunk size that the function supports, and I would rather have that managed by the function itself. Also, when this function is used inside a service, that service will itself be receiving network calls carrying audio of a given size, and I would like the chunkXXX function to be smart about chunking so that it does not hold onto data that is already available.

Basically, the stream of audio coming from the network will have the form Process[Task, ByteVector] and will be translated into a Process[Task, Byte] by flatMap(Process.emitAll(_)). The test code, however, will directly produce a Process[Task, Byte] and feed that into voiceRecognition. In theory, I believe it should be possible, given the appropriate combinator, to provide an implementation of voiceRecognition that does the right thing with both of these streams, and I think the chunkByEffect function described above is the key to that. I realize now that I would need the chunkByEffect function to take min and max parameters that specify the minimum and maximum chunk size, irrespective of the underlying Task producing the bytes.
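To make the min/max requirement concrete, here is a minimal sketch in plain Scala with no scalaz-stream involved. It models the incoming stream as a sequence of already-available blocks, which is exactly the information a Process1 does not get to see. The name rechunk and its parameters are hypothetical and only illustrate the intended behaviour; this is not an API of any library.

```scala
object Rechunk {
  /** Hypothetical helper: regroups incoming blocks so that every output block
    * has between `min` and `max` elements. Only the final block may be
    * smaller than `min`, because nothing more will arrive to fill it.
    */
  def rechunk[A](blocks: Seq[Vector[A]], min: Int, max: Int): Vector[Vector[A]] = {
    require(min >= 1 && min <= max, "need 1 <= min <= max")
    // State: (blocks emitted so far, elements buffered but not yet emitted).
    val (emitted, leftover) =
      blocks.foldLeft((Vector.empty[Vector[A]], Vector.empty[A])) {
        case ((out, buf), block) =>
          // Everything that is available right now.
          val all = buf ++ block
          // Split it into pieces of at most `max` elements.
          val pieces = all.grouped(max).toVector
          // Hold back the last piece only if it is still below `min`.
          pieces.lastOption match {
            case Some(last) if last.size < min => (out ++ pieces.init, last)
            case _                             => (out ++ pieces, Vector.empty[A])
          }
      }
    // Flush whatever is still buffered once the input ends.
    if (leftover.isEmpty) emitted else emitted :+ leftover
  }
}
```

With min = 1 and a large max, each incoming block is passed through unchanged, which is the behaviour chunkByEffect is meant to have for the sample in its Scaladoc. With min = 2 and max = 3, the same input is regrouped into Vector(1, 2, 3), Vector(4, 5, 6), Vector(7).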


Solution

  • I guess the answer at this point is that this is really hard or impossible to accomplish in scalaz-stream proper. The new version of this library is called fs2, and it has first-class support for "chunking", which is basically what I was looking for here.
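As a rough illustration of what that chunk support looks like, here is a small sketch using a pure fs2 Stream instead of Process[Task, _]. It assumes a reasonably recent fs2 release (the fs2-core artifact) is on the classpath; the object and value names are made up for this example.

```scala
// Assumption: fs2 ("co.fs2" %% "fs2-core") is available as a dependency.
import fs2.Stream

object ChunkDemo {
  // Three separately emitted blocks, mirroring the sample in the question.
  val sample = Stream(1) ++ Stream(2, 3, 4) ++ Stream.emits(List(5, 6, 7))

  // `chunks` exposes the stream's underlying chunk structure, so the
  // boundaries between the three blocks are preserved:
  // List(Vector(1), Vector(2, 3, 4), Vector(5, 6, 7))
  val byChunk: List[Vector[Int]] = sample.chunks.map(_.toVector).toList

  // `chunkLimit(n)` caps the size of each chunk at n elements, splitting
  // larger chunks without waiting for further input.
  val capped: List[Vector[Int]] = sample.chunkLimit(2).map(_.toVector).toList
}
```

fs2 also provides chunkMin and chunkN for enforcing a lower bound or a fixed size, so the min and max behaviour asked for in the question can most likely be assembled from these combinators instead of a custom transducer.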