can you help me with RabbitMQ input in logstash. My application sending versions of code to rabbitmq and then it go to store in elastic stack. For app in rabbitmq was created queue name: app_version_queue; type: classic; durable: true
Then logstash was configured with that config:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# INPUT - PRODUCERS
key => "app_version_queue"
# OUTPUT - CONSUMER
# queue for logstash
queue => "logstash"
auto_delete => false
# Exchange for logstash
exchange => logstash
exchange_type => direct
durable => "true"
# No ack will boost your perf
ack => false
}
}
output {
elasticsearch {
hosts => [ "elasticsearch:9200" ]
index => "app_version-%{+YYYY.MM.dd}"
}
}
It's worked, but now, in RabbitMQ console, i see in table Queued messages Ready: 914,444 Unacked: 0 Total: 914,444
And my disk space on rabbitmq cluster go to full in 3 days. After rebooting rabbitmq server, all space is free.
UPDATED: All reason, why i do that, i want to remove NIFI from that chain app=>rabbit=>nifi=>elastic I want to do: app=>rabbit=>logstash=>elastic
I try to stop NIFI sending, but messages not leaving.
It sounds like what's happened is you've created the infrastructure twice:
What you need is just three things:
What you have is all of this:
logs
(created manually) which your application publishes messages to.app_version_queue
(created manually) which nothing consumes from.logs
into app_version_queue
, which then sit there forever.logstash
(created by LogStash) which nothing publishes messages to.logstash
(created by LogStash) which LogStash consumes messages from.logstash
exchange to the logstash
queue which doesn't do anything, because no messages are published to that exchange.logs
exchange to the logstash
queue which is actually delivering the messages from your application.So, for each of the three things (the exchange, the queue, and the binding) you need to:
For instance, you could keep the names logs
and app_version_queue
, and create everything manually.
Then your LogStash application would look something like this:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Consume from existing queue
queue => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
On the other hand, you could create just the logs
exchange, and let LogStash create the queue and binding, like this:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Create a new queue
queue => "logstash_processing_queue"
durable => "true"
# Take a copy of all messages with the "app_version_queue" routing key from the existing exchange
exchange => "logs"
key => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
Or you could let LogStash create all of it, and make sure your application publishes to the right exchange:
input {
rabbitmq {
id => "rabbitmyq_id"
# connect to rabbit
host => "localhost"
port => 5672
vhost => "/"
# Create a new queue
queue => "logstash_processing_queue"
durable => "true"
# Create a new exchange; point your application to publish here!
exchange => "log_exchange"
exchange_type => "direct"
# Take a copy of all messages with the "app_version_queue" routing key from the new exchange
key => "app_version_queue"
# No ack will boost your perf
ack => false
}
}
I'd probably go with the middle option: the exchange is a part of the application's deployment requirements (it will produce errors if it can't publish there), but any number of queues might bind to it for different reasons (maybe none at all in a test environment, where you don't need ElasticSearch set up).