I'm replicating EasyNetQ functionality in NodeJS (so that a Node app can communicate over Rabbit with an EasyNetQ-enabled .NET app). I've replicated EasyNetQ's Publish/Subscribe and EasyNetQ's Send/Receive, but I'm having some difficulty with EasyNetQ's Request/Response.
Here is my current Node code:
var rqrxID = uuid.v4(); //a GUID
var responseQueue = 'easynetq.response.' + rqrxID;
Q(Play.AMQ.ConfirmChannel.assertQueue(responseQueue, { durable: false, exclusive: true, autoDelete: true }))
    .then((okQueueReply) =>
        Play.AMQ.ConfirmChannel.consume(responseQueue, (msg) => {
            //do something here...
            Play.AMQ.ConfirmChannel.ack(msg);
        })
    )
    .then((okSubscribeReply) => {
        Q(Play.AMQ.ConfirmChannel.assertExchange('easy_net_q_rpc', 'direct', { durable: true, autoDelete: false }))
            .then((okExchangeReply) =>
                Play.AMQ.ConfirmChannel.publish(
                    global.AppConfig.amq.rpc.exchange,
                    dto.AsyncProcessorCommand.Type,
                    Play.ToBuffer(command),
                    { type: command.GetType() },
                    (err, ok): void => {
                        if (err !== null) {
                            console.warn('Message nacked!');
                            responseDeferred.reject(err);
                        }
                    }
                )
            )
    })
    .catch((failReason) => {
        console.error(util.format('Error creating response queue: %s', failReason));
        return null;
    });
Note that the publish works and is received by the .NET code. That code then sends a response and the issue is that the response isn't received. Here's the .NET code:
Bus.Respond<AsyncProcessorCommand, AsyncProcessorCommandResponse>(
    request =>
    {
        Console.WriteLine("Got request: '{0}'", request);
        return new AsyncProcessorCommandResponse()
        {
            ID = Guid.NewGuid(),
            ResponseType = "ENQResp"
        };
    });
I'm sure I'm missing something, but not sure what. Who can help?
UPDATE I have solved at least part of this. Taking the value of responseQueue and setting that into the options for publish as "replyTo" hooks the response up - nice. Now I just have to figure out how to either not create a new queue each time OR, make the response queue go away...
UPDATE FINAL So, using the channel setup I had and saving the consumerTag (actually, specifying it) allowed me to cancel the consumer, and the queue auto-deleted.
Taking my comments from above to answer this.
There are two pieces to this. First, from the code above, create your response queue so that it auto-deletes (when the consumer count drops to 0):
channel.assertQueue(responseQueue, { durable: false, exclusive: true, autoDelete: true })
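To make the auto-delete part concrete, here is a minimal sketch of a one-shot consumer: it acks the first reply, then cancels its own consumer so the queue is left with zero consumers. The `ReplyChannel` interface and the `consumeOnce` helper are hypothetical names I made up for illustration; the three methods are meant to mirror the promise-style amqplib channel (`consume`, `ack`, `cancel`), which is an assumption about the client library in use.

```typescript
// Minimal slice of a promise-style channel. The method names mirror amqplib's
// Channel, but this interface itself is a local assumption for the sketch.
interface ReplyMessage {
  content: { toString(): string };
}

interface ReplyChannel {
  consume(
    queue: string,
    onMessage: (msg: ReplyMessage | null) => void,
    options: { consumerTag: string }
  ): Promise<unknown>;
  ack(msg: ReplyMessage): void;
  cancel(consumerTag: string): Promise<unknown>;
}

// Hypothetical helper: resolve with the first reply, then cancel the consumer
// so an autoDelete queue loses its last consumer and can be removed.
function consumeOnce(
  channel: ReplyChannel,
  queue: string,
  consumerTag: string
): Promise<ReplyMessage> {
  return new Promise<ReplyMessage>((resolve, reject) => {
    channel
      .consume(
        queue,
        (msg) => {
          if (msg === null) {
            // A null message signals that the broker cancelled the consumer.
            reject(new Error("consumer cancelled before a reply arrived"));
            return;
          }
          channel.ack(msg);
          // Specifying the tag up front is what makes this cancel possible.
          channel.cancel(consumerTag).catch(reject);
          resolve(msg);
        },
        { consumerTag }
      )
      .catch(reject);
  });
}
```

Passing the consumer tag in yourself (rather than waiting for the broker to assign one) means the callback can cancel without having to wait for the `consume` reply first.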
Then create/publish to the queue the "server" is listening on - making sure to set "replyTo" for the response queue you just created (the type piece is another bit of ENQ-needed code):
{ type: command.GetType(), replyTo: responseQueue }
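As a small sketch of how the queue name and the publish options fit together: the two helpers below (`responseQueueName`, `requestOptions`) are hypothetical names of mine, and the type string in the usage lines is only a placeholder for whatever `command.GetType()` returns in the code above.

```typescript
// `type` and `replyTo` are standard AMQP message properties; the interface
// name is just a local label for this sketch.
interface RequestOptions {
  type: string;
  replyTo: string;
}

// Hypothetical helper: build the per-request response queue name from an id
// supplied by the caller (a GUID in the code above).
function responseQueueName(requestId: string): string {
  return "easynetq.response." + requestId;
}

// Hypothetical helper: the responder reads `replyTo` to know where to send
// its answer, so it must match the queue that was just asserted.
function requestOptions(messageType: string, replyQueue: string): RequestOptions {
  return { type: messageType, replyTo: replyQueue };
}

// Usage with placeholder values.
const exampleQueue = responseQueueName("00000000-0000-0000-0000-000000000000");
const exampleOptions = requestOptions("PLACEHOLDER_TYPE_NAME", exampleQueue);
console.log(exampleOptions.replyTo === exampleQueue); // true
```

The point of building both from the same value is that the queue you consume from and the `replyTo` you publish can never drift apart.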
So an entire (currently messy as it's "play" code) method for executing this pattern looks like:
private static Request(command: dto.AsyncProcessorCommand): Q.Promise<dto.interfaces.IAsyncProcessorCommandResponse> {
    var responseDeferred = Q.defer<dto.interfaces.IAsyncProcessorCommandResponse>();
    var consumerTag = uuid.v4();
    var rqrxID = uuid.v4();
    var responseQueue = 'easynetq.response.' + rqrxID;

    var handleResponse = (msg: any): void => {
        var respType = null;
        switch(command.Action) {
            default:
                respType = 'testResp';
        }
        //just sending *something* back, should come from 'msg'
        responseDeferred.resolve(new dto.AsyncProcessorCommandResponse(respType, { xxx: 'yyy', abc: '123' }));
    }

    Q(Play.AMQ.ConfirmChannel.assertQueue(responseQueue, { durable: false, exclusive: true, autoDelete: true }))
        .then((okQueueReply) =>
            Play.AMQ.ConfirmChannel.consume(responseQueue, (msg) => {
                handleResponse(msg);
                Play.AMQ.ConfirmChannel.ack(msg);
                //cancelling the only consumer lets the autoDelete queue go away
                Play.AMQ.ConfirmChannel.cancel(consumerTag);
            },
            { consumerTag: consumerTag })
        )
        .then((okSubscribeReply) => {
            //return the inner chain so assertExchange failures reach the catch below
            return Q(Play.AMQ.ConfirmChannel.assertExchange('easy_net_q_rpc', 'direct', { durable: true, autoDelete: false }))
                .then((okExchangeReply) =>
                    Play.AMQ.ConfirmChannel.publish(
                        'easy_net_q_rpc',
                        dto.AsyncProcessorCommand.Type,
                        Play.ToBuffer(command),
                        { type: command.GetType(), replyTo: responseQueue },
                        (err, ok): void => {
                            if (err !== null) {
                                console.warn('Message nacked!');
                                responseDeferred.reject(err);
                            }
                        }
                    )
                );
        })
        .catch((failReason) => {
            console.error(util.format('Error setting up request: %s', failReason));
            //reject so the caller's promise doesn't stay pending forever
            responseDeferred.reject(failReason);
        });

    return responseDeferred.promise;
}
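The handler above resolves with a hard-coded placeholder. A rough sketch of reading the real response out of `msg` could look like the following. It assumes the .NET side serializes the response as UTF-8 JSON with the property names from the `AsyncProcessorCommandResponse` class in the question (`ID`, `ResponseType`); I haven't verified the exact wire format, and `parseResponse` plus both interfaces are hypothetical names.

```typescript
// Fields mirror the .NET AsyncProcessorCommandResponse shown in the question.
interface AsyncProcessorCommandResponseDto {
  ID: string;
  ResponseType: string;
}

// Structural stand-in for the delivered message; a Node Buffer fits `content`.
interface DeliveredMessage {
  content: { toString(encoding?: string): string };
}

// Hypothetical helper: decode the body as UTF-8 JSON (an assumption about the
// serializer on the .NET side) and check the two expected fields.
function parseResponse(msg: DeliveredMessage): AsyncProcessorCommandResponseDto {
  const body = JSON.parse(msg.content.toString("utf8"));
  if (typeof body.ID !== "string" || typeof body.ResponseType !== "string") {
    throw new Error("unexpected response shape");
  }
  return { ID: body.ID, ResponseType: body.ResponseType };
}
```

Inside `handleResponse`, the result of `parseResponse(msg)` could then be used to build the value passed to `responseDeferred.resolve(...)` instead of the placeholder object.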