Can someone explain to me how I can trigger Celery tasks through Logstash? Is it possible?
If I try to do that in PHP through the 'php-amqplib' library it works fine: (without using Logstash)
$connection = new AMQPStreamConnection(
$channel = $connection->channel();
$taskId = rand(1000, 10000);
$props = array(
'content_type' => 'application/json',
'content_encoding' => 'utf-8',
$body = array(
'task' => 'process_next_task',
'lang' => 'py',
'args' => array('ktest' => 'vtest'),
'kwargs' => array('ktest' => 'vtest'),
'origin' => '@'.'mytest',
'id' => $taskId,
$msg = new AMQPMessage(json_encode($body), $props);
$channel->basic_publish($msg, 'celery', 'celery');
According to the Celery docs:
I'm trying to send the request in the json format, this is my Logstash filter:
remove_field => ['headers', '@timestamp', '@version', 'host', 'type']
code => "
:content_type => 'application/json',
:content_encoding => 'utf-8'
And Celery answer is:
[2017-05-05 14:35:09,090: WARNING/MainProcess] Received and deleted unknown message. Wrong destination?!
{content_type:None content_encoding:None delivery_info:{'exchange': 'celery', 'routing_key': 'celery', 'redelivered': False, 'consumer_tag': 'None4', 'delivery_tag': 66} headers={}}
Basically, Celery is not able to decode my message format or better... I'm not able to set the request in the JSON format :)
It's driving me crazy, thank you in advance for any clues :)
Forgot it, this is my output plugin in Logstash
key => "celery"
exchange => "celery"
exchange_type => "direct"
user => "${RABBITMQ_USER}"
password => "${RABBITMQ_PASSWORD}"
host => "${RABBITMQ_HOST}"
port => "${RABBITMQ_PORT}"
durable => true
persistent => true
codec => json
From the information provided in this question, you can't.
When you're playing with the event in the ruby filter, you're actually playing with what will be put in the body of the message, while you'd like to set the rabbitmq headers and properties of your message.
Till that functionality has been tackled, I do not think you'll be able to achieve it unless of course you implement it yourself. After all, the plugin is available on github.