Tags: node.js, amazon-s3, aws-lambda, async-await, jsonstream

How to stream-read an S3 JSON file into PostgreSQL using async/await in a Node.js 12 Lambda function?


I didn't realize how perilous such a simple task could be. We're trying to stream-read a JSON file stored in S3, and I think that part works. Our .on('data') callback is being called, but Node seems to pick and choose which parts of it to run, seemingly at random.

We set up a stream reader.

stream.on('data', async x => { 
  await saveToDb(x);  // This doesn't await.  It processes saveToDb up until it awaits.
});

Sometimes the write reaches the db, but most of the time it doesn't. I've concluded that EventEmitter has problems with async/await event handlers. It seems to go along with an async handler as long as the code stays synchronous, but once the handler hits an await, whether the rest of it actually runs seems random.

The chunks stream in, and we can console.log them and see the data. But as soon as we make an async/await call, the messages stop being reliable.
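For reference, here is a minimal reproduction of this behavior, using a timer as a hypothetical stand-in for saveToDb. EventEmitter calls listeners synchronously and ignores the promise an async listener returns, so emit() returns as soon as the listener reaches its first await; the rest of the listener runs later, only if the process is still running.

```javascript
const { EventEmitter } = require('events');

const log = [];
const emitter = new EventEmitter();

// Async listener: emit() does not wait for the promise it returns.
emitter.on('data', async (x) => {
  log.push(`start ${x}`);
  // Hypothetical stand-in for saveToDb
  await new Promise((resolve) => setTimeout(resolve, 10));
  log.push(`end ${x}`);
});

emitter.emit('data', 1);
emitter.emit('data', 2);
// Both emit() calls have already returned; only the code before each await has run.
log.push('both emits returned');
```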

I'm running this in AWS Lambda, and I've been told there are special considerations there, because Lambda apparently halts processing in some cases?
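The Lambda consideration, as I understand it: an async handler's invocation is treated as finished when the promise it returns settles, and the execution environment can be frozen right after that, so writes that were started but never awaited may not complete during that invocation. This sketch contrasts a handler that returns while writes are still pending with one that awaits them. saveToDb here is a hypothetical stand-in that resolves on a later event-loop turn, and Readable.from assumes Node 12.3 or newer.

```javascript
const { Readable } = require('stream');

// Hypothetical stand-in for saveToDb: records the item on a later event-loop turn.
function saveToDb(item, saved) {
  return new Promise((resolve) => {
    setImmediate(() => {
      saved.push(item);
      resolve();
    });
  });
}

// Resolves as soon as the listener is attached; the writes happen later, if at all.
async function handlerThatReturnsEarly(items, saved) {
  const source = Readable.from(items);
  source.on('data', async (item) => {
    await saveToDb(item, saved);
  });
}

// Resolves only after every write has finished.
async function handlerThatWaits(items, saved) {
  for await (const item of Readable.from(items)) {
    await saveToDb(item, saved);
  }
}
```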

I tried wrapping the await call in an IIFE, but that didn't work either.

What am I missing? Is there no way to tell JavaScript, "Okay, I need you to run this async task synchronously. I mean it: don't fire off any more event notifications, either. Just sit here and wait."?


Solution

  • TL;DR:

    • Use Async Iterators to pull from the end of your stream pipeline!
    • Don't use async functions in any of your stream code (event handlers or stream implementations)!

    Details:

    The secret to life's mystery regarding async/await and streams appears to be wrapped up in Async Iterators!
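A small illustration of why this helps, assuming Node 12+ (for Readable.from and async iteration of streams): a for await loop asks the stream for the next item only after the loop body, including its awaits, has finished, so the writes run strictly one after another. slowWrite is a hypothetical stand-in for the db call.

```javascript
const { Readable } = require('stream');

const log = [];

// Hypothetical stand-in for the db call: finishes after a short delay.
function slowWrite(x) {
  return new Promise((resolve) => {
    setTimeout(() => {
      log.push(`end ${x}`);
      resolve();
    }, 5);
  });
}

async function run() {
  for await (const x of Readable.from([1, 2, 3])) {
    log.push(`start ${x}`);
    await slowWrite(x); // the loop does not take the next item until this resolves
  }
}
```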

    In short, I piped some streams together and, at the very end, created an async iterator to pull items out of the last stream so that I could call the db asynchronously. The only thing ChunkStream does is collect up to 1,000 items so the db is called once per batch instead of once per item. I'm new to queues, so there may already be a better way to do that.
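On the question of a better way to batch: one alternative (not what this answer uses) is an async generator that groups items from any async iterable, such as a readable stream, without writing a Transform class. inBatches is a hypothetical helper, not part of any library; the sketch assumes Node 12+.

```javascript
const { Readable } = require('stream');

// Hypothetical helper (not from any library): groups items from an async
// iterable into arrays of up to `size` items.
async function* inBatches(iterable, size) {
  let batch = [];
  for await (const item of iterable) {
    batch.push(item);
    if (batch.length >= size) {
      yield batch;
      batch = [];
    }
  }
  // Leftover items, the equivalent of ChunkStream's _flush
  if (batch.length > 0) yield batch;
}

// Usage: 2,500 items in batches of 1,000.
async function batchSizes() {
  const items = Array.from({ length: 2500 }, (_, i) => i);
  const sizes = [];
  for await (const batch of inBatches(Readable.from(items), 1000)) {
    sizes.push(batch.length);
  }
  return sizes;
}
```

In the handler, the idea would be to drop ChunkStream and loop over inBatches(...) applied to the JSONStream parser's output instead of endStream.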

    // ...
    const AWS = require('aws-sdk');
    const s3 = new AWS.S3();
    const JSONbigint = require('json-bigint');
    // Let there be proper bigint handling! This replaces JSON.parse/stringify globally,
    // so every other module in the process gets json-bigint's behavior too.
    JSON.parse = JSONbigint.parse;
    JSON.stringify = JSONbigint.stringify;
    const stream = require('stream');
    const JSONStream = require('JSONStream');
    
    exports.handler = async (event, context) => {
        // ...
        let bucket, key;
        try {
            bucket = event.Records[0].s3.bucket.name;
            key = event.Records[0].s3.object.key;
            console.log(`Fetching S3 file: Bucket: ${bucket}, Key: ${key}`);
            const s3Stream = s3.getObject({ Bucket: bucket, Key: key }).createReadStream();
            const parser = JSONStream.parse('*'); // Emits each child of the root JSON value as an object
            const chunkStream = new ChunkStream(1000); // Give the db a chunk of work instead of one item at a time
            const endStream = s3Stream.pipe(parser).pipe(chunkStream);
            // .pipe() does not forward errors; pass them to the last stream so the
            // for await loop below rejects and the catch block sees them.
            s3Stream.on('error', (err) => endStream.destroy(err));
            parser.on('error', (err) => endStream.destroy(err));
            
            let totalProcessed = 0;
            async function processChunk(chunk) {
                const chunkString = JSON.stringify(chunk);
                console.log(`Upserting ${chunk.length} items (starting with index ${totalProcessed}) to the db.`);
                await updateDb(chunkString, pool, 1000); // updateDb and pool are part of the missing code
                totalProcessed += chunk.length;
            }
            
            // Async iterator: the next batch is not taken until processChunk resolves
            for await (const batch of endStream) {
                await processChunk(batch);
            }
        } catch (ex) {
            // In an async handler, throwing (rejecting) reports the failure to Lambda;
            // the legacy context.fail() callback is not needed.
            console.error(`Streaming S3 file failed (Bucket: ${bucket}, Key: ${key})`, ex);
            throw ex;
        }
    };
    
    // Object-mode Transform that groups incoming items into arrays of up to maxItems.
    class ChunkStream extends stream.Transform {
        constructor(maxItems, options = {}) {
            super({ ...options, objectMode: true });
            this.maxItems = maxItems;
            this.batch = [];
        }
        // Called once per item: buffer it, and emit the batch once it is full.
        _transform(item, enc, cb) {
            this.batch.push(item);
            if (this.batch.length >= this.maxItems) {
                this.push(this.batch);
                this.batch = [];
            }
            cb();
        }
        // Called after the last item: emit whatever is left as a final, smaller batch.
        _flush(cb) {
            if (this.batch.length > 0) {
                this.push(this.batch);
                this.batch = [];
            }
            cb();
        }
    }
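One caveat with chained .pipe() calls: .pipe() does not forward errors, so an error on the S3 read stream or the parser is not seen by a for await loop over the last stream (or by the try/catch around it). A sketch of forwarding the error by destroying the last stream with it, so the loop rejects; failingSource and passThrough are hypothetical stand-ins for the S3 read stream and the parser/ChunkStream stages. stream.pipeline() (Node 10+) is the built-in alternative that wires up error handling across all stages.

```javascript
const { Readable, Transform } = require('stream');

// Hypothetical source: emits 1 and 2, then fails (stand-in for a failed S3 read).
function failingSource() {
  let n = 0;
  return new Readable({
    objectMode: true,
    read() {
      n += 1;
      if (n <= 2) this.push(n);
      else this.destroy(new Error('source failed'));
    },
  });
}

// Hypothetical pass-through stage standing in for the parser/ChunkStream steps.
function passThrough() {
  return new Transform({
    objectMode: true,
    transform(item, enc, cb) {
      cb(null, item);
    },
  });
}

async function consume(seen) {
  const source = failingSource();
  const end = source.pipe(passThrough());
  // Forward the source's error to the last stream so the loop below rejects.
  source.on('error', (err) => end.destroy(err));
  for await (const item of end) seen.push(item);
}
```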