Search code examples
typescriptapache-kafkanestjs

Modifying RPC context data from an interceptor?


It is possible to modify the request payload of a HTTP-based context (e.g. this method), but is there a way to modify the payload of an RPC-based context?

In my case, my NestJS setup is a microservice listening to a specific Kafka topic (Customer). The messages are encoded using Avro, so I'm trying to create an interceptor to globally decode every message from Kafka.

// inside intercept function
const rpcContext = context.switchToRpc().getContext();
const kafkaMessageBuffer: Buffer = context.switchToRpc().getData();
const decodedMessage = await this.schemaRegistry.decode(kafkaMessageBuffer);

// My attempt to replace the value (which is a buffer) with the decoded data
rpcContext.args[0]['value'] = decodedMessage;

This doesn't work because args is a protected property so I couldn't modify it.

What are the other alternative ways I could globally decode all Kafka messages?

Other alternatives that I could think of:

  • Create a custom pipe to transform the data in the controller:
    • @Payload(AvroTransformPipe) message: CustomerMessage
  • Manually decode the message in the controller before passing the decoded message to services

Solution

  • I ended up using a Global Pipe.

    transform(value: any, metadata: ArgumentMetadata): any {
      if (metadata.type !== 'body') {
        return value;
      }
    
      return value instanceof Buffer ? this.schemaRegistry.decode(value) : null;
    }
    

    This solution is not perfect because it will apply to every decorator (hence the metadata.type check), and I'm not sure how to target it to a specific decorator (in my case I only want to target @Payload()). It could also be problematic if the application is a Hybrid application with HTTP endpoints.