javascript, whatwg-streams-api

How to react to an error on the input (writable) side of a JavaScript TransformStream?


A JavaScript TransformStream implementation retrieves and emits the events that make up a stream (stream chunks, stream end and stream error) in the following way:

Event        | How to retrieve                                 | How to emit
Stream chunk | transform method in TransformStream constructor | controller.enqueue()
Stream end   | flush method in TransformStream constructor     | controller.terminate()
Stream error | ???                                             | controller.error()

The default behaviour for a TransformStream is to forward each of these events from the input (writable) stream to the output (readable) stream, and this behaviour can be customized by using the methods outlined in the table.
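
To make the default forwarding concrete, here is a minimal sketch. The helper name `observeDefaultForwarding` and the upper-casing transform are only illustrative assumptions. It customizes chunk handling via transform, then aborts the input side and observes that the output side is errored with the same reason, without any hook having seen the error:

```typescript
// Hypothetical helper: shows chunk transformation and default error forwarding.
async function observeDefaultForwarding(): Promise<{ chunks: string[]; error: unknown }> {
    const ts = new TransformStream<string, string>({
        // "Stream chunk" row of the table: retrieve via transform, emit via enqueue.
        transform(chunk, controller) {
            controller.enqueue(chunk.toUpperCase());
        }
    });
    const writer = ts.writable.getWriter();
    const reader = ts.readable.getReader();
    const chunks: string[] = [];

    // Start the write first, then read: the transform only runs once the
    // readable side is pulled, so awaiting the write before reading would hang.
    const written = writer.write("a");
    const first = await reader.read();
    await written;
    if (!first.done) chunks.push(first.value);

    // Abort the input (writable) side. None of the transformer methods above
    // is invoked for this; the reason is forwarded to the readable side.
    await writer.abort(new Error("input failed"));

    let error: unknown;
    try {
        await reader.read();
    } catch (err) {
        error = err; // the forwarded abort reason
    }
    return { chunks, error };
}
```

The read after the abort rejects with the reason passed to `writer.abort()`, which is the "Stream error" row of the table with nothing to put in the "How to retrieve" column.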

I have a special use case where I would like to trigger some side effects when an error is forwarded from the input to the output stream. I can also imagine use cases where the TransformStream should not forward the error at all, but react to it in a different way. However, as the table shows, I have not been able to find a way to retrieve the error.

How can I react to a stream error on the input (writable) side of a TransformStream?


Solution

  • It turns out others have noticed this as well, and a cancel(reason) hook was added to the transformer definition in September 2023. However, for backwards-compatibility reasons the hook cannot transform the cancellation/abort (for example by ignoring it or changing the reason); it can only react to it. The hook is called when the input stream is aborted or the output stream is cancelled, unless flush has already been called.

    As of June 2024, no browser supports this hook yet, so it is not documented on MDN either. You can find links to the implementation issues in the pull request.
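
For reference, a minimal sketch of how the cancel(reason) hook would be passed, assuming a runtime that implements it. The helper name `tryCancelHook` is hypothetical. A runtime without support should simply ignore the extra member, so `hookReason` stays undefined there while the error is still forwarded as usual:

```typescript
// Hypothetical helper: passes a cancel(reason) hook and reports what happened.
async function tryCancelHook(): Promise<{ hookReason: unknown; readError: unknown }> {
    let hookReason: unknown;

    // Declared as a variable first so that TypeScript lib typings which do not
    // know about `cancel` yet do not reject it as an excess property.
    const transformer = {
        transform(chunk: string, controller: TransformStreamDefaultController<string>) {
            controller.enqueue(chunk);
        },
        // Only called by runtimes that implement the hook. It can react to the
        // reason (side effects) but cannot suppress or replace it.
        cancel(reason: unknown) {
            hookReason = reason;
        }
    };

    const ts = new TransformStream<string, string>(transformer);
    const writer = ts.writable.getWriter();
    const reader = ts.readable.getReader();

    // Abort the input side; the output side is errored with the same reason,
    // whether or not the hook is supported.
    await writer.abort(new Error("input failed"));

    let readError: unknown;
    try {
        await reader.read();
    } catch (err) {
        readError = err;
    }
    return { hookReason, readError };
}
```

Checking whether `hookReason` was set is a simple way to detect at runtime whether the hook is available.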


    To use this in browsers today, and to also support transforming the abort, I’ve had some success overriding the writable property of the TransformStream with a wrapper, since a WritableStream accepts an abort implementation.

    class AbortHandlingTransformStream<I, O> extends TransformStream<I, O> {
        constructor(
            transformer?: Transformer<I, O> & {
                abort?: (reason: any, controller: TransformStreamDefaultController<O>) => void | PromiseLike<void>;
            },
            writableStrategy?: QueuingStrategy<I>,
            readableStrategy?: QueuingStrategy<O>
        ) {
            const { abort, start, ...rest } = transformer ?? {};
            let controller: TransformStreamDefaultController<O>;
            super({
                ...rest,
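                start: (c) => {
                    // Capture the controller so the abort hook can use it later.
                    controller = c;
                    // Return the result so that an async start() is still awaited.
                    return start?.(c);
                }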
            }, writableStrategy, readableStrategy);
    
            const writer = this.writable.getWriter();
            const writable = new WritableStream({
                write: (chunk) => writer.write(chunk),
                close: () => writer.close(),
                abort: async (reason) => {
                    if (abort) {
                        try {
                            await abort(reason, controller);
                        } catch (err: any) {
                            await writer.abort(err);
                        }
                    } else {
                        await writer.abort(reason);
                    }
                }
            });
    
            Object.defineProperty(this, "writable", {
                get: () => writable
            });
        }
    }
    

    It can be used like a regular TransformStream, but it accepts an additional abort(reason, controller) hook. This hook is called when the input (writable) stream is aborted. If it is not specified, the output (readable) stream is simply aborted with the provided reason, as regular TransformStreams do. If it is specified, it can implement custom behaviour, such as mapping the reason to another reason, closing the output stream instead of aborting it, and/or causing side effects. Some things to keep in mind:

    • The abort hook is called only when the input (writable) stream is aborted. It is not called when the output (readable) stream is aborted due to an error thrown by one of the TransformStream’s functions (such as transform). If you want to handle such errors, put a separate AbortHandlingTransformStream after your other TransformStream in the pipeThrough chain. It is also not called when the output (readable) stream is cancelled by the consumer. To handle that case, you would also have to override the readable property with a wrapper around the readable stream that transforms cancellations.
    • If the abort hook itself throws an error (sync or async), the output (readable) stream is aborted with the error as the reason (this is consistent with errors thrown in other TransformStream functions).
    • Once the input (writable) stream is aborted, the transform and flush hooks will never be called again. This means that the abort hook must either call controller.terminate() or controller.error(reason), otherwise the output stream would just remain open forever.
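
The last point can be sketched with a stripped-down variation of the same idea. This is not the class above: `closeOnAbort` and `demoCloseOnAbort` are hypothetical names, and the helper returns a plain { writable, readable } pair (which pipeThrough also accepts) around an identity TransformStream. Its abort handler runs a side effect and then calls controller.terminate(), so the output ends cleanly instead of being errored:

```typescript
// Hypothetical helper: a pass-through pair whose output is closed (not errored)
// when the input is aborted.
function closeOnAbort<T>(
    onAbort?: (reason: unknown) => void
): { writable: WritableStream<T>; readable: ReadableStream<T> } {
    let controller!: TransformStreamDefaultController<T>;
    // No transform method: the inner stream passes chunks through unchanged.
    const inner = new TransformStream<T, T>({
        start(c) {
            controller = c; // start() runs synchronously inside the constructor
        }
    });
    const writer = inner.writable.getWriter();
    const writable = new WritableStream<T>({
        write: (chunk) => writer.write(chunk),
        close: () => writer.close(),
        abort: (reason) => {
            onAbort?.(reason);      // side effect only
            controller.terminate(); // without this the output would stay open
        }
    });
    return { writable, readable: inner.readable };
}

// Usage sketch: one chunk passes through, then the input is aborted.
async function demoCloseOnAbort(): Promise<{ chunks: string[]; done: boolean; seen: unknown }> {
    let seen: unknown;
    const { writable, readable } = closeOnAbort<string>((reason) => {
        seen = reason;
    });
    const writer = writable.getWriter();
    const reader = readable.getReader();
    const chunks: string[] = [];

    const written = writer.write("a");
    const first = await reader.read();
    await written;
    if (!first.done) chunks.push(first.value);

    await writer.abort(new Error("input failed"));
    const last = await reader.read(); // resolves with done: true instead of rejecting
    return { chunks, done: last.done, seen };
}
```

Replacing controller.terminate() with controller.error(mappedReason) would give the "map the reason to another reason" behaviour instead.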