Search code examples
node.jsnestjs

NestJs - Worker to consume GCP Pubsub


I need to develop an worker to consume GCP Pub/Sub messages.

Here is the code that I'm using:

main.ts

import { INestApplication, INestApplicationContext } from '@nestjs/common';
import { CancellationTokenService } from '@/services/cancellationToken';
import { NestFactory } from '@nestjs/core';
import { AppModule } from '@/app.module';
import { PubSubService } from '@/services/pubSub';
async function main() {
  const app = await NestFactory.createApplicationContext(AppModule);
  const cancellationToken = app.get(CancellationTokenService);
  configureApplicationStop(cancellationToken);
  await run(app, cancellationToken);
  while (!cancellationToken.isCancelled) {
    console.log('Microservice is running...');
    await new Promise((resolve) => setTimeout(resolve, 1000));
  }
  await app.close();
}



async function run(
  app: INestApplicationContext,
  cancellationToken: CancellationTokenService,
) {
  try {
    const pubSub = app.get(PubSubService);
    await pubSub.consumeMessages();
  } catch (error) {
    console.error(error);
    cancellationToken.cancel();
  }
}

function configureApplicationStop(cancellationToken: CancellationTokenService) {
  process.on('SIGINT', () => {
    cancellationToken.cancel();
    console.log('Stopping application...');
  });
}
main();

app.module.ts

import { Module } from '@nestjs/common';
import { PubSubService } from '@/services/pubSub';

import { CancellationTokenService } from '@/services/cancellationToken';
@Module({
  imports: [],
  providers: [CancellationTokenService, PubSubService],
})
export class AppModule {}

pubsub.service.ts

import { Injectable } from '@nestjs/common';
import { PubSub, Subscription } from '@google-cloud/pubsub';
import { CancellationTokenService } from '@/services/cancellationToken';
@Injectable()
export class PubSubService {
  pubsub: PubSub;
  subscription: Subscription;
  constructor(private readonly cancellationToken: CancellationTokenService) {
    this.pubsub = new PubSub();
    this.initialize();
    console.log('PubsubService');
  }
  async initialize(): Promise<void> {
    this.subscription = this.initializeSubscription();
  }
  initializeSubscription(): Subscription {
    return this.pubsub.subscription('my-subsription');
  }
  async consumeMessages(): Promise<void> {
    if (this.cancellationToken.isCancelled) {
      console.log('cancellationToken.isCancelled');
      this.subscription.close();
    }
    console.log('cancellationToken.isCancelled');
    if (!this.subscription.isOpen) this.subscription.open();
    this.subscription.on('message', (message) => {
      if (this.cancellationToken.isCancelled)
        console.log('cancellationToken.isCancelled consuming messages');
      console.log(`Received message ${message.id}:`);
      console.log(`\tData: ${message.data}`);
      console.log(`\tAttributes: ${message.attributes}`);
      message.nack();
      //message.ack();
    });
  }
}

cancellationToken.service.ts

import { Injectable, Scope } from '@nestjs/common';
@Injectable({ scope: Scope.DEFAULT })
export class CancellationTokenService {
  private _isCancelled: boolean;
  constructor() {
    this._isCancelled = false;
  }
  get isCancelled(): boolean {
    return this._isCancelled;
  }
  cancel(): void {
    this._isCancelled = true;
  }
  reset(): void {
    this._isCancelled = false;
  }
}

I need to stop consuming messages when SIGTERM is triggered.

I don't know why it isn't working, because apparenlty the application is logging Stopping application...

I'm currently new with nest js so I'm kind of lost if this is the right way to do it.


Solution

  • Maybe this is not the best aproach, but for now this solve the issue.

    I've created an instance of cancellationTokenService and custom provider in my app.module.ts

    import { CancellationTokenService } from '@/services/cancellationToken';
    const cancellationTokenService = new CancellationTokenService();
    //...
    const cancellationTokenProvider: Provider = {
      provide: CancellationTokenService,
      useFactory: () => cancellationTokenService,
    };
    @Module({
      imports: [],
      providers: [cancellationTokenProvider, PubSubService],
    })