Tags: node.js, google-cloud-platform, publish-subscribe, google-cloud-pubsub

Google Pub/Sub subscriber doesn't pull messages after the first message in Node.js


Summary: I have a Node.js application that publishes messages to Google Pub/Sub, and a subscriber that pulls them from Pub/Sub. The subscriber receives the first message, but it does not pull any further messages from the subscription after that. The application does not have a high volume of traffic: it is assumed that the publisher can send one message per second.

Note: I have to restart the PM2 server to get the second message.

I am not able to figure out why the subscriber is not able to pull after the first message. I would appreciate any assistance. 

Publisher code:

 const postNotificationsService = async (payload) => {
  try {

    const dataBuffer = Buffer.from(JSON.stringify(payload));
    console.log("Connected to Producer");
    console.log("Successfully posted to web notification Queue");
    let publishRes = await pubsub
      .topic(topicName)
      .publishMessage({ data: dataBuffer });

    console.log(publishRes)
    return { message: "Success" };
  } catch (e) {
    console.log(e)
    throw new Error(e.message);
  }
};

Subscriber Code:

const pullSubscriptionsMessage = () => {
  // const subscription = pubsub.subscription(subscriptionName);

  const subscriberOptions = {
    flowControl: {
      maxMessages: maxInProgress,
    },
  };

  // References an existing subscription.
  // Note that flow control settings are not persistent across subscribers.
  const subscription = pubsub.subscription(
    subscriptionName,
    subscriberOptions
  );

  console.log(
    `Subscriber to subscription ${subscription.name} is ready to receive messages at a controlled volume of ${maxInProgress} messages.`
  );

  // Create an event handler to handle messages
  let messageCount = 0;
  const messageHandler = async message => {
    console.log(`Received message ${message.id}:`);
    // console.log(`Data: ${message.data}`);
    // console.log(`tAttributes: ${message.attributes}`);
    messageCount += 1;
    let messagePayload = JSON.parse(message.data.toString())
    await mailerService.sendEmail(messagePayload);

    await message.ack();
  };

  

  // Listen for new messages/errors until timeout is hit
  subscription.on('message', messageHandler);
  setTimeout(() => {
    subscription.close();
    console.log(`${messageCount} message(s) received.`);
  }, timeout * 1000);

}

pullSubscriptionsMessage();

Output:

0|serverHTTP  | Connected to Producer
0|serverHTTP  | Successfully posted to web notification Queue
0|serverHTTP  | Received message 6339231272672371:
0|serverHTTP  | 6339231272672371
0|serverHTTP  | 1 message(s) received.
0|serverHTTP  | Connected to Producer
0|serverHTTP  | Successfully posted to web notification Queue
0|serverHTTP  | 6339223654064343


Solution

  • As written, the subscriber runs for only timeout seconds before shutting down: the setTimeout callback calls subscription.close(), and a closed subscription delivers no further messages. If you want the subscriber to run indefinitely, remove the setTimeout that closes it. The output shows that it shut down: the line "1 message(s) received." is printed by that callback.
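
    A minimal sketch of that change, assuming the same subscription object, flow control options, and mailerService.sendEmail as in the question. The names startSubscriber and handlePayload are hypothetical and introduced only for illustration. The subscription is passed in as a parameter, and the listeners stay attached until a stop function is called explicitly, for example on process shutdown. The error listener and the nack() on failure are additions that the question's code does not have.

```javascript
// Sketch only: a long-lived subscriber with no setTimeout that closes it.
// `startSubscriber` and `handlePayload` are hypothetical names.
// `subscription` is expected to be what pubsub.subscription(...) returns.
function startSubscriber(subscription, handlePayload) {
  let messageCount = 0;

  const onMessage = async (message) => {
    messageCount += 1;
    console.log(`Received message ${message.id}:`);
    try {
      const payload = JSON.parse(message.data.toString());
      await handlePayload(payload);
      message.ack(); // processed successfully, do not redeliver
    } catch (err) {
      console.error(`Failed to process message ${message.id}:`, err);
      message.nack(); // let Pub/Sub redeliver the message
    }
  };

  const onError = (err) => {
    console.error('Subscription error:', err);
  };

  // Keep listening; nothing here closes the subscription on a timer.
  subscription.on('message', onMessage);
  subscription.on('error', onError);

  // Call the returned function only when the process should stop consuming.
  return async () => {
    subscription.removeListener('message', onMessage);
    subscription.removeListener('error', onError);
    await subscription.close();
    console.log(`${messageCount} message(s) received.`);
    return messageCount;
  };
}

// Possible wiring, using the names from the question:
// const subscription = pubsub.subscription(subscriptionName, subscriberOptions);
// const stop = startSubscriber(subscription, (p) => mailerService.sendEmail(p));
// process.on('SIGTERM', () => stop());
```

    With this shape, PM2 can keep the process alive and the subscriber keeps receiving messages until stop() is called, so no restart should be needed between messages.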