Search code examples
node.jsgraphqlmqttapollo-servergraphql-subscriptions

Apollo GraphQL: MQTT Subscribe to a Broker to just provide the published data


Scenario

I have a sensor node which publishes information on a specific MQTT Topic (sent to a Mosquitto broker). The data sent is a pure string.

Backend

currently I am using apollo-server-express to build a GraphQL Server. I wish to use `graphql-mqtt-subscriptions to:

  • Subscribe to the MQTT Broker
  • Read the information on a specific topic and just return it to the graphiql UI

dependencies

"dependencies": {
    "apollo-server-express": "^2.8.1",
    "express": "^4.17.1",
    "graphql": "^14.4.2",
    "graphql-mqtt-subscriptions": "^1.1.0",
    "graphql-subscriptions": "^1.1.0",
    "graphql-tools": "^4.0.5",
    "mqtt": "^3.0.0",
    "subscriptions-transport-ws": "^0.9.16"
  },

Code Snippets

the entrypoint server.js code:

import express from 'express';
import {ApolloServer } from 'apollo-server-express';
import { typeDefs } from './graphql/schema';
import { resolvers } from './graphql/resolvers';
import { createServer } from 'http';


const server = new ApolloServer({ typeDefs, resolvers});

const app = express();

server.applyMiddleware({ app });

const httpServer = createServer(app);

server.installSubscriptionHandlers(httpServer);

httpServer.listen({port: 4000}, () => {
    console.log(`🚀 Server ready at http://localhost:4000/${server.graphqlPath}`)
    console.log(`🚀 Subscriptions ready at ws://localhost:4000/${server.subscriptionsPath}`)
});

the typeDefs Schema for GraphQL is the following:


type Result {
        data: String
}

type Subscription {
        siteAdded(topic: String): Result
    }

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription
}

where siteAdded(topic: String) will take the topic the MQTT needes to subscribe to. Example:

subscription {
   siteAdded(topic: "test/1/env") {
       data
   }

The resolvers.js looks like the following (as mentioned in may documentations):

import { MQTTPubSub } from 'graphql-mqtt-subscriptions';
import { connect } from 'mqtt';

const client = connect('mqtt://my.mqtt.broker.ip.address', {
    reconnectPeriod: 1000,
});

const pubsub = new MQTTPubSub({
    client
});

export const resolvers: {
Subscription: {
        siteAdded: {
            subscribe: (_, args) => {
                console.log(args.topic); // to check if this gets called or not.
                pubsub.asyncIterator([args.topic]);

            }
        }
    }
};

Inference

the console.log on the args.topic gets called but after that the following error in graphiql:

{
  "error": {
    "message": "Subscription field must return Async Iterable. Received: undefined"
  }
}

If I perform a return pubsub.asyncIterator():

It provides the timely data from the Broker but the output is null:

{
  "data": {
    "siteAdded": null
  }
}

I have added the Websockets middleware in the server.js mentioned above according to the Apollo Docs

Where Am I going wrong here and how to just add the data coming from the subscribed topic to graphiql?


Solution

  • Summary

    Update

    Caveats

    • The wildcards like + and # in the graphql-mqtt-subscriptions v1.1.0 on the NPM Registry are not available. However, the repository has the implementations already. The owner of the repository needs to update the registry. See Open Issue for graphql-mqtt-subscriptions

    • I am currently using a complete topic for the MQTT Subscription in order to get the data from the sensor e.g. test/1/env as opposed to test/+/env

    Development Update

    • Previously I was sending data from the sensor in a raw string format (plain-text), hence I updated the firmware to send data in JSON String as follows:

        {"data": "temp=23,humid=56 1500394302"}
      

    Solution

    1. As mentioned in the comments by @Dom and @DanielRearden I forget initially to add return if I used the curly brackets {}. e.g.:

          Subscription: {
          siteAdded: {
              subscribe: (_, args) => {
                  console.log(args.topic); // to check if this gets called or not.
                  return pubsub.asyncIterator([args.topic]);
      
              }
          }
      }
      

      or I just removed the brackets and return by writing the resolver as follows:

          Subscription: {
             siteAdded: {
                 subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
             }
          }
      

      This was still returning me null as mentioned in the query.

    2. I was able to obtain the data from the subscription by following the Payload Transformation documentation for Apollo where within my resolvers I did the following:

         Subscription: {
          siteAdded: {
              resolve: (payload) => {
                  return {
                      data: payload.data,
                  };
              },
              subscribe: (_, args) => pubsub.asyncIterator([args.topic]),
          }
      }
      

      The payload had to be resolved accordingly for the Schema.

    Result

    Now a subscription as follows work like a charm:

        subscription {
            siteAdded(topic: "test/1/env") {
                data
            }
        }
    

    provides the following result:

    {
      "data": {
        "siteAdded": {
          "data": "temp=27.13,humid=43.33 1565345004"
        }
      }
    }