A JavaScript TransformStream implementation retrieves and emits the events that makes up a stream (stream chunks, stream end and stream error) in the following way:
Event | How to retrieve | How to emit |
---|---|---|
Stream chunk | transform method in TransformStream constructor |
controller.enqueue() |
Stream end | flush method in TransformStream constructor |
controller.terminate() |
Stream error | ??? | controller.error() |
The default behaviour for a TransformStream is to forward each of these events from the input (writable) stream to the output (readable) stream, and this behaviour can be customized by using the methods outlined in the table.
I have a special use case where I would like to cause some side effects when an error is forwarded from the input to the output stream. I can also imagine some use cases where the TransformStream does not want to forward the error at all, but react to it in a different way. However, as you can see in the table, I have not been able to find a way to retrieve the error.
How can I react to a stream error in the input (readable) stream of a TransformStream?
It turns out other have noticed this as well and a cancel(reason)
hook has been added to the transformer definition in September 2023. However, for backwards compatibility reasons the hook does not allow to transform the cancellation/abortion (such as ignoring it or changing the reason), but only to react to it. The hook is called when the input stream is aborted or the output stream is canceled, except if flush
was already called.
In June 2024, no browsers support this hook yet and thus it is also not documented on MDN yet. You can find links to the implementation issues in the pull request.
For using this in browsers already today and also support transforming the abortion, I’ve had some success overwriting the writable
property of the TransformStream
, as a WritableStream
allows for an abort
implementation.
class AbortHandlingTransformStream<I, O> extends TransformStream<I, O> {
constructor(
transformer?: Transformer<I, O> & {
abort?: (reason: any, controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
},
writableStrategy?: QueuingStrategy<I>,
readableStrategy?: QueuingStrategy<O>
) {
const { abort, start, ...rest } = transformer ?? {};
let controller: TransformStreamDefaultController<O>;
super({
...rest,
start: (c) => {
controller = c;
start?.(c);
}
}, writableStrategy, readableStrategy);
const writer = this.writable.getWriter();
const writable = new WritableStream({
write: (chunk) => writer.write(chunk),
close: () => writer.close(),
abort: async (reason) => {
if (abort) {
try {
await abort(reason, controller);
} catch (err: any) {
await writer.abort(err);
}
} else {
await writer.abort(reason);
}
}
});
Object.defineProperty(this, "writable", {
get: () => writable
});
}
}
It can be used like a regular TransformStream
, but an additional abort(reason, controller)
hook can be passed to it. This hook is called when the input (writable) stream is aborted. If not specified, it simply aborts output (readable) stream with the provided reason, like regular TransformStream
s do. If specified, it can implement custom behaviour, such as mapping the reason to another reason, closing the output stream instead of aborting it, and/or causing side effects. Some things to keep in mind:
abort
hook is called only when input (writable) stream is aborted. It is not called when the output (readable) stream is aborted due to an error thrown by one of TransformStream
’s functions (such as transform
). If you want to handle such errors, put a separate AbortHandlingTransformStream
after your other TransformStream
in the pipeThrough
chain. It is also not called when the output (readable) stream is cancelled by the consumer. To handle that case, you would have to also override the readable
property with a wrapper for the readable stream that transforms cancellations.abort
hook itself throws an error (sync or async), the output (readable) stream is aborted with the error as the reason (this is consistent with errors thrown in other TransformStream
functions).transform
and flush
hooks will never be called again. This means that the abort
hook must either call controller.terminate()
or controller.error(reason)
, otherwise the output stream would just remain open forever.