javascript, node.js, rabbitmq, node-amqplib

Awaiting a proper connection to RabbitMQ in Node.js


I am trying to write a simple EventEmitter wrapper for amqplib/callback_api. I am having trouble handling the situation where RabbitMQ is unavailable or disconnects. I have a method getConnect that returns a Promise, which resolves once the connection is established. But if the connection is refused, the Promise obviously rejects. How can I make this method keep reconnecting until a connection is established?

    /**
     * Async method getConnect for connection
     * @returns {Promise<*>}
     */
    getConnect = () => {
        return new Promise((resolve, reject) => {
            amqp.connect(this.config.url, (err, conn) => {
                if (err) {
                    // Return here so resolve() is not also called after a failure
                    return reject(err);
                }
                resolve(conn);
            })
        })
    };

Whole code is here https://github.com/kimonniez/rabbitEE

Maybe, I'm already very sleepy, but I'm completely confused :) Thanks in advance!


Solution

  • Wrap your Promise inside an Observable

    A Promise settles only once, so it has no built-in "retry" logic. If you want retries, look into Observables from the rxjs library. They let you retry at an arbitrary time interval while catching errors.

    // In Node.js, load RxJS 6.x from npm rather than from a browser <script> tag
    const { from, interval, of } = require('rxjs');
    const { catchError, mergeMap, skipWhile, take } = require('rxjs/operators');
    
    const THRESHOLD = 3;
    const RETRY_INTERVAL = 1000;
    
    // Stand-in for `amqp.connect`: rejects until the counter reaches THRESHOLD
    const functionThatThrows = number =>
      number < THRESHOLD
        ? Promise.reject(new Error("ERROR"))
        : Promise.resolve("OK");
    
    // Equivalent to `getConnect`
    const getConnect = () =>
      interval(RETRY_INTERVAL)
        .pipe(
          // Turn a rejection into an emitted Error so the stream keeps running
          mergeMap(x => from(functionThatThrows(x)).pipe(catchError(e => of(e)))),
          skipWhile(x => {
            const isError = x instanceof Error;
            if (isError) console.log('Found error. Retrying...');
            return isError;
          }),
          take(1)
        ).toPromise();
    
    // Resolves only once the inner Promise has resolved
    getConnect().then(console.log);

    Explanation

    1. Create a source that emits every 1000 ms, so a connection attempt is made each second
    2. On each emission, call amqp.connect (functionThatThrows stands in for it in my example)
    3. Catch the error using the catchError operator and emit it as a value
    4. Skip values while the emitted object is an error. This lets you resolve only once your Promise has resolved rather than rejected
    5. Take the first successful result using take(1)
    6. Convert your observable into a promise using the toPromise utility function
    7. Call your function and attach then, as you would with a standard Promise
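
    For comparison, the same retry-until-connected idea can be sketched without RxJS, using a plain async loop. This is only a minimal sketch under assumptions: connectWithRetry, delay, fakeConnect, and the retryInterval and maxAttempts options are hypothetical names that appear in neither the question nor amqplib, and fakeConnect stands in for a promisified amqp.connect such as the asker's getConnect.

```javascript
// Hypothetical helper: resolves after `ms` milliseconds.
const delay = ms => new Promise(resolve => setTimeout(resolve, ms));

/**
 * Calls `connectFn` until it resolves, waiting `retryInterval` ms between
 * attempts. Rejects with the last error once `maxAttempts` is exhausted.
 * `connectFn` is assumed to return a Promise (e.g. the asker's getConnect).
 */
async function connectWithRetry(
  connectFn,
  { retryInterval = 1000, maxAttempts = Infinity } = {}
) {
  let lastError;
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      // The first successful attempt ends the loop
      return await connectFn(attempt);
    } catch (err) {
      lastError = err;
      console.log(`Attempt ${attempt} failed: ${err.message}`);
      // Wait before the next attempt, but not after the final one
      if (attempt < maxAttempts) await delay(retryInterval);
    }
  }
  throw lastError;
}

// Stand-in for `amqp.connect`: fails on attempts 1 and 2, then succeeds.
const fakeConnect = attempt =>
  attempt < 3
    ? Promise.reject(new Error("ECONNREFUSED"))
    : Promise.resolve("OK");
```

    Calling connectWithRetry(fakeConnect).then(console.log) would resolve on the third attempt. Unlike the interval-based version, this loop makes its first attempt immediately and never has two attempts in flight at once. Whether that trade-off suits you depends on how slowly your broker refuses connections.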