javascript promise reduce cancellation async-iterator

Is this Promise cancellation implementation for reducing an async iterable on the right track?


I'd like to enable Promise cancellation for one of my library's methods, reduce. I only care about cancelling the Promise when reducing an async iterable, since async iterables are the inputs most likely to hang indefinitely.

const reduceAsyncIterable = async (fn, possiblyX0, state, x) => {
  const iter = x[Symbol.asyncIterator]()
  const y0 = isUndefined(possiblyX0) ? (await iter.next()).value : possiblyX0
  if (isUndefined(y0)) {
    throw new TypeError('reduce(...)(x); x cannot be empty')
  }
  const first = await iter.next()
  if (first.done) return y0 // nothing left to combine with y0
  let y = await fn(y0, first.value)
  for await (const xi of iter) {
    if (state.cancelled) return // stops async iterating if `cancel` called
    y = await fn(y, xi)
  }
  return y
}

const reduce = (fn, x0) => {
  if (!isFunction(fn)) {
    throw new TypeError('reduce(x, y); x is not a function')
  }
  return x => {
    if (isIterable(x)) return reduceIterable(fn, x0, x)
    if (isAsyncIterable(x)) {
      const state = { cancelled: false, resolve: () => {} }
      const p = new Promise((resolve, reject) => {
        state.resolve = resolve
        reduceAsyncIterable(fn, x0, state, x).then(
          y => state.cancelled || resolve(y)
        ).catch(reject)
      })
      p.cancel = () => { state.cancelled = true; state.resolve() } // shortcircuit the Promise `p` on `cancel` call
      return p
    }
    if (is(Object)(x)) return reduceObject(fn, x0, x)
    throw new TypeError('reduce(...)(x); x invalid')
  }
}

The above code seems to work, but I can't help feeling there are memory leaks here, in particular at await iter.next() and for await (const xi of iter). If these awaits never settle (which is quite possible for an async iterator), reduceAsyncIterable may never return. This is fine from the user's perspective, because the short circuit in reduce resolves the Promise the user sees. But would cancelling the Promise this way cause a memory leak, given that the underlying operation is still pending?

I'd like to be able to use the cancel function on a returned promise like so:

const myOngoingTaskPromise = reduce(someReducer, null)(myInfiniteAsyncIterable)

myOngoingTaskPromise.cancel() // resolves myOngoingTaskPromise with undefined

myOngoingTaskPromise // Promise { undefined }

Solution

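  • I found a way: Promise.race does the trick. Race the reduction against a cancel-token promise, and have cancel reject that token. Note that cancel now rejects p with Error('cancelled') instead of resolving it with undefined.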

        if (isAsyncIterable(x)) {
          const state = { cancel: () => {} }
          const cancelToken = new Promise((_, reject) => { state.cancel = reject })
          const p = Promise.race([
            reduceAsyncIterable(fn, x0, x),
            cancelToken,
          ])
          p.cancel = () => { state.cancel(new Error('cancelled')) }
          return p
        }
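
To try the pattern in isolation, here is a minimal self-contained sketch. The names withCancel, never, and sum are hypothetical stand-ins for illustration and are not part of the library; sum plays the role of reduceAsyncIterable.

```javascript
// Hypothetical helper (not from the library): races a promise against a cancel token.
const withCancel = promise => {
  let cancel = () => {}
  // The token never resolves; it only rejects when cancel is called.
  const cancelToken = new Promise((_, reject) => { cancel = reject })
  const p = Promise.race([promise, cancelToken])
  p.cancel = () => cancel(new Error('cancelled'))
  return p
}

// An async iterable whose next() returns a promise that never settles.
const never = {
  [Symbol.asyncIterator]: () => ({ next: () => new Promise(() => {}) }),
}

// Stand-in for reduceAsyncIterable: sums the values of an async iterable.
const sum = async iterable => {
  let total = 0
  for await (const n of iterable) total += n
  return total
}

const task = withCancel(sum(never))
task.catch(err => console.log(err.message)) // logs "cancelled"
task.cancel()
```

Because cancel rejects the raced promise, callers need a catch handler (or try/catch around await) to tell a cancellation apart from a real failure.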
    

    No memory leaks
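
One thing worth checking in your own setup: Promise.race settles p, but it does not by itself stop the loop inside reduceAsyncIterable, which stays suspended on iter.next(). If the source holds a resource such as a timer or socket, cancel can also call the iterator's optional return() method so the source can clean up. The sketch below assumes a hypothetical timer-backed source (makeTicker) and a hypothetical reducer (cancellableSum); neither comes from the library. For async generators, return() is queued behind a pending next(), so this mainly helps with hand-written iterators.

```javascript
// Hypothetical source: emits 1 every `ms` milliseconds and supports return() for cleanup.
const makeTicker = ms => {
  let timer = null
  let settlePending = null
  let stopped = false
  const done = { done: true, value: undefined }
  return {
    isStopped: () => stopped,
    [Symbol.asyncIterator]() { return this },
    next() {
      if (stopped) return Promise.resolve(done)
      return new Promise(resolve => {
        settlePending = resolve
        timer = setTimeout(() => resolve({ done: false, value: 1 }), ms)
      })
    },
    return() {
      stopped = true
      clearTimeout(timer)
      if (settlePending) settlePending(done) // unblock a consumer waiting on next()
      return Promise.resolve(done)
    },
  }
}

// Hypothetical reducer: sums values, and on cancel also asks the iterator to stop.
const cancellableSum = iterable => {
  const iter = iterable[Symbol.asyncIterator]()
  let cancel = () => {}
  const cancelToken = new Promise((_, reject) => { cancel = reject })
  const work = (async () => {
    let total = 0
    for (let r = await iter.next(); !r.done; r = await iter.next()) total += r.value
    return total
  })()
  const p = Promise.race([work, cancelToken])
  p.cancel = () => {
    cancel(new Error('cancelled'))
    if (typeof iter.return === 'function') iter.return() // release the source
  }
  return p
}

const ticker = makeTicker(1000)
const tickTask = cancellableSum(ticker)
tickTask.catch(err => console.log(err.message, ticker.isStopped())) // logs "cancelled true"
tickTask.cancel()
```

With return() wired in, the inner loop sees done: true and finishes instead of waiting on a promise that will never settle.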

    reduce-async-iterable-promise-cancellation