I am working on a cache server with an event-driven architecture. It is going to work as follows:
I would like Set operations to be sent to all replicas (fanout exchange?) and Get operations to a single arbitrary one (default exchange?).
I have read about the Publish/Subscribe pattern and was able to make all servers respond using a fanout exchange. I have also read about the RPC model and was able to make an arbitrary server respond. But I am not able to unite these approaches into one architecture. Please help.
Questions:
What is the best way to organize MQ to achieve this behavior?
Should I bind two queues into one exchange?
I would like to respond from the server to the client with a correlationId. Should I reuse existing queues/exchanges or create new ones?
After going through your problem domain, what I understand is this: at run time, multiple clients will be sending "set" and "get" messages to RabbitMQ, and each "set" message is to be processed by every Server Cache active at that moment. Each "get" message needs to be processed by any one of the Server Caches, and a response message needs to be sent back to the client that sent the "get" message.
Correct me if I am wrong.
In that case it is fair to assume that there will be separate trigger points on the client side for publishing "get" and "set" messages. Therefore, logically, the "get" message producer and the "set" message publisher will be two separate programs/classes.
As such, your selection of the pub/sub and RPC models looks logical. The only thing you need to do is unite the "set" and "get" message processing in the Server Cache, which can quite easily be done using two separate channels (within the same connection), one for "set" messages and one for "get" messages. See my code below. I used the Java code of the same samples (from the RabbitMQ site) that you mentioned in your question. With some small modifications it was quite straightforward. It should not be difficult to do the same in Python either.
Now, coming to your questions:
What is the best way to organize MQ to achieve this behavior?
Your selection of the pub/sub and RPC models looks logical. The clients publish "set" messages to a fanout exchange (e.g. named "set_ex"). Each Server Cache instance listens on its own temporary queue (which lasts as long as its connection is live), and that queue is bound to "set_ex".
The clients publish "get" messages to a direct exchange (e.g. named "get_ex"), and a queue "get_q" is bound to this exchange with its queue name as the routing key. Every Server Cache instance consumes from this same "get_q", so each "get" message is delivered to only one of them. The Server Cache sends the result message to the temporary queue whose name was passed with the "get" message (its replyTo property). Once the client has received the response message, it closes its connection and the temporary queue is removed.
(Note: in the sample code below I have used "get_q" through the default exchange, as is the case with the sample on the RabbitMQ site. It is not difficult to bind "get_q" to a separate direct exchange for better manageability.)
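To make the difference between the two routes concrete, here is a toy in-memory model in plain Java. It does not use the RabbitMQ client at all; "ToyBroker" and all of its method names are made up for this illustration, and it only mimics the two behaviours described above: a fanout exchange copies a message into every bound queue, while a single shared queue hands each message to only one consumer.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Toy model of the routing described above. NOT the RabbitMQ API; all names are hypothetical. */
final class ToyBroker {
    // One private queue per server, all bound to the fanout exchange ("set_ex").
    private final List<Queue<String>> setQueues = new ArrayList<>();
    // One shared queue ("get_q") that every server consumes from.
    private final Queue<String> getQueue = new ArrayDeque<>();

    /** A server joins: it gets its own queue bound to the fanout exchange. */
    Queue<String> bindServerQueue() {
        Queue<String> queue = new ArrayDeque<>();
        setQueues.add(queue);
        return queue;
    }

    /** Fanout: the message is copied into every bound queue. */
    void publishSet(String message) {
        for (Queue<String> queue : setQueues) {
            queue.add(message);
        }
    }

    /** Shared queue: the message is enqueued exactly once. */
    void publishGet(String message) {
        getQueue.add(message);
    }

    /** Whichever server polls first gets the message; later polls return null. */
    String consumeGet() {
        return getQueue.poll();
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        Queue<String> server1 = broker.bindServerQueue();
        Queue<String> server2 = broker.bindServerQueue();

        broker.publishSet("k=v");
        broker.publishGet("k");

        // Both servers hold their own copy of the "set" message.
        System.out.println(server1.peek() + " / " + server2.peek());
        // Only the first poll sees the "get" message.
        System.out.println(broker.consumeGet() + " / " + broker.consumeGet());
    }
}
```

In the real broker the shared queue distributes messages among its consumers rather than being polled, but the outcome is the same: one "get" message, one server.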
Should I bind two queues into one exchange?
I don't think that would be the right choice. The pub/sub scenario explicitly needs a fanout exchange, and every message sent to a fanout exchange is copied to every queue bound to it. We don't want "get" messages to be pushed to every Server Cache.
I would like to respond from the server to the client with a correlationId. Should I reuse existing queues/exchanges or create new ones?
All you need to do is send the response message from the server to the temporary queue name that is passed along with the original "get" message (its replyTo property), copying the request's correlationId into the reply, as is done in the sample provided by RabbitMQ.
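The role of the correlationId on the client side can be sketched without any broker code. The class below is a hypothetical helper ("ReplyMatcher" and its methods are not part of the RabbitMQ client): the client registers a correlation id before publishing a request, and every message arriving on the reply queue is matched against the ids still pending. Replies with an unknown or missing id are simply ignored.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical helper that pairs replies with requests by correlation id. */
final class ReplyMatcher {
    // Requests that have been sent but not yet answered, keyed by correlation id.
    private final Map<String, CompletableFuture<String>> pending = new ConcurrentHashMap<>();

    /** Call before publishing a request; the future completes when its reply arrives. */
    CompletableFuture<String> expect(String corrId) {
        CompletableFuture<String> future = new CompletableFuture<>();
        pending.put(corrId, future);
        return future;
    }

    /** Call for every message on the reply queue; returns true if it matched a pending request. */
    boolean onReply(String corrId, String body) {
        if (corrId == null) {
            return false; // a reply without a correlation id cannot be matched
        }
        CompletableFuture<String> future = pending.remove(corrId);
        if (future == null) {
            return false; // unknown or already answered request
        }
        future.complete(body);
        return true;
    }

    public static void main(String[] args) {
        ReplyMatcher matcher = new ReplyMatcher();
        String corrId = UUID.randomUUID().toString();
        CompletableFuture<String> reply = matcher.expect(corrId);

        matcher.onReply("some-other-id", "stale reply"); // ignored
        matcher.onReply(corrId, "cached value");         // completes the future

        System.out.println(reply.join());
    }
}
```

With something like this, one reply queue per client is enough even when several "get" requests are in flight at once, so no extra queues or exchanges are needed for the responses.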
The client code for publishing "set" messages.
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;

    import java.io.IOException;
    import java.util.concurrent.TimeoutException;

    public class Client {
        private static final String EXCHANGE_NAME_SET = "set_ex";

        public static void main(String[] args) throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();

            // Fanout exchange: every queue bound to it receives a copy of each message.
            channel.exchangeDeclare(EXCHANGE_NAME_SET, BuiltinExchangeType.FANOUT);

            String message = getMessage(args);
            // Fanout exchanges ignore the routing key, so it is left empty.
            channel.basicPublish(EXCHANGE_NAME_SET, "", null, message.getBytes("UTF-8"));
            System.out.println("Sent '" + message + "'");

            channel.close();
            connection.close();
        }

        // Joins the command-line arguments into one message, or falls back to a default.
        private static String getMessage(String[] strings) {
            if (strings.length < 1)
                return "info: Hello World!";
            return String.join(" ", strings);
        }
    }
The client code for producing "get" messages and receiving the response message back.
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeoutException;

    public class RPCClient {
        private Connection connection;
        private Channel channel;
        private String requestQueueName = "get_q";
        private String replyQueueName;

        public RPCClient() throws IOException, TimeoutException {
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            connection = factory.newConnection();
            channel = connection.createChannel();
            // Server-named temporary queue on which this client receives replies.
            replyQueueName = channel.queueDeclare().getQueue();
        }

        public String call(String message) throws IOException, InterruptedException {
            final String corrId = UUID.randomUUID().toString();
            AMQP.BasicProperties props = new AMQP.BasicProperties
                    .Builder()
                    .correlationId(corrId)
                    .replyTo(replyQueueName)
                    .build();
            // Default exchange: the routing key is the name of the target queue.
            channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8"));

            final BlockingQueue<String> response = new ArrayBlockingQueue<String>(1);
            String consumerTag = channel.basicConsume(replyQueueName, true, new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Ignore replies that do not belong to this request (or carry no id at all).
                    if (corrId.equals(properties.getCorrelationId())) {
                        response.offer(new String(body, "UTF-8"));
                    }
                }
            });
            String result = response.take();
            // Cancel the consumer so that repeated calls do not accumulate consumers.
            channel.basicCancel(consumerTag);
            return result;
        }

        public void close() throws IOException {
            connection.close();
        }

        public static void main(String[] args) {
            RPCClient rpcClient = null;
            try {
                rpcClient = new RPCClient();
                System.out.println(" sending get message");
                String response = rpcClient.call("30");
                System.out.println(" Got '" + response + "'");
            } catch (IOException | TimeoutException | InterruptedException e) {
                e.printStackTrace();
            } finally {
                if (rpcClient != null) {
                    try {
                        rpcClient.close();
                    } catch (IOException ignored) {
                        // Nothing useful can be done if closing fails.
                    }
                }
            }
        }
    }
The server code for subscribing to "set" messages and consuming "get" messages.
    import com.rabbitmq.client.AMQP;
    import com.rabbitmq.client.BuiltinExchangeType;
    import com.rabbitmq.client.Channel;
    import com.rabbitmq.client.Connection;
    import com.rabbitmq.client.ConnectionFactory;
    import com.rabbitmq.client.Consumer;
    import com.rabbitmq.client.DefaultConsumer;
    import com.rabbitmq.client.Envelope;

    import java.io.IOException;
    import java.util.UUID;
    import java.util.concurrent.TimeoutException;

    public class ServerCache1 {
        private static final String EXCHANGE_NAME_SET = "set_ex";
        private static final String RPC_GET_QUEUE_NAME = "get_q";
        // Random id so that replies show which server instance answered.
        private static final String SERVER_ID = UUID.randomUUID().toString();

        public static void main(String[] args) throws IOException, TimeoutException {
            System.out.println("Server Id " + SERVER_ID);
            ConnectionFactory factory = new ConnectionFactory();
            factory.setHost("localhost");
            Connection connection = factory.newConnection();

            // Channel 1: receive and process "set" messages (pub/sub).
            Channel channelSet = connection.createChannel();
            channelSet.exchangeDeclare(EXCHANGE_NAME_SET, BuiltinExchangeType.FANOUT);
            // Temporary queue private to this server, bound to the fanout exchange.
            String queueName = channelSet.queueDeclare().getQueue();
            channelSet.queueBind(queueName, EXCHANGE_NAME_SET, "");
            System.out.println("waiting for set message");
            Consumer consumerSet = new DefaultConsumer(channelSet) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                        throws IOException {
                    String message = new String(body, "UTF-8");
                    System.out.println("Received '" + message + "'");
                }
            };
            channelSet.basicConsume(queueName, true, consumerSet);

            // Channel 2: consume "get" messages from the shared queue and reply (RPC).
            final Channel channelGet = connection.createChannel();
            channelGet.queueDeclare(RPC_GET_QUEUE_NAME, false, false, false, null);
            // Take at most one unacknowledged "get" message at a time.
            channelGet.basicQos(1);
            System.out.println("waiting for get message");
            Consumer consumerGet = new DefaultConsumer(channelGet) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                    // Echo the request's correlationId so the client can match the reply.
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties
                            .Builder()
                            .correlationId(properties.getCorrelationId())
                            .build();
                    System.out.println("received get message");
                    String response = "get response from server " + SERVER_ID;
                    // Reply through the default exchange to the queue named in replyTo.
                    channelGet.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    channelGet.basicAck(envelope.getDeliveryTag(), false);
                    // The RabbitMQ consumer worker thread notifies the RPC server owner thread.
                    synchronized (this) {
                        this.notify();
                    }
                }
            };
            channelGet.basicConsume(RPC_GET_QUEUE_NAME, false, consumerGet);

            // Keep the main thread waiting; messages are handled on the consumer threads.
            while (true) {
                synchronized (consumerGet) {
                    try {
                        consumerGet.wait();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                }
            }
        }
    }
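The two handleDelivery callbacks in the server sample only print what they receive. A minimal sketch of the state they could share is shown below. It rests on assumptions that are not in the original question: "set" messages are taken to have the form key=value, the body of a "get" message is taken to be the key, a missing key is answered with an empty string, and "CacheStore" is a hypothetical class name. A ConcurrentHashMap is used because the two consumers sit on different channels and their callbacks may run on different threads.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical cache state shared by the "set" and "get" handlers. */
final class CacheStore {
    private final Map<String, String> entries = new ConcurrentHashMap<>();

    /** Applies a "set" message of the assumed form key=value; returns false if it is malformed. */
    boolean applySet(String message) {
        int separator = message.indexOf('=');
        if (separator <= 0) {
            return false; // no '=' found, or an empty key
        }
        entries.put(message.substring(0, separator), message.substring(separator + 1));
        return true;
    }

    /** Builds the reply body for a "get" message; an empty string stands for a miss (assumption). */
    String answerGet(String key) {
        return entries.getOrDefault(key, "");
    }

    public static void main(String[] args) {
        CacheStore store = new CacheStore();
        store.applySet("30=thirty");               // what the "set" handler would call
        System.out.println(store.answerGet("30")); // what the "get" handler would send back
    }
}
```

In the server, the "set" handler would call applySet with the decoded message body, and the "get" handler would publish the result of answerGet instead of the fixed response string.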
Hope this helps.