
RabbitMQ: sending messages to Logstash


Can you help me with the RabbitMQ input in Logstash? My application sends code versions to RabbitMQ, and from there they should be stored in the Elastic Stack. For the application, a queue was created in RabbitMQ with name: app_version_queue; type: classic; durable: true

Logstash was then configured like this:

input {
  rabbitmq {
    id => "rabbitmyq_id"
    # connect to rabbit
    host => "localhost"
    port => 5672
    vhost => "/"
    # INPUT - PRODUCERS
    key => "app_version_queue"
    # OUTPUT - CONSUMER
    # queue for logstash
    queue => "logstash"
    auto_delete => false
    # Exchange for logstash
    exchange => "logstash"
    exchange_type => "direct"
    durable => true
    # No ack will boost your perf
    ack => false
  }
}

output {
  elasticsearch {
    hosts => [ "elasticsearch:9200" ]
    index => "app_version-%{+YYYY.MM.dd}"
  }
}

It worked, but now the RabbitMQ console shows the following in the Queued messages table: Ready: 914,444, Unacked: 0, Total: 914,444

The disk on my RabbitMQ cluster also fills up within 3 days. After rebooting the RabbitMQ server, all the space is free again.

UPDATE: The reason I am doing all this is that I want to remove NiFi from the chain. Currently it is app=>rabbit=>nifi=>elastic; I want app=>rabbit=>logstash=>elastic.

  1. Queue 1 - app_version_queue: my application sends app_version messages here, and they go on via NiFi to Elasticsearch.
  2. Queue 2 - logstash: the queue that was created by Logstash.

I tried to stop NiFi from sending, but the messages are still not leaving the queue.


Solution

  • It sounds like what's happened is you've created the infrastructure twice:

    • Once manually in RabbitMQ
    • Once through the configuration options you gave Logstash

    What you need is just three things:

    • An exchange for the application to publish messages to.
    • A queue for Logstash to consume messages from.
    • A binding between that exchange and that queue; the queue will get a copy of every message published to the exchange with a matching routing key.
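
To make the relationship between the three pieces concrete, here is a minimal in-memory sketch in Python. It is a toy model, not a RabbitMQ client: the `DirectExchange` class and the variable names are invented for illustration, and it only mimics how a direct exchange copies a message into every queue bound with a matching routing key.

```python
from collections import defaultdict


class DirectExchange:
    """Toy in-memory model of a direct exchange (hypothetical, not a real client)."""

    def __init__(self, name):
        self.name = name
        # routing key -> list of queues bound with that key
        self.bindings = defaultdict(list)

    def bind(self, queue, routing_key):
        self.bindings[routing_key].append(queue)

    def publish(self, routing_key, message):
        # Every queue bound with a matching key gets its own copy;
        # a message with no matching binding is dropped.
        for queue in self.bindings.get(routing_key, []):
            queue.append(message)


logs = DirectExchange("logs")
logstash_queue = []  # stands in for the queue Logstash would consume from
other_queue = []     # a second queue bound with the same key

logs.bind(logstash_queue, "app_version_queue")
logs.bind(other_queue, "app_version_queue")

logs.publish("app_version_queue", "v1.2.3")
logs.publish("unrelated_key", "ignored")

print(logstash_queue)  # ['v1.2.3']
print(other_queue)     # ['v1.2.3']
```

Note that both queues receive the message: a queue that is bound but never consumed from keeps its copy, which is how a backlog builds up.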

    What you have is all of this:

    • An exchange called logs (created manually) which your application publishes messages to.
    • A queue called app_version_queue (created manually) which nothing consumes from.
    • A binding (created manually) delivering copies of messages from logs into app_version_queue, which then sit there forever.
    • An exchange called logstash (created by Logstash) which nothing publishes messages to.
    • A queue called logstash (created by Logstash) which Logstash consumes messages from.
    • A binding (created by Logstash) from the logstash exchange to the logstash queue which doesn't do anything, because no messages are published to that exchange.
    • A binding (created manually) from the logs exchange to the logstash queue which is actually delivering the messages from your application.
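
One way to confirm which queue is piling up is to look for queues that hold messages but have no consumers. The sketch below assumes the RabbitMQ management plugin is enabled and reachable at `http://localhost:15672` with the default `guest` credentials (all assumptions; adjust for your cluster). The helper names are hypothetical, and the sample data is illustrative rather than taken from a real broker.

```python
import base64
import json
import urllib.request


def unconsumed_queues(queues):
    """Return (name, messages) for queues that hold messages but have no consumers."""
    return [
        (q["name"], q.get("messages", 0))
        for q in queues
        if q.get("consumers", 0) == 0 and q.get("messages", 0) > 0
    ]


def fetch_queues(base_url="http://localhost:15672", user="guest", password="guest"):
    """Fetch queue details from the management API (URL and credentials are assumptions)."""
    request = urllib.request.Request(f"{base_url}/api/queues")
    token = base64.b64encode(f"{user}:{password}".encode()).decode()
    request.add_header("Authorization", f"Basic {token}")
    with urllib.request.urlopen(request) as response:
        return json.load(response)


# Illustrative data shaped like the API response; not real broker output.
sample = [
    {"name": "app_version_queue", "messages": 914444, "consumers": 0},
    {"name": "logstash", "messages": 0, "consumers": 1},
]
print(unconsumed_queues(sample))  # [('app_version_queue', 914444)]
```

Against a live broker you would call `unconsumed_queues(fetch_queues())` instead of using the sample list.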

    So, for each of the three things (the exchange, the queue, and the binding) you need to:

    • Decide a name
    • Decide if you're creating it, or letting Logstash create it
    • Configure everything to use the same name

    For instance, you could keep the names logs and app_version_queue, and create everything manually.

    Then your Logstash configuration would look something like this:

    input {
      rabbitmq {
        id => "rabbitmyq_id"
        # connect to rabbit
        host => "localhost"
        port => 5672
        vhost => "/"
        # Consume from existing queue
        queue => "app_version_queue"
        # No ack will boost your perf
        ack => false
      }
    }
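
If you would rather script the manual setup than click through the console, the three pieces this configuration relies on could be declared roughly as follows. This is a sketch using the third-party `pika` client, which is my choice and not something the question mentions. It assumes the `logs` exchange is a durable direct exchange and that the binding uses the routing key `app_version_queue`; if the exchange already exists with different settings, the broker will reject the declaration, so match what you actually have.

```python
def declare_topology(channel):
    """Declare the exchange, queue, and binding on a pika-style channel."""
    # Assumed: "logs" is a durable direct exchange.
    channel.exchange_declare(exchange="logs", exchange_type="direct", durable=True)
    # Matches the queue described in the question (durable: true).
    channel.queue_declare(queue="app_version_queue", durable=True)
    # Assumed routing key; it must equal the key the application publishes with.
    channel.queue_bind(
        queue="app_version_queue",
        exchange="logs",
        routing_key="app_version_queue",
    )


def connect_and_declare(host="localhost", port=5672, vhost="/"):
    """Open a connection with pika and apply the topology (needs a running broker)."""
    import pika  # third-party: pip install pika

    params = pika.ConnectionParameters(host=host, port=port, virtual_host=vhost)
    connection = pika.BlockingConnection(params)
    try:
        declare_topology(connection.channel())
    finally:
        connection.close()
```

Calling `connect_and_declare()` once is enough; declarations are idempotent as long as the settings match what already exists.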
    

    On the other hand, you could create just the logs exchange, and let Logstash create the queue and binding, like this:

    input {
      rabbitmq {
        id => "rabbitmyq_id"
        # connect to rabbit
        host => "localhost"
        port => 5672
        vhost => "/"
        # Create a new queue
        queue => "logstash_processing_queue"
        durable => true
        # Take a copy of all messages with the "app_version_queue" routing key from the existing exchange
        exchange => "logs"
        key => "app_version_queue"
        # No ack will boost your perf
        ack => false
      }
    }
    

    Or you could let Logstash create all of it, and make sure your application publishes to the right exchange:

    input {
      rabbitmq {
        id => "rabbitmyq_id"
        # connect to rabbit
        host => "localhost"
        port => 5672
        vhost => "/"
        # Create a new queue
        queue => "logstash_processing_queue"
        durable => true
        # Create a new exchange; point your application to publish here!
        exchange => "log_exchange"
        exchange_type => "direct"
        # Take a copy of all messages with the "app_version_queue" routing key from the new exchange
        key => "app_version_queue"
        # No ack will boost your perf
        ack => false
      }
    }
    

    I'd probably go with the middle option: the exchange is a part of the application's deployment requirements (it will produce errors if it can't publish there), but any number of queues might bind to it for different reasons (maybe none at all in a test environment, where you don't need Elasticsearch set up).
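
For the application side of that middle option, a publisher might look roughly like the sketch below. It again assumes the third-party `pika` client, a JSON message body, and a hypothetical `app_version` field; none of these are confirmed by the question. The passive declare only checks that the `logs` exchange exists, so a missing exchange surfaces as an error at startup rather than as silently lost messages.

```python
import json


def publish_version(channel, version):
    """Publish one version message to the existing "logs" exchange."""
    # passive=True checks for the exchange without creating it;
    # the broker raises a channel error if it does not exist.
    channel.exchange_declare(exchange="logs", passive=True)
    channel.basic_publish(
        exchange="logs",
        routing_key="app_version_queue",  # must match the binding key Logstash uses
        body=json.dumps({"app_version": version}),  # hypothetical payload shape
    )
```

With a real connection this would be called as `publish_version(connection.channel(), "1.2.3")`. The publisher never names a queue, which is what lets any number of queues (or none) bind to the exchange.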