I have written a php worker to fetch data from RabbitMQ queue.The php worker is running successfully as a background job on some server which connects RabbitMQ server for consuming data using AMQP php extension.After there is no data enqued in the queue for 15 minutes, php scripts throws AMQPException. Exception is :
AMQPException Object (
[message:protected] => Library error: a socket error occurred
[string:Exception:private] =>
[code:protected] => 0
[file:protected] => /home/indiamart/public_html/dev-weberp-auto-dialer/merp/devworker.php
[line:protected] => 104
[trace:Exception:private] => Array
(
[0] => Array
(
[file] => /home/indiamart/public_html/dev-weberp-auto-dialer/merp/devworker.php
[line] => 104
[function] => consume
[class] => AMQPQueue
[type] => ->
[args] => Array
(
[0] => Closure Object
(
[parameter] => Array
(
[$message] => <required>
[$q] => <required>
)
)
)
)
)
[previous:Exception:private] =>
)
Below is my worker code:
<?php
$callback_func = function(AMQPEnvelope $message, AMQPQueue $q){
$data = json_decode($message->getBody(), true);
$getDeliveryTag = $message->getDeliveryTag();
$ack = $q->ack($getDeliveryTag);//used $getDeliveryTag
$to = isset($data["to"])?$data["to"]:"";
$subjectMail = isset($data["subject"])?$data["subject"]:"";
$mail_body_content = isset($data["body"])?"<pre>".$data["body"]."</pre>":"";
$mailfrom = isset($data["mailfrom"])?$data["mailfrom"]:"";
$cc = isset($data["cc"])?$data["cc"]:"";
$mailfromname = isset($data["mailfromname"])?$data["mailfromname"]:"";
$uniqueid = isset($data["unique_id"])?$data["unique_id"]:"";
if(!$ack){//if ack not recieved
$ack_msg = "Unique id: $uniqueid Subject : $subjectMail";
@mail("abc@example.com","History Queue Not acknowledged!",$ack_msg);
}
if($message->isRedelivery()){
$red_msg = "Unique id: $uniqueid Subject : $subjectMail";
@mail("abc@example.com","History Queue Redilivery!",$red_msg);
}
$m_headers_trail = "From:$mailfromname<$mailfrom> \n".
"Cc:$cc\n".
"MIME-Version: 1.0 \n".
"Content-type: text/html; charset=UTF-8";
$flag = @mail($to,$subjectMail,$mail_body_content,$m_headers_trail);
$flag_message = $flag ? "success unique id: $uniqueid Subject :$subjectMail" : "mail failed";
@mail("abc@example.com","History Mailer Result",$flag_message);
};
$host = "127.0.0.1";
$vhost = "/";
$port = 5672;
$login = "admin";
$password = "admin";
CHANNEL :
try{
@mail("abc@example.com","History Queue worker Start!","Worker
started!");
$cnn = new AMQPConnection(array("host" => $host,"vhost" => $vhost,"port"
=> $port,"login" => $login,"password" => $password));
$cnn->connect();
if(!$cnn){
@mail("abc@example.com","History Queue Connection
Error!","Connection not established!");
}
$ch = new AMQPChannel($cnn);
$queue = new AMQPQueue($ch);
$queue->setName('STS_UPDATE_MAIL');
$queue->setFlags(AMQP_NOPARAM);
$queue->consume($callback_func);
$ch->close();
$cnn->close();
}catch(Exception $e){
if ($ch->isConnected()) {
$ch->close();
@mail("abc@example.com","History Queue Closing
Connection!","Closed!");
}
if ($cnn->isConnected()) {
$cnn->close();
@mail("abc@example.com","History Queue Closing
Connection!","Closed!");
}
goto CHANNEL;
}
?>
It is a library issue. I than shifted the worker in golang