Search code examples
node.jsrxjsrxjs6

RxJS: How may I treat bindNodeCallback() as an Observable that can have operators chained to it?


For RxJS learning purposes, I'm building a small node.js app that puts everything in a stream where possible.

As a first step, I'm working to load an external configuration yaml file. I've seen examples where fs.readFile is wrapped in a bindNodeCallback, which returns an observable when called. However, the readFile variable set below is still a typeof of 'function'. As a result, I'm unable to chain flatMap to it.

How may I reorganize the code below to chain bindNodeCallback() and yaml.safeLoad() (which doesn't offer a callback) before initializing my app?

const yaml = require('js-yaml');
const fs = require('fs');
const { bindNodeCallback, of } = require('rxjs');
const { mergeMap, map } = require('rxjs/operators');

const configPath = './config/config.yml';
const configEncoding = 'utf8';

const readFile = bindNodeCallback(fs.readFile);

readFile(configPath, configEncoding)
.flatMap(yamlString => of(yaml.safeLoad(yamlString))) // error: .flatMap is not a funciton
.subscribe(
    config => {
        console.log(config);
        // launch the app when config is loaded
    }, 
    err => console.error(err)
);

Solution

  • readFile should still be a function, and it does return an Observable when executed, but flatMap is not a method on the Observable that's returned. If you get rid of the call to .flatMap() and jump straight to .subscribe(), that works.

    Were you trying to use flatMap from rxjs/operators, or the chaining interface?


    This works, but I had to yarn add rxjs-compat for some reason. Also note that I swapped to using Observable#pipe() with flatMap as a pipeable operator.

    const fs = require('fs')
    const yaml = require('js-yaml')
    const { bindNodeCallback } = require('rxjs')
    const { flatMap } = require('rxjs/operators')
    const { of } = require('rxjs/observable/of')
    
    const configPath = './config.yml'
    const configEncoding = 'utf8'
    
    const readFile = bindNodeCallback(fs.readFile)
    
    readFile(configPath, configEncoding)
      .pipe(flatMap(yamlString => of(yaml.safeLoad(yamlString))))
      .subscribe(
        config => {
          console.log(config)
        },
        err => console.error(err),
      )