Ok here is an overview of what's going on:
M <-- Message with unique id of 1234
|
+-Start Queue
|
|
| <-- Exchange
/|\
/ | \
/ | \ <-- bind to multiple queues
Q1 Q2 Q3
\ | / <-- start of the problem is here
\ | /
\ | /
\|/
|
Q4 <-- Queues 1,2 and 3 must finish first before Queue 4 can start
|
C <-- Consumer
So I have an exchange that pushes to multiple queues, each queue has a task, once all tasks are completed, only then can Queue 4 start.
So message with unique id of 1234 gets sent to the exchange, the exchange routes it to all the task queues ( Q1, Q2, Q3, etc... ), when all the tasks for message id 1234 have completed, run Q4 for message id 1234.
How can I implement this?
Using Symfony2, RabbitMQBundle and RabbitMQ 3.x
Resources:
UPDATE #1
Ok I think this is what I'm looking for:
RPC with Parallel Processing, but how do I set the Correlation Id to be my unique id to group the messages and also identify what queue?
In the RPC tutorial at RabbitMQ's site, there is a way to pass around a 'Correlation id' that can identify your messages to users in the queue.
I'd recommend using some sort of id with your messages into the first 3 queues and then have another process to dequeue messages from the 3 into buckets of some sort. When those buckets receive what I'm assuming is the completion of there 3 tasks, send the final message off to the 4th queue for processing.
If you are sending more than 1 work item to each queue for one user, you might have to do a little preprocessing to find out how many items a particular user placed into the queue so the process dequeuing before 4 knows how many to expect before queuing up.
I do my rabbitmq in C#, so sorry my pseudo code isn't in php style
// Client
byte[] body = new byte[size];
body[0] = uniqueUserId;
body[1] = howManyWorkItems;
body[2] = command;
// Setup your body here
Queue(body)
// Server
// Process queue 1, 2, 3
Dequeue(message)
switch(message.body[2])
{
// process however you see fit
}
processedMessages[message.body[0]]++;
if(processedMessages[message.body[0]] == message.body[1])
{
// Send to queue 4
Queue(newMessage)
}
Response to Update #1
Instead of thinking of your client as a terminal, it might be useful to think of the client as a process on a server. So if you setup an RPC client on a server like this one, then all you need to do is have the server handle the generation of a unique id of a user and send the messages to the appropriate queues:
public function call($uniqueUserId, $workItem) {
$this->response = null;
$this->corr_id = uniqid();
$msg = new AMQPMessage(
serialize(array($uniqueUserId, $workItem)),
array('correlation_id' => $this->corr_id,
'reply_to' => $this->callback_queue)
);
$this->channel->basic_publish($msg, '', 'rpc_queue');
while(!$this->response) {
$this->channel->wait();
}
// We assume that in the response we will get our id back
return deserialize($this->response);
}
$rpc = new Rpc();
// Get unique user information and work items here
// Pass even more information in here, like what queue to use or you could even loop over this to send all the work items to the queues they need.
$response = rpc->call($uniqueUserId, $workItem);
$responseBuckets[array[0]]++;
// Just like above code that sees if a bucket is full or not