Search code examples
node.jsamazon-web-servicespromiseamazon-kinesis-firehose

Unable to resolve a promise with setTimeout


I'm writing a function to write to AWS Kinesis firehose using Node. The AWS function call which writes to firehose stream returns "error" or "data" depending on the result of the operation. Incase of error, for a specific error code, I need to retry same request with exponentialBackoff. I'm using setTimeOut to fire the same method with varying time for each subsequent retry but looks like whenever there is a retry, I'm not resolving properly and my test are failing complaining "Error: Timeout of 5000ms exceeded. For async tests and hooks, ensure "done()" is called; if returning a Promise, ensure it resolves."

async function batchWrite(records,firehose,retry = 0){
    var readingObjects = getReadings(records);
    var params = {
        DeliveryStreamName: process.env.KINESIS_FIREHOSE_STREAM_DELIVERY,
        Records: readingObjects
    };

    return await new Promise(function(resolve,reject){
        firehose.putRecordBatch(params,function(error,data){
            if(error){
                if(error.code == 'ServiceUnavailableException' && retry < retries.length){
                    console.log('retryCount=',retry);
                    setTimeout(batchWrite,retries[retry],records,firehose,retry+1);
                    console.log('setTimeout',retry);
                }
                else{
                    // console.log('Error',error);
                    console.log('resolving');
                    resolve(error);
                }
            }
            else{
                if(data.FailedPutCount > 0){
                    //colect the RequestResponses which are not processed.
                    //index of those records is same as in request
                    //process those faied records again
                    console.log('Error',data);
                }
                resolve(data);
            }
        });
    });
}

for the function above, In test case,I'm simulating error case by returning response which has ServiceUnavailableException so that it retries but after all the retries, test timeout. Is there anything I need to change to make it work.


Solution

  • When you call setTimeout() for the retry, the original promise you created and returned is never resolved. So, the caller never sees a resolved promise. You will need to chain the new promise to the prior one. And, you can't do that with setTimeout() directly, but you can do that my wrapping it in a promise of its own and then chaining that to the original promise. In addition, the code is a ton more managable if you promisify putRecordBatch as the lowest level and do all your control flow with promises, not a mix of promises and callbacks.

    Here's how you could do that:

    // utility function to return a promise that is resolved after a setTimeout()
    function delay(t, v) {
        return new Promise(resolve => {
            setTimeout(resolve, t, v);
        });
    }
    
    const promisify = require('util').promisify;
    
    function batchWrite(records,firehose,retry = 0){
        var readingObjects = getReadings(records);
        var params = {
            DeliveryStreamName: process.env.KINESIS_FIREHOSE_STREAM_DELIVERY,
            Records: readingObjects
        };
    
        // make a promisified version of firehose.putRecordBatch
        firehose.putRecordBatchP = promisify(firehose.putRecordBatch);
    
        return firehose.putRecordBatchP(params).then(data => {
            if(data.FailedPutCount > 0){
                //collect the RequestResponses which are not processed.
                //index of those records is same as in request
                //process those failed records again
                console.log('Error',data);
            }
            return data;
    
        }).catch(error => {
            if(error.code == 'ServiceUnavailableException' && retry < retries.length){
                console.log('retryCount=',retry);
    
                // CHAIN the promise delay and retry here
                return delay(retries[retry]).then(() => {
                    console.log('retry after delay #',retry);
                    return batchWrite(records,firehose,retry+1); 
                });
            } else {
                // REJECT promise here upon error
                console.log('Error, rejecting promise');
                throw error;
            }
    
        });
    }
    

    A couple things about your code I didn't understand:

    1. When you get a non-retryable error from putRecordBatch() it seems you should reject the promise so that caller knows the operation failed. I changed the code to do that.

    2. It isn't clear to me what the if(data.FailedPutCount > 0){...} clause is for or why all you do is console.log() in it.