It is possible to modify the request payload of an HTTP-based context (e.g. this method), but is there a way to modify the payload of an RPC-based context?
In my case, my NestJS setup is a microservice listening to a specific Kafka topic (Customer). The messages are encoded using Avro, so I'm trying to create an interceptor to globally decode every message from Kafka.
// inside intercept function
const rpcContext = context.switchToRpc().getContext();
const kafkaMessageBuffer: Buffer = context.switchToRpc().getData();
const decodedMessage = await this.schemaRegistry.decode(kafkaMessageBuffer);
// My attempt to replace the value (which is a buffer) with the decoded data
rpcContext.args[0]['value'] = decodedMessage;
This doesn't work because args is a protected property, so I can't modify it.
What other ways are there to globally decode all Kafka messages?
One alternative I could think of is passing a pipe to the decorator on each handler:
@Payload(AvroTransformPipe) message: CustomerMessage
I ended up using a Global Pipe.
transform(value: any, metadata: ArgumentMetadata): any {
  if (metadata.type !== 'body') {
    return value;
  }
  return value instanceof Buffer ? this.schemaRegistry.decode(value) : null;
}
This solution is not perfect because a global pipe runs for every decorated parameter (hence the metadata.type check), and I'm not sure how to target a specific decorator (in my case I only want to target @Payload()). It could also be problematic in a hybrid application with HTTP endpoints, since an HTTP request body also has metadata.type 'body' and, not being a Buffer, would be replaced with null.
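One way to soften the hybrid-application problem might be to return non-binary values unchanged instead of null, so that only raw byte payloads are decoded. The following is a minimal sketch of that idea, not a tested NestJS setup: the ArgumentMetadata and PipeTransform interfaces are simplified local stand-ins for the ones exported by @nestjs/common, and Decoder and AvroDecodePipe are hypothetical names; schemaRegistry.decode is assumed to return a promise, as in the interceptor attempt above.

```typescript
// Simplified local stand-ins so the sketch runs on its own; in a real app,
// import PipeTransform and ArgumentMetadata from '@nestjs/common' instead.
interface ArgumentMetadata {
  type: 'body' | 'query' | 'param' | 'custom';
  metatype?: unknown;
  data?: string;
}

interface PipeTransform {
  transform(value: unknown, metadata: ArgumentMetadata): unknown;
}

// Hypothetical shape of the schema registry client (an assumption).
interface Decoder {
  decode(bytes: Uint8Array): Promise<unknown>;
}

class AvroDecodePipe implements PipeTransform {
  private readonly schemaRegistry: Decoder;

  constructor(schemaRegistry: Decoder) {
    this.schemaRegistry = schemaRegistry;
  }

  async transform(value: unknown, metadata: ArgumentMetadata): Promise<unknown> {
    // Node's Buffer extends Uint8Array, so this check also matches Buffers.
    const isBinary = value instanceof Uint8Array;

    // Leave everything that is not a raw binary payload untouched,
    // e.g. an already-parsed JSON body from an HTTP endpoint.
    if (metadata.type !== 'body' || !isBinary) {
      return value;
    }
    return this.schemaRegistry.decode(value);
  }
}
```

This still does not target @Payload() specifically; it only narrows the pipe to binary values, so it would not help if an HTTP handler is expected to receive a raw binary body.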