Search code examples
node.jstypescriptrxjsobservablereactivex

How do I implement a database connection/select/close using RxJS observables


I'm using node-oracledb to connect to an Oracle database. The API provides its own promises, that can be casted to Promise<T> and therefore "converted" to Observable<T>.

Using Observables, I would like to:

  1. Open the database connection
  2. Select N records
  3. Close the database connection, even if #2 threw an exception.

Using the traditional, blocking, procedural way, it would be something like this:

try
{
    connection = Oracle.getConnection(...);
    resultSet = connection.execute("SELECT ... FROM ...");
}
catch (Exception)
{
    resultSet = EMPTY_RESULT;
}
finally
{
    if (connection)
        connection.close();
}

My attempts at writing this using Observables lead to a lot of code and callbacks.

The protected method getConnection() is still pretty simple:

import * as Oracle from "oracledb";

protected getConnection() : Observable<IConnection>
{
    return OraUtil.from(Oracle.getConnection(this.parameters));
}

And so is the closeConnection() method. I used the promise directly here, to avoid even more code.

protected closeConnection(subscriber : Subscriber<IExecuteReturn>, connection : IConnection) : void
{
    connection.close()
        .then(() => subscriber.complete())
        .catch((error) => subscriber.error());
}

But the execute() method is where the trouble starts.

protected _execute(connection : IConnection, statement : string) : Observable<IExecuteReturn>
{
    return new Observable<IExecuteReturn>(
        (subscriber) => {
            OraUtil.from(connection.execute(statement)).subscribe(
                (result) => subscriber.next(result),
                (error) => {
                    subscriber.error(error);
                    this.closeConnection(subscriber, connection);
                },
                () => {
                    this.closeConnection(subscriber, connection);
                });
        });
}

public execute(statement : string) : Observable<IExecuteReturn>
{
    return this.getConnection().pipe(
        flatMap((connection) => this._execute(connection, statement))
    );
}

Solution

  • This is how I generally handle connection management. The core is using observable creator that accepts resource factory as first argument and setup function as second.

    using(() => { unsubscribe() }, resource => observableOf(resource))
    

    The resource is an object with unsubscribe method that gets called as part of unsubscription - so you can hide any logic there and effectively bind lifecycle of arbitrary object to lifecycle of an observable.

    I hope the code below makes sense.

    import * as Oracle from "oracledb";
    import { mergeMap , ignoreElements} from 'rxjs/operators';
    import { using } from 'rxjs/observable/using';
    import { from as observableFrom } from 'rxjs/observable/from';
    import { concat } from 'rxjs/observable/concat';
    import { defer } from 'rxjs/observable/defer';
    import { empty as observableEmpty } from 'rxjs/observable/empty';
    
    class OracleConnection {
      constructor(parameters) {
        this.isClosed = false;
        this.connection = null;
        this.parameters = parameters;
      }
    
      setup() {
        return defer(() => Oracle.getConnection(this.parameters)
          .then(connection => { // do this in promise in case observable gets unsubscribed before connection is established
            this.connection = connection;
            if (this.isClosed) { // close in case connection got already closed before even got established
              this.terminate();
            }
            return connection;
          }));
      }
    
      close() {
        this.isClosed = true;
        if (this.connection !== null) {
          const connection = this.connection;
          this.connection = null;
    
          return observableFrom(connection.close())
            .pipe(ignoreElements()) // only propagate errors
        }
    
        return observableEmpty(); // connection already closed
      }
      
      terminate() {
        this.close().subscribe(/* handle error from connection close */);
      }
    
      unsubscribe() { // this will get called on observable unsubscribe
        if (!this.isClosed) {
          this.terminate();
        }
      }
    }
    
    class ConnectionManager {
      constructor(params) {
        this.params = params;
      }
    
      getConnection() {
        return using(() => new OracleConnection(this.params), oracleConnection => oracleConnection.setup())
      }
    }
    
    const manager = new ConnectionManager({ /* some params */ });
    
    manager.getConnection()
      .pipe(
        mergeMap(connection => concat(
          connection.execute('SELECT 1'),
          connection.close() // explicitly close connection
        )),
        // alternatively
        // take(1) // to close connection automatically
      );

    Cool thing you can do for example is easily retry the connection in case of failure:

    oracle.getConnection()
      .pipe(
        retry(3)
        ...
      );