Tags: javascript, promise, generator

How to use a promise-based (non-event-emitter) approach to streaming data?


I want to basically do this when I call a function in my app (frontend):

  1. Upload a file. Update progress percent in UI.
  2. Create job and return "Job started" in UI.
  3. Poll job and wait for it to finish.
  4. Return response (converted file in this case).

The UI will basically go through this sequence:

  1. Uploading... (with percentage circle updating)
  2. Queued...
  3. Processing...
  4. Complete

All from calling one function.

The function will use XMLHttpRequest's upload progress functionality, like here. It will then poll the backend using fetch to get the job status. Finally, when the job status is "complete", it will fetch and return the converted file.

What is a proper way of doing that with a promise-based (non event-emitter) approach? Generators?

async function *performUploadJob() {
  const workId = yield await upload(getFile())
  yield { type: 'processing' }
  const output = await pollForJobComplete(workId)
  yield { type: 'result', output }
}

async function pollForJobComplete(workId) {
  while (true) {
    const res = await fetch(`/work/${workId}`)
    const json = await res.json()
    if (json.status === 'complete') {
      return json.output
    }
    await wait(2000)
  }
}

function *upload(file) {
  var fd = new FormData();
  fd.append("file", file);

  var xhr = new XMLHttpRequest();
  xhr.open("POST", "/upload", true);
  xhr.upload.onprogress = function(e) {
    var percentComplete = Math.ceil((e.loaded / e.total) * 100);
    yield { type: 'update', percentComplete }
  };

  xhr.onload = function() {
    if(this.status == 200) {
      yield { type: 'update', percentComplete: 100 }
    }
  }

  xhr.send(fd);
}
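
// Note: wait() isn't defined above; assuming a simple promise-based delay helper such as:
function wait(ms) {
  return new Promise(resolve => setTimeout(resolve, ms))
}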

Is something like that possible (pseudocode)?

If so, how would you structure it? If not, what would you do instead?

The goal is to be able to just do something like this:

const iterator = performUploadJob()

for await (const data of iterator) {
  switch (data.type) {
    ...
  }
}

Solution

  • Yes, this is possible, but I wouldn't recommend it, because async iterators are a bad fit for event emitters. Even if you did use an AsyncIterator<ProgressEvent, Response, void>, it would be rather impractical to consume, since a for await … of loop never hands you the final Response result (see the manual next() sketch further down).

    • for pollForJob, an async iterator would be fine, since a) you don't care about the return value (the loop simply stops when the job is done) and b) with polling, the producer never runs ahead of the consumer. You can implement this with an async generator similar to what you did:

      async function* pollForJob(workId) {
        while (true) {
          const res = await fetch(`/work/${workId}`)
          if (!res.ok) throw new Error('Failed to poll'); // or ignore and carry on
          yield res.json()
          await wait(2000)
        }
      }
      
      …
      for await (const json of pollForJob(upload.jobId)) {
        if (json.status === 'complete') {
          break;
        } else if (json.status === 'running') {
          console.log('job continues for', json.estimatedFinish - Date.now());
        }
      }
      console.log('job has finished');
      …
      

      If the final poll actually returns the result of the processing, the picture would look different.
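
      For instance, a rough sketch (the output field, and the idea of returning it from the generator, are assumptions about your backend, not part of the code above): the generator could return the result instead of yielding it, but then for await … of would silently drop it, so the caller has to drive the iterator manually with next():

      async function* pollForJobResult(workId) {
        while (true) {
          const res = await fetch(`/work/${workId}`)
          if (!res.ok) throw new Error('Failed to poll')
          const json = await res.json()
          if (json.status === 'complete') return json.output // final value, not yielded
          yield json // intermediate status updates
          await wait(2000)
        }
      }

      // for await … of discards the returned output, so drive the iterator by hand:
      async function waitForResult(workId) {
        const it = pollForJobResult(workId)
        while (true) {
          const { value, done } = await it.next()
          if (done) return value // the converted output
          console.log('status', value.status)
        }
      }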

    • for upload, I would recommend implementing a miniature "event loop" where each progress event calls a provided event handler. The loop terminates when the upload ends and the promise is resolved with the response (or when there's an error, or the upload is aborted).

      function upload(file, onProgress, abortSignal) {
        return new Promise((resolve, reject) => {
          abortSignal?.throwIfAborted();
          var fd = new FormData();
          fd.append("file", file);
      
          var xhr = new XMLHttpRequest();
          xhr.open("POST", "/upload", true);
      
          // stop(done) builds a handler that cleans up, aborts the request if it is
          // still in flight, and then settles the promise via done (resolve or reject).
          const stop = done => value => {
            abortSignal?.removeEventListener("abort", fail);
            if (xhr.readyState != 4) xhr.abort();
            done(value);
          };
          const fail = stop(reject);
          abortSignal?.addEventListener("abort", fail);
      
          xhr.ontimeout = fail;
          xhr.onerror = fail;
          xhr.onload = stop(e => resolve(e.target));
      
          xhr.upload.onprogress = e => { // upload progress (xhr.onprogress would track the response download)
            try {
              onProgress(e);
            } catch(err) {
              fail(err);
            }
          };
      
          xhr.send(fd);
        });
      }
      
      …
      const { status, response } = await upload(file, e => {
        var percentComplete = Math.ceil((e.loaded / e.total) * 100);
        console.log('completion', percentComplete);
      });
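
    Putting the two pieces together, the single function from your goal can then be a plain async function that takes callbacks instead of yielding events. This is only a sketch: performUploadJob, the onProgress/onStatus callbacks, and the jobId/status/output response fields are assumptions about what your backend returns, not something the code above defines.

      // Hypothetical composition of upload() and pollForJob(); the jobId, status
      // and output field names are assumptions about the backend's responses.
      async function performUploadJob(file, { onProgress, onStatus } = {}) {
        const xhr = await upload(file, e => {
          const percentComplete = Math.ceil((e.loaded / e.total) * 100);
          onProgress?.(percentComplete);                 // 1. Uploading… (percentage)
        });
        const { jobId } = JSON.parse(xhr.responseText);  // assumed upload response shape
        onStatus?.('queued');                            // 2. Queued…

        for await (const json of pollForJob(jobId)) {
          if (json.status === 'complete') {
            onStatus?.('complete');                      // 4. Complete
            return json.output;                          // the converted file
          }
          onStatus?.('processing');                      // 3. Processing…
        }
      }

    If you also need cancellation, create an AbortController and thread its signal through as the third argument to upload(), and check it in the polling loop as well.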