Is it possible to create a single TransformStream out of several other TransformStreams using whatwg streams (the web Streams API)?
For example, if I have two TransformStreams which run in sequence, like transformer1
and transformer2
:
readable.pipeThrough(transformer1).pipeThrough(transformer2).pipeTo(writable)
Ultimately, I'd like to be able to convert that to
readable.pipeThrough(allTransformers).pipeTo(writable)
Where allTransformers
is the TransformStream combining transformer1
and transformer2
.
Below is not real functional code, but I'd think there would be a way to do something like this:
const allTransformers = transformer1.pipeThrough(transformer2)
This is clearly a simplified example, but you can imagine there being many transform streams and I'd like to refactor to a single, reusable transform pipeline.
I had the same issue, here are my solutions.
In the example below, UpperCaseTransformStream
pipes the stream through TextDecoderStream
, UpperCaseTextStream
and TextEncoderStream
.
class UpperCaseTextStream extends TransformStream {
constructor() {
super({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
}
}
class UpperCaseTransformStream {
constructor(...strategies) {
const { writable, readable } = new TransformStream({}, ...strategies);
this.writable = writable;
this.readable = readable
.pipeThrough(new TextDecoderStream())
.pipeThrough(new UpperCaseTextStream())
.pipeThrough(new TextEncoderStream());
}
}
const TEXT_CHUNK =
"Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy " +
"nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
const TEXT_CONTENT = new Array(1024 * 1024).fill(TEXT_CHUNK).join("");
const readableInput = new Blob([TEXT_CONTENT]).stream();
const readableOuput = readableInput.pipeThrough(new UpperCaseTransformStream());
(async () => {
const text = await new Response(readableOuput).text();
console.log("Test OK?", text == TEXT_CONTENT.toUpperCase());
})().catch(console.error);
Here is a more generic implementation using the class PipelineStream
which extends TransformStream
and accepts an array of TransformStream
instances as first parameter.
class UpperCaseTextStream extends TransformStream {
constructor() {
super({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
});
}
}
class PipelineStream extends TransformStream {
constructor(transformStreams, ...strategies) {
super({}, ...strategies);
const readable = [super.readable, ...transformStreams]
.reduce((readable, transform) => readable.pipeThrough(transform));
Object.defineProperty(this, "readable", {
get() {
return readable;
}
});
}
}
const TEXT_CHUNK =
"Lorem ipsum dolor sit amet, consectetuer adipiscing elit, sed diam nonummy " +
"nibh euismod tincidunt ut laoreet dolore magna aliquam erat volutpat.";
const TEXT_CONTENT = new Array(1024 * 1024).fill(TEXT_CHUNK).join("");
const readableInput = new Blob([TEXT_CONTENT]).stream();
const readableOuput = readableInput.pipeThrough(new PipelineStream([
new TextDecoderStream(),
new UpperCaseTextStream(),
new TextEncoderStream()]));
(async () => {
const text = await new Response(readableOuput).text();
console.log("Test OK?", text == TEXT_CONTENT.toUpperCase());
})().catch(console.error);