I'm using node-oracledb to connect to an Oracle database. The API provides its own promises, which can be cast to Promise<T> and therefore "converted" to Observable<T>.
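Using Observables, I would like to open a connection, execute a statement, fall back to an empty result on error, and always close the connection afterwards.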
Using the traditional, blocking, procedural way, it would be something like this:
try
{
    connection = Oracle.getConnection(...);
    resultSet = connection.execute("SELECT ... FROM ...");
}
catch (Exception)
{
    resultSet = EMPTY_RESULT;
}
finally
{
    if (connection)
        connection.close();
}
My attempts at writing this using Observables lead to a lot of code and callbacks.
The protected method getConnection() is still pretty simple:
import * as Oracle from "oracledb";

protected getConnection() : Observable<IConnection>
{
    return OraUtil.from(Oracle.getConnection(this.parameters));
}
And so is the closeConnection() method. I used the promise directly here to avoid even more code.
protected closeConnection(subscriber : Subscriber<IExecuteReturn>, connection : IConnection) : void
{
    connection.close()
        .then(() => subscriber.complete())
        // pass the error on instead of signalling an empty error
        .catch((error) => subscriber.error(error));
}
But the execute() method is where the trouble starts.
protected _execute(connection : IConnection, statement : string) : Observable<IExecuteReturn>
{
    return new Observable<IExecuteReturn>(
        (subscriber) => {
            OraUtil.from(connection.execute(statement)).subscribe(
                (result) => subscriber.next(result),
                (error) => {
                    subscriber.error(error);
                    this.closeConnection(subscriber, connection);
                },
                () => {
                    this.closeConnection(subscriber, connection);
                });
        });
}

public execute(statement : string) : Observable<IExecuteReturn>
{
    return this.getConnection().pipe(
        flatMap((connection) => this._execute(connection, statement))
    );
}
This is how I generally handle connection management. The core is the using observable creator, which accepts a resource factory as its first argument and a setup function as its second.
using(() => ({ unsubscribe() { /* release the resource */ } }), resource => observableOf(resource))
The resource is an object with an unsubscribe method that gets called as part of unsubscription, so you can hide any logic there and effectively bind the lifecycle of an arbitrary object to the lifecycle of an observable.
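To make the lifecycle binding concrete, here is a minimal sketch that does not touch a database. It assumes RxJS 6 or later (imports from "rxjs" rather than the deep import paths used further down); FakeConnection and log are hypothetical names introduced only for this illustration.

```typescript
import { using, of } from "rxjs";

// Records what happens, in order, so the lifecycle can be inspected.
const log: string[] = [];

// Hypothetical resource: using() only requires an unsubscribe() method.
class FakeConnection {
  constructor() {
    log.push("open");
  }
  unsubscribe(): void {
    log.push("close");
  }
}

const rows$ = using(
  () => new FakeConnection(),   // resource factory, called once per subscription
  () => of("row 1", "row 2"),   // observable factory, called with the resource
);

rows$.subscribe({
  next: (row) => log.push(row),
  complete: () => log.push("complete"),
});
```

After the subscription finishes, log starts with "open" and contains "close" after both rows, without the subscriber ever calling close itself. Each new subscription gets its own FakeConnection.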
I hope the code below makes sense.
import * as Oracle from "oracledb";
import { mergeMap , ignoreElements} from 'rxjs/operators';
import { using } from 'rxjs/observable/using';
import { from as observableFrom } from 'rxjs/observable/from';
import { concat } from 'rxjs/observable/concat';
import { defer } from 'rxjs/observable/defer';
import { empty as observableEmpty } from 'rxjs/observable/empty';
class OracleConnection {
    constructor(parameters) {
        this.isClosed = false;
        this.connection = null;
        this.parameters = parameters;
    }

    setup() {
        return defer(() => Oracle.getConnection(this.parameters)
            .then(connection => {
                // Store the connection inside the promise callback, in case the
                // observable gets unsubscribed before the connection is established.
                this.connection = connection;
                if (this.isClosed) {
                    // Closed before the connection was even established: close it now.
                    this.terminate();
                }
                return connection;
            }));
    }

    close() {
        this.isClosed = true;
        if (this.connection !== null) {
            const connection = this.connection;
            this.connection = null;
            return observableFrom(connection.close())
                .pipe(ignoreElements()); // only propagate errors
        }
        return observableEmpty(); // connection already closed
    }

    terminate() {
        this.close().subscribe(/* handle error from connection close */);
    }

    unsubscribe() { // this will get called on observable unsubscribe
        if (!this.isClosed) {
            this.terminate();
        }
    }
}
class ConnectionManager {
    constructor(params) {
        this.params = params;
    }

    getConnection() {
        return using(() => new OracleConnection(this.params), oracleConnection => oracleConnection.setup());
    }
}
const manager = new ConnectionManager({ /* some params */ });
manager.getConnection()
    .pipe(
        mergeMap(connection => concat(
            connection.execute('SELECT 1'),
            // explicitly close connection; defer so that close() is only
            // called once execute() has completed
            defer(() => connection.close())
        )),
        // alternatively
        // take(1) // to close connection automatically
    );
One nice thing you can do, for example, is easily retry the connection in case of failure:
manager.getConnection()
    .pipe(
        retry(3)
        ...
    );
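The retry works because the connection is only created on subscription, so every resubscription makes a fresh attempt. A minimal sketch of that behaviour, assuming RxJS 6 or later; flakyConnection$ is a hypothetical stand-in for getConnection() that fails twice before succeeding, and no real database is involved.

```typescript
import { defer, of } from "rxjs";
import { retry } from "rxjs/operators";

let attempts = 0;

// Hypothetical stand-in for getConnection(): the factory passed to defer()
// runs again on every (re)subscription, just like a real connection attempt.
const flakyConnection$ = defer(() => {
  attempts += 1;
  if (attempts < 3) {
    throw new Error(`connection attempt ${attempts} failed`);
  }
  return of(`connection #${attempts}`);
});

const received: string[] = [];
const errors: unknown[] = [];

flakyConnection$
  .pipe(retry(3)) // resubscribe up to 3 times after an error
  .subscribe({
    next: (value) => received.push(value),
    error: (err) => errors.push(err),
  });
```

Here the first two attempts fail, the third succeeds, and the subscriber never sees an error. If every attempt failed, the subscriber would receive the last error once the retries were used up.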