Tags: javascript, bluebird, ramda.js

How to use Ramda Pipe function with a mix of promises and static callbacks?


With help from @ScottSauyet, I was able to create a function that resolves static and promise-based callbacks for an initial data object.

Now I want to pipe this data object through a series of callbacks, but I run into trouble once I add multiple promises into the mix.

Current setup

// Libraries
const R = require('ramda');
const fetch = require('node-fetch');
const Promise = require('bluebird');

// Input
const data = {
  array: [['#', 'FirstName', 'LastName'], ['1', 'tim', 'foo'], ['2', 'kim', 'bar']],
  header: 'FirstName',
  more: 'stuff',
  goes: 'here'
};

// Static and Promise Resolver (with Helper Function)
const transposeObj = (obj, len = Object.values(obj)[0].length) =>
  [...Array(len)].map((_, i) => Object.entries(obj).reduce((a, [k, v]) => ({ ...a, [k]: v[i] }), {}));

const mergeCallback = async ({ array: [headers, ...rows], header, ...rest }, callback) => {
  const index = R.indexOf(header, headers);
  const result = await Promise.map(rows, row => {
    return callback(row[index]);
  })
    .then(x => ({ changes: x.map(v => transposeObj(v.changes)) }))
    .then(({ changes }) => ({
      allHeaders: R.flatten([
        ...headers,
        R.chain(t => R.chain(Object.keys, t), [...changes])
          .filter(k => !headers.includes(k))
          .filter((x, i, a) => a.indexOf(x) == i)
      ]),
      changes
    }))
    .then(({ changes, allHeaders }) => ({
      resultRows: R.chain(
        (row, i = R.indexOf(row, [...rows])) =>
          changes[i].map(change =>
            Object.entries(change).reduce(
              (r, [k, v]) => [...r.slice(0, allHeaders.indexOf(k)), v, ...r.slice(allHeaders.indexOf(k) + 1)],
              row.slice(0)
            )
          ),
        [...rows]
      ),
      allHeaders
    }))
    .then(({ resultRows, allHeaders, array }) => ({
      array: [allHeaders, ...resultRows],
      header,
      ...rest
    }));
  return result;
};

// Example Callbacks and their services
const adapterPromise1 = async name => {
  const response = await fetch(`https://api.abalin.net/get/getdate?name=${name}&calendar=us`).then(res => res.json());
  return {
    changes: {
      nameday: R.pluck('day', response.results),
      namemonth: R.pluck('month', response.results)
    }
  };
};
const servicePromise1 = input => mergeCallback(input, adapterPromise1);

const adapterPromise2 = async name => {
  const response = await fetch(`https://api.genderize.io?name=${name}`).then(res => res.json());
  return {
    changes: {
      gender: R.of(response.gender)
    }
  };
};
const servicePromise2 = input => mergeCallback(input, adapterPromise2);

const adapterStatic1 = name => ({ changes: { NameLength: R.of(R.length(name)) } });
const serviceStatic1 = input => mergeCallback(input, adapterStatic1);

Pipe Attempt

const result = R.pipe(
  servicePromise1,
  servicePromise2,
  serviceStatic1
)(data);

// console.log(result); <<< preferred resolution method, but not working due to promise
result.then(console.log);

Expected Result

{ array:  
   [ [ '#', 
       'FirstName', 
       'LastName', 
       'nameday', 
       'namemonth', 
       'gender', 
       'NameLength' ], 
     [ '1', 'tim', 'foo', 24, 1, 'male', 3 ], 
     [ '1', 'tim', 'foo', 20, 6, 'male', 3 ], 
     [ '2', 'kim', 'bar', 8, 9, 'male', 3 ], 
     [ '2', 'kim', 'bar', 11, 10, 'male', 3 ] ], 
  header: 'FirstName', 
  more: 'stuff', 
  goes: 'here' } 

Current result

Pipe works with any one service call, but as soon as I try to use two or more services, I receive the following error.

Cannot read property 'Symbol(Symbol.iterator)' of undefined 

Any hint on how to get this to work would be greatly appreciated.


Solution

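  • Ramda's pipe is not Promise-aware. With plain pipe, the second service receives the pending Promise returned by the first one rather than the resolved data, so destructuring its array property fails with the error you see. The old Promise-aware version, pipeP, is being deprecated in favor of the more generic pipeWith. You can use it with Promises by passing R.then (renamed to R.andThen in later Ramda releases) like this: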

    R.pipeWith (R.then, [
    //servicePromise1, // problem with CORS headers here.
      servicePromise2,
      serviceStatic1
    ]) (data)
    .then (console .log)
    

    For some reason your first API call is running into CORS issues for me when I try to run it from Ramda's REPL or a SO snippet, but the process should be clear without it.
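To make the difference concrete, here is a minimal sketch that runs without Ramda. The names `pipe`, `pipeThen`, and `addColumn` are hypothetical stand-ins written for this illustration; they are not Ramda's implementations, and `addColumn` only mimics the way mergeCallback destructures its input.

```javascript
// Hypothetical stand-ins for R.pipe and R.pipeWith(R.then); not Ramda's own code.
const pipe = (...fns) => (x) => fns.reduce((acc, fn) => fn(acc), x);
const pipeThen = (fns) => (x) =>
  fns.reduce((acc, fn) => acc.then(fn), Promise.resolve(x));

// A toy async step shaped like mergeCallback: it destructures `array` from its input
// and appends one column holding the length of the column name.
const addColumn = (name) => async ({ array: [headers, ...rows], ...rest }) => ({
  array: [[...headers, name], ...rows.map((row) => [...row, name.length])],
  ...rest
});

const input = { array: [['FirstName'], ['tim']], header: 'FirstName' };

// Plain pipe hands the Promise from step one straight to step two.
// A Promise has no `array` property, so the destructuring throws inside
// the async function and the returned Promise rejects with a TypeError.
const broken = pipe(addColumn('a'), addColumn('bb'))(input);
broken.catch((err) => console.log('plain pipe failed:', err.name));

// Waiting for each step to resolve before calling the next one works.
const fixed = pipeThen([addColumn('a'), addColumn('bb')])(input);
fixed.then((out) => console.log(JSON.stringify(out.array)));
```

The only difference between the two helpers is that `pipeThen` chains each step with `.then`, which is roughly the role R.then plays when passed to pipeWith.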

    Switching to pipeWith might be enough to fix your problem; it works for this test case. But I see an outstanding issue: every version of pipe passes the result of the previous call to the next one. However, you use a property of the data, namely header, to configure how the next callback is triggered, so it would have to stay fixed throughout the pipeline. That's fine if all calls are going to use the FirstName property, but my impression is that they need their own versions of it.

    But it would be easy enough to write a custom pipeline function that lets you pass this alongside the callback function. Then your call might look like this:

    seq ([
      ['FirstName', servicePromise2],
      ['FirstName', serviceStatic1]
    ]) (data)
    .then(console.log)
    

    You can see a working version of that idea in this snippet:

    // Input
    const data = {
      array: [['#', 'FirstName', 'LastName'], ['1', 'tim', 'foo'], ['2', 'kim', 'bar']],
      header: 'FirstName',
      more: 'stuff',
      goes: 'here'
    };
    
    // Static and Promise Resolver (with Helper Function)
    const transposeObj = (obj, len = Object.values(obj)[0].length) =>
      [...Array(len)].map((_, i) => Object.entries(obj).reduce((a, [k, v]) => ({ ...a, [k]: v[i] }), {}));
    
    const mergeCallback = async ({ array: [headers, ...rows], header, ...rest }, callback) => {
      const index = R.indexOf(header, headers);
      const result = await Promise.all(rows.map(row => {
        return callback(row[index]);
      }))
        .then(x => ({ changes: x.map(v => transposeObj(v.changes)) }))
        .then(({ changes }) => ({
          allHeaders: R.flatten([
            ...headers,
            R.chain(t => R.chain(Object.keys, t), [...changes])
              .filter(k => !headers.includes(k))
              .filter((x, i, a) => a.indexOf(x) == i)
          ]),
          changes
        }))
        .then(({ changes, allHeaders }) => ({
          resultRows: R.chain(
            (row, i = R.indexOf(row, [...rows])) =>
              changes[i].map(change =>
                Object.entries(change).reduce(
                  (r, [k, v]) => [...r.slice(0, allHeaders.indexOf(k)), v, ...r.slice(allHeaders.indexOf(k) + 1)],
                  row.slice(0)
                )
              ),
            [...rows]
          ),
          allHeaders
        }))
        .then(({ resultRows, allHeaders, array }) => ({
          array: [allHeaders, ...resultRows],
          header,
          ...rest
        }));
      return result;
    };
    
    // Example Callbacks and their services
    const adapterPromise2 = async (name) => {
      const response = await fetch(`https://api.genderize.io?name=${name}`).then(res => res.json());
      return {
        changes: {
          gender: R.of(response.gender)
        }
      };
    };
    const servicePromise2 = input => mergeCallback(input, adapterPromise2);
    
    const adapterStatic1 = name => ({ changes: { NameLength: R.of(R.length(name)) } });
    const serviceStatic1 = input => mergeCallback(input, adapterStatic1);
    
    const seq = (configs) => (data) =>
      configs.reduce(
        (pr, [header, callback]) => pr.then(data => callback({...data, header})),
        Promise.resolve(data)
      )
    
    seq ([
      ['FirstName',  servicePromise2],
      ['FirstName', serviceStatic1]
    ]) (data)
    .then(console.log)

    // The snippet loads Ramda in the browser with:
    // <script src="//cdnjs.cloudflare.com/ajax/libs/ramda/0.26.1/ramda.js"></script>

    I still think there is something awkward about this, though. To me, the header name you're looking up does not belong in the input data at all. You could make it another parameter of your mergeCallback function and update your wrappers to pass it from there, like

    const servicePromise2 = (input) => mergeCallback(input, 'FirstName', adapterPromise2);
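As a rough sketch of that signature change, the version below takes the header as its second argument. It is a deliberately simplified stand-in for mergeCallback, not a drop-in replacement: it assumes every callback returns exactly one value per new column (so rows are never duplicated) and that new column names do not clash with existing ones. It uses plain JavaScript so that it runs without Ramda.

```javascript
// Simplified stand-in for mergeCallback with the header passed as a parameter.
// Assumption: each callback returns { changes: { column: [singleValue] } }.
const mergeCallback = async ({ array: [headers, ...rows], ...rest }, header, callback) => {
  const index = headers.indexOf(header);
  // Promise.all accepts plain values too, so static and async callbacks both work.
  const results = await Promise.all(rows.map((row) => callback(row[index])));
  const added = results.length ? Object.keys(results[0].changes) : [];
  const resultRows = rows.map((row, i) => [
    ...row,
    ...added.map((key) => results[i].changes[key][0])
  ]);
  return { array: [[...headers, ...added], ...resultRows], ...rest };
};

const adapterStatic1 = (name) => ({ changes: { NameLength: [name.length] } });
const serviceStatic1 = (input) => mergeCallback(input, 'FirstName', adapterStatic1);

// The input no longer needs a `header` property.
const data = {
  array: [['#', 'FirstName', 'LastName'], ['1', 'tim', 'foo'], ['2', 'kim', 'bar']],
  more: 'stuff'
};

const merged = serviceStatic1(data);
merged.then((out) => console.log(out.array));
```

With the header fixed inside each wrapper, the services can go straight into a Promise-aware pipeline without a per-step header configuration.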
    

    Even better to my mind, even though I understand it would add some work to your existing callback functions, would be to pass the whole row to the callback function, structured as an object with all the headers as properties. Ramda's zipObj could be used like this:

      const result = await Promise.all(rows.map(row => {
        return callback(R.zipObj(headers, row));
      }))
    

    to pass to each callback objects like this:

    {"#":"1", FirstName: "tim", LastName: "foo" /*, gender: 'male', ... */}
    

    You could change the signature of the callback to look like

    const adapterPromise2 = async ({FirstName: name}) => { /* ...use `name`... */ }
    

    and leave the body intact, or simply rename the variable to FirstName to match the object:

    const adapterPromise2 = async ({FirstName}) => { /* ...use `FirstName`... */ }
    

    Either way, this would leave the generic code simpler and remove the header property, which feels quite awkward in your current API, without significantly changing your existing callbacks.
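A minimal sketch of the row-object idea follows. The names `mergeByRow`, `nameLength`, and `initials` are hypothetical, `zipObj` is a small stand-in for R.zipObj so the example runs without Ramda, and the merge logic is a simplification that assumes new columns never overwrite existing ones. Callbacks keep the `{ changes: { column: [values] } }` shape from the question, so a callback that returns several values still produces several rows.

```javascript
// Stand-in for R.zipObj so the sketch runs without Ramda.
const zipObj = (keys, values) =>
  Object.fromEntries(keys.map((key, i) => [key, values[i]]));

// Hypothetical simplified merge: each callback receives the whole row as an object.
const mergeByRow = async ({ array: [headers, ...rows], ...rest }, callback) => {
  const results = await Promise.all(
    rows.map((row) => callback(zipObj(headers, row)))
  );
  // Collect the new column names in first-seen order, without duplicates.
  const added = [...new Set(results.flatMap((r) => Object.keys(r.changes)))];
  const resultRows = rows.flatMap((row, i) => {
    const { changes } = results[i];
    // One output row per returned value; the longest column decides the count.
    const count = Math.max(0, ...Object.values(changes).map((v) => v.length));
    return Array.from({ length: count }, (_, j) => [
      ...row,
      ...added.map((key) => (changes[key] || [])[j])
    ]);
  });
  return { array: [[...headers, ...added], ...resultRows], ...rest };
};

// Callbacks pick the fields they need by name; no `header` setting is involved.
const nameLength = ({ FirstName }) => ({ changes: { NameLength: [FirstName.length] } });
const initials = async ({ FirstName, LastName }) => ({
  changes: { Initials: [FirstName[0] + LastName[0]] }
});

const data = {
  array: [['#', 'FirstName', 'LastName'], ['1', 'tim', 'foo'], ['2', 'kim', 'bar']],
  more: 'stuff'
};

const result = mergeByRow(data, nameLength).then((d) => mergeByRow(d, initials));
result.then((d) => console.log(d.array));
```

Because each callback destructures only the fields it cares about, a later step can also read columns that an earlier step added.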