
Using Web Streams, create a TransformStream from several TransformStreams


Is it possible to create a single TransformStream out of several other TransformStreams using WHATWG Streams (the Web Streams API)?

For example, if I have two TransformStreams which run in sequence, like transformer1 and transformer2:

readable.pipeThrough(transformer1).pipeThrough(transformer2).pipeTo(writable)

Ultimately, I'd like to be able to convert that to

readable.pipeThrough(allTransformers).pipeTo(writable)

Where allTransformers is the TransformStream combining transformer1 and transformer2.

The following is not working code, but I'd expect something like this to be possible:

const allTransformers = transformer1.pipeThrough(transformer2)

This is a simplified example, but in practice there may be many transform streams, and I'd like to refactor them into a single, reusable transform pipeline.


Solution

  • I had the same issue; here are two solutions.

    In the example below, UpperCaseTransformStream pipes the stream through TextDecoderStream, UpperCaseTextStream and TextEncoderStream.

    class UpperCaseTextStream extends TransformStream {
      constructor() {
        super({
          transform(chunk, controller) {
            controller.enqueue(chunk.toUpperCase());
          }
        });
      }
    }
    
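    // Wrapper exposing the { writable, readable } pair that pipeThrough() expects.
    class UpperCaseTransformStream {
      constructor(...strategies) {
        // An identity TransformStream provides the writable entry point.
        const { writable, readable } = new TransformStream({}, ...strategies);
        this.writable = writable;
        // The exposed readable is the end of the internal chain.
        this.readable = readable
          .pipeThrough(new TextDecoderStream())
          .pipeThrough(new UpperCaseTextStream())
          .pipeThrough(new TextEncoderStream());
      }
    }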
    
    const TEXT_CHUNK =
     "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy " +
     "nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
    const TEXT_CONTENT = new Array(1024 * 1024).fill(TEXT_CHUNK).join("");
    const readableInput = new Blob([TEXT_CONTENT]).stream();
    const readableOutput = readableInput.pipeThrough(new UpperCaseTransformStream());
    (async () => {
      const text = await new Response(readableOutput).text();
      console.log("Test OK?", text === TEXT_CONTENT.toUpperCase());
    })().catch(console.error);
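
    The ...strategies arguments are forwarded to the inner identity TransformStream, so they follow the standard constructor order: writableStrategy first, then readableStrategy. Below is a rough sketch of how they might be used, assuming byte input and a 64 KiB high-water mark picked only for illustration. Note that the readable strategy applies to the identity stream's internal readable side, not to the final readable exposed after TextEncoderStream.

    ```javascript
    // Same classes as in the example above, repeated so the sketch runs on its own.
    class UpperCaseTextStream extends TransformStream {
      constructor() {
        super({
          transform(chunk, controller) {
            controller.enqueue(chunk.toUpperCase());
          }
        });
      }
    }

    class UpperCaseTransformStream {
      constructor(...strategies) {
        const { writable, readable } = new TransformStream({}, ...strategies);
        this.writable = writable;
        this.readable = readable
          .pipeThrough(new TextDecoderStream())
          .pipeThrough(new UpperCaseTextStream())
          .pipeThrough(new TextEncoderStream());
      }
    }

    // Assumed value for illustration: queue up to 64 KiB of bytes on each side
    // of the identity stream before backpressure is applied.
    const HIGH_WATER_MARK = 64 * 1024;
    const writableStrategy = new ByteLengthQueuingStrategy({ highWaterMark: HIGH_WATER_MARK });
    const readableStrategy = new ByteLengthQueuingStrategy({ highWaterMark: HIGH_WATER_MARK });

    const input = new Blob(["hello, streams"]).stream();
    const output = input.pipeThrough(
      new UpperCaseTransformStream(writableStrategy, readableStrategy)
    );

    // Collect the transformed bytes back into a string and log it.
    new Response(output).text().then(console.log).catch(console.error);
    ```

    ByteLengthQueuingStrategy measures chunks by their byteLength, which fits here because Blob.stream() produces Uint8Array chunks.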

    Here is a more generic implementation: the PipelineStream class extends TransformStream and accepts an array of TransformStream instances as its first parameter.

    class UpperCaseTextStream extends TransformStream {
      constructor() {
        super({
          transform(chunk, controller) {
            controller.enqueue(chunk.toUpperCase());
          }
        });
      }
    }
    
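    class PipelineStream extends TransformStream {
      constructor(transformStreams, ...strategies) {
        // The base class acts as an identity transform and provides this.writable.
        super({}, ...strategies);

        // Chain the identity stream's readable through each transform in order.
        const readable = transformStreams.reduce(
          (stream, transform) => stream.pipeThrough(transform),
          super.readable
        );

        // Shadow the inherited readable getter with the end of the chain.
        Object.defineProperty(this, "readable", {
          get() {
            return readable;
          }
        });
      }
    }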
    
    const TEXT_CHUNK =
     "Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy " +
     "nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
    const TEXT_CONTENT = new Array(1024 * 1024).fill(TEXT_CHUNK).join("");
    const readableInput = new Blob([TEXT_CONTENT]).stream();
    const readableOutput = readableInput.pipeThrough(new PipelineStream([
      new TextDecoderStream(),
      new UpperCaseTextStream(),
      new TextEncoderStream()]));
    (async () => {
      const text = await new Response(readableOutput).text();
      console.log("Test OK?", text === TEXT_CONTENT.toUpperCase());
    })().catch(console.error);
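
    Since pipeThrough() only requires an object with writable and readable properties, the extra identity stream can be skipped when queuing strategies and instanceof TransformStream checks are not needed. Here is a minimal sketch of that idea, where composeTransforms is a hypothetical helper name and not part of the Streams API:

    ```javascript
    class UpperCaseTextStream extends TransformStream {
      constructor() {
        super({
          transform(chunk, controller) {
            controller.enqueue(chunk.toUpperCase());
          }
        });
      }
    }

    // Hypothetical helper: combines one or more TransformStreams into a single
    // { writable, readable } pair that pipeThrough() accepts.
    function composeTransforms(first, ...rest) {
      // Pipe the first transform's output through every remaining transform.
      const readable = rest.reduce(
        (stream, transform) => stream.pipeThrough(transform),
        first.readable
      );
      // Writes go into the first transform; reads come from the end of the chain.
      return { writable: first.writable, readable };
    }

    const input = new Blob(["hello, streams"]).stream();
    const output = input.pipeThrough(composeTransforms(
      new TextDecoderStream(),
      new UpperCaseTextStream(),
      new TextEncoderStream()
    ));

    // Collect the transformed bytes back into a string and log it.
    new Response(output).text().then(console.log).catch(console.error);
    ```

    The returned pair wraps the stream instances passed in, so, like any TransformStream instance, it can be piped through only once. Build a new pair from fresh streams for each pipeline.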