Search code examples
spring-bootrabbitmqlogstashlogbacklogstash-logback-encoder

Sending Logback logs to LogStash via RabbitMQ using JSON Encoder (Spring)


I am using Spring Boot (1.5.4). I wish to send (logback) logs from my services to Logstash via RabbitMQ in a JSON format rather than plain text. This will save me from having to set up a filter on the Logstash side so that formatting can be controlled on the application side (using a Logback Encoder).

I am aware of the Spring logback AMQP Appender for RabbitMQ org.springframework.amqp.rabbit.logback.AmqpAppender however this uses a Layout (plain text) rather than formatted JSON. I would like to use the LogStash Encoder net.logstash.logback.encoder.LogstashEncoder. I would like to use the Appender with the Encoder (I want it all :").


Solution

  • I first extended the AMQPAppender to add the Encoder like so:-

    package nz.govt.mpi.util;
    
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.logback.AmqpAppender;
    
    import ch.qos.logback.classic.spi.ILoggingEvent;
    import ch.qos.logback.core.encoder.Encoder;
    import lombok.Getter;
    import lombok.Setter;
    
    public class AmqpLogbackAppender extends AmqpAppender {
    
        @Getter
        @Setter
        private Encoder<ILoggingEvent> encoder;
    
        /**
         * We remove the default message layout and replace with the JSON {@link Encoder}
         */
        @Override
        public Message postProcessMessageBeforeSend(Message message, Event event) {
            return new Message(this.encoder.encode(event.getEvent()), message.getMessageProperties());
        }
    
        @Override
        public void start() {
            super.start();
            encoder.setContext(getContext());
    
            if (!encoder.isStarted()) {
                encoder.start();
            }
    
        }
    
        @Override
        public void stop() {
            super.stop();
            encoder.stop();
        }
    
    }
    

    And then I set up the logback-spring.xml configuration file like so:-

    <?xml version="1.0" encoding="UTF-8"?>
    <configuration>
        <springProperty scope="context" name="rabbitMQHost" source="logback.amqp.host" defaultValue="localhost"/>
        <springProperty scope="context" name="rabbitMQPort" source="logback.amqp.port" defaultValue="5672"/>
        <springProperty scope="context" name="rabbitMQUsername" source="spring.rabbitmq.username" />
        <springProperty scope="context" name="rabbitMQPassword" source="spring.rabbitmq.password" />
        <springProperty scope="context" name="rabbitMQExchangeName" source="logback.amqp.exchange.name" defaultValue="mpi.tradedev"/>
        <springProperty scope="context" name="rabbitMQRoutingKey" source="logback.amqp.routing.key" defaultValue="mpi.tradedev.logging"/>
        <springProperty scope="context" name="serviceName" source="spring.application.name" />
    
        <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
            <encoder>
                <pattern>%d{HH:mm:ss.SSS} [%thread, %X{X-B3-TraceId:-},%X{X-B3-SpanId:-}] %-5level %logger{36} - %msg%n</pattern>
            </encoder>
        </appender>
    
        <appender name="AMQP" class="nz.govt.mpi.util.AmqpLogbackAppender">
            <!-- layout is required but ignored as using the encoder for the AMQP message body -->
            <layout><pattern><![CDATA[ %level ]]></pattern></layout>
    
            <encoder class="net.logstash.logback.encoder.LogstashEncoder">
                <customFields>{"serviceName": "${serviceName}"}</customFields>
            </encoder>
    
        <!-- RabbitMQ connection -->
        <host>${rabbitMQHost}</host>
        <port>${rabbitMQPort}</port>
        <username>${rabbitMQUsername}</username>
        <password>${rabbitMQPassword}</password>
        <exchangeName>${rabbitMQExchangeName}</exchangeName>
        <routingKeyPattern>${rabbitMQRoutingKey}</routingKeyPattern>
    
        <declareExchange>true</declareExchange>
        <exchangeType>topic</exchangeType>
        <generateId>true</generateId>
        <charset>UTF-8</charset>
        <durable>true</durable>
        <deliveryMode>PERSISTENT</deliveryMode>
    
     </appender>
    
        <root level="info">
            <appender-ref ref="STDOUT" />
            <appender-ref ref="AMQP" />
        </root>
    </configuration>
    

    I lastly added the required properties to the application.properties file like so:-

    spring.application.name=my-app
    logback.amqp.host=localhost
    logback.amqp.port=5672
    logback.amqp.exchange.name=ex_logstash
    logback.amqp.routing.key=my-app.logging
    spring.rabbitmq.username=rquser
    spring.rabbitmq.password=rqpass
    

    I also had to set up the necessary user account in RabbitMQ. When the application runs it creates the topic (ex_logstash) but you must create a queue (qu_logstash) that is bound to that topic with the routing key match (my-app.*). You then create a logstash configuration to match the queue name.

    ex_logstash -> qu_logstash

    The logstash.json configuration file example:-

    input {
      rabbitmq {
        host => "localhost"
        queue => "qu_logstash"
        durable => true
        exchange => "ex_logstash"
        key => "my-app.*"
        threads => 10
        type => "topic"
        prefetch_count => 200
        port => 5672
        user => "rquser"
        password => "rqpass"
      }
    }
    

    On the application side you will need the required dependencies in your pom.xml. These are the ones I am using that cover the required classes YMMV:-

    <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>net.logstash.logback</groupId>
            <artifactId>logstash-logback-encoder</artifactId>
            <version>4.9</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-core</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>