Search code examples
springdesign-patternsrabbitmqspring-amqpspring-rabbit

How to do code refactoring in rabbitmq consumer?


I implemented the necessary functionality on consumer and it works fine.

@Service
@RequiredArgsConstructor
public class RabbitMQConsumer {
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQConsumer.class);
    private final EventHandlerStrategy handlerStrategy;

    @RabbitListener(queues = "#{vehicleQueue.name}")
    public void consumePayload(@Payload String encodedMessage,
                               @Header(AmqpHeaders.RECEIVED_ROUTING_KEY) String receivedRoutingKey){
        try {
            String payload = new String(Base64.getDecoder().decode(encodedMessage)).trim(); // **1. Decoding string message**
            JsonObject jsonMessage = JsonParser.parseString(payload).getAsJsonObject();     // **2. Parsing JSON string to json object**

            String prefixRoutingKey = receivedRoutingKey.split("\\.")[0];

            VehicleType vehicleType = VehicleProfile                                        // **3. Getting vehicle type from inbound service**
                    .getTopicConverter(prefixRoutingKey)
                    .map(converter -> converter.convert(prefixRoutingKey))
                    .orElseThrow(() -> new MessageException(String.format("Not found vehicle type '%s'", prefixRoutingKey)));

            jsonMessage.addProperty("vehicleType", vehicleType.name());
            int type = jsonMessage.get("fuelTypeCode").getAsInt();

            handlerStrategy.getEventStrategy(type).ifPresentOrElse(iEventHandler -> iEventHandler.consumeEvent(jsonMessage),  // **4. send event** 
                    () -> logger.error("There are no event handler found '{}'", type));

        } catch (IllegalArgumentException e){    // **5. Error handing**                                           
            logger.debug("[{}] Failed to decode base64 string", encodedMessage);
        } catch (JsonSyntaxException e){
            logger.error("an error occurred while parsing event", e);
        } catch (MessageException e) {
            logger.error("Event exception: ", e);
        }
    }
}

I'm not thrilled with how the implementation looks. Too many different responsibilities in one place. Its looks messy and it will be hard to tests all of this.

Please see the code comment lines:

  1. Decoding string message
  2. Parsing JSON string to json object
  3. Getting vehicle type from inbound service
  4. Send event
  5. Error handing

My question is how to correctly split all of this ? Is there any patterns, practices , books or article to avoid that mess in the future ?

Is there a good idea to move decoding and parsing into Spring AMQP Message Converters or interseptors and get in listener ready JsonObject ? How can i implement this and how to handle error ?

    @RabbitListener(queues = "#{vehicleQueue.name}")
    public void consumePayload(@Payload JsonObject encodedMessage)

Solution

  • Yes, you can encapsulate that logic (up to jsonMessage.addProperty("vehicleType", vehicleType.name());) into a custom MessageConverter. The @RabbitListener has specific attribute on the matter:

    /**
     * Override the container factory's message converter used for this listener.
     * @return the message converter bean name. If a SpEL expression is provided
     * ({@code #{...}}), the expression can either evaluate to a converter instance
     * or a bean name.
     * @since 2.3
     */
    String messageConverter() default "";
    

    The error handling also can be done separately. See respective attribute on the @RabbitListener:

    /**
     * Set an
     * {@link org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler} to
     * invoke if the listener method throws an exception. A simple String representing the
     * bean name. If a Spel expression (#{...}) is provided, the expression must
     * evaluate to a bean name or a
     * {@link org.springframework.amqp.rabbit.listener.api.RabbitListenerErrorHandler}
     * instance.
     * @return the error handler.
     * @since 2.0
     */
    String errorHandler() default "";
    

    In the end your code would indeed look just like that:

    @RabbitListener(queues = "#{vehicleQueue.name}", 
                    messageConverter = "myJsonObjectConverter", 
                    errorHandler = "myJsonErrorHandler")
    public void consumePayload(JsonObject encodedMessage) {
       int type = encodedMessage.get("fuelTypeCode").getAsInt();
    
       handlerStrategy.getEventStrategy(type).ifPresentOrElse(iEventHandler -> iEventHandler.consumeEvent(encodedMessage),  //
                    () -> logger.error("There are no event handler found '{}'", type));
    }
    

    More info in docs: https://docs.spring.io/spring-amqp/reference/amqp/receiving-messages/async-annotation-driven.html