Apollo GraphQL: MQTT Subscribe to a Broker to just provide the published data


I have a sensor node which publishes information on a specific MQTT Topic (sent to a Mosquitto broker). The data sent is a pure string.


currently I am using apollo-server-express to build a GraphQL Server. I wish to use `graphql-mqtt-subscriptions to:

  • Subscribe to the MQTT Broker
  • Read the information on a specific topic and just return it to the graphiql UI


"dependencies": {
    "apollo-server-express": "^2.8.1",
    "express": "^4.17.1",
    "graphql": "^14.4.2",
    "graphql-mqtt-subscriptions": "^1.1.0",
    "graphql-subscriptions": "^1.1.0",
    "graphql-tools": "^4.0.5",
    "mqtt": "^3.0.0",
    "subscriptions-transport-ws": "^0.9.16"

Code Snippets

the entrypoint server.js code:

import express from 'express';
import {ApolloServer } from 'apollo-server-express';
import { typeDefs } from './graphql/schema';
import { resolvers } from './graphql/resolvers';
import { createServer } from 'http';

const server = new ApolloServer({ typeDefs, resolvers});

const app = express();

server.applyMiddleware({ app });

const httpServer = createServer(app);


httpServer.listen({port: 4000}, () => {
    console.log(`🚀 Server ready at http://localhost:4000/${server.graphqlPath}`)
    console.log(`🚀 Subscriptions ready at ws://localhost:4000/${server.subscriptionsPath}`)

the typeDefs Schema for GraphQL is the following:

type Result {
        data: String

type Subscription {
        siteAdded(topic: String): Result

schema {
  query: Query
  mutation: Mutation
  subscription: Subscription

where siteAdded(topic: String) will take the topic the MQTT needes to subscribe to. Example:

subscription {
   siteAdded(topic: "test/1/env") {

The resolvers.js looks like the following (as mentioned in may documentations):

import { MQTTPubSub } from 'graphql-mqtt-subscriptions';
import { connect } from 'mqtt';

const client = connect('mqtt://', {
    reconnectPeriod: 1000,

const pubsub = new MQTTPubSub({

export const resolvers: {
Subscription: {
        siteAdded: {
            subscribe: (_, args) => {
                console.log(args.topic); // to check if this gets called or not.



the console.log on the args.topic gets called but after that the following error in graphiql:

  "error": {
    "message": "Subscription field must return Async Iterable. Received: undefined"

If I perform a return pubsub.asyncIterator():

It provides the timely data from the Broker but the output is null:

  "data": {
    "siteAdded": null

I have added the Websockets middleware in the server.js mentioned above according to the Apollo Docs

Where Am I going wrong here and how to just add the data coming from the subscribed topic to graphiql?


  • Summary



    • The wildcards like + and # in the graphql-mqtt-subscriptions v1.1.0 on the NPM Registry are not available. However, the repository has the implementations already. The owner of the repository needs to update the registry. See Open Issue for graphql-mqtt-subscriptions

    • I am currently using a complete topic for the MQTT Subscription in order to get the data from the sensor e.g. test/1/env as opposed to test/+/env

    Development Update

    • Previously I was sending data from the sensor in a raw string format (plain-text), hence I updated the firmware to send data in JSON String as follows:

        {"data": "temp=23,humid=56 1500394302"}


    1. As mentioned in the comments by @Dom and @DanielRearden I forget initially to add return if I used the curly brackets {}. e.g.:

          Subscription: {
          siteAdded: {
              subscribe: (_, args) => {
                  console.log(args.topic); // to check if this gets called or not.
                  return pubsub.asyncIterator([args.topic]);

      or I just removed the brackets and return by writing the resolver as follows:

          Subscription: {
             siteAdded: {
                 subscribe: (_, args) => pubsub.asyncIterator([args.topic]),

      This was still returning me null as mentioned in the query.

    2. I was able to obtain the data from the subscription by following the Payload Transformation documentation for Apollo where within my resolvers I did the following:

         Subscription: {
          siteAdded: {
              resolve: (payload) => {
                  return {
              subscribe: (_, args) => pubsub.asyncIterator([args.topic]),

      The payload had to be resolved accordingly for the Schema.


    Now a subscription as follows work like a charm:

        subscription {
            siteAdded(topic: "test/1/env") {

    provides the following result:

      "data": {
        "siteAdded": {
          "data": "temp=27.13,humid=43.33 1565345004"