Spring AMQP, CorrelationId and GZipPostProcessor: UnsupportedEncodingException


I have a project that uses Spring AMQP (1.7.12.RELEASE). If I set a value for the correlationId field (getMessageProperties().setCorrelationId) and use GZipPostProcessor, the following error always occurs:

"org.springframework.amqp.AmqpUnsupportedEncodingException: java.io.UnsupportedEncodingException: gzip"

The following code seems to make the error go away:

DefaultMessagePropertiesConverter messageConverter = new DefaultMessagePropertiesConverter();
messageConverter.setCorrelationIdAsString(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
template.setMessagePropertiesConverter(messageConverter);

but I do not know what implications this has in production with clients that do not use Spring AMQP (I set this field only when the message I received has it). Here is a complete code example:

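import java.nio.charset.StandardCharsets;

import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter;
import org.springframework.amqp.support.postprocessor.GZipPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration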
public class SimpleProducerGZIP 
{
    static final String queueName = "spring-boot";

    @Bean
    public CachingConnectionFactory connectionFactory() {
        com.rabbitmq.client.ConnectionFactory factory = new com.rabbitmq.client.ConnectionFactory();
        factory.setHost("localhost");
        factory.setAutomaticRecoveryEnabled(false);
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory(factory);
        return connectionFactory;
    }

    @Bean
    public AmqpAdmin amqpAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin ;
    }

    @Bean
    Queue queue() {
        Queue qr = new Queue(queueName, false);
        qr.setAdminsThatShouldDeclare(amqpAdmin());
        return qr;
    }

    @Bean 
    public RabbitTemplate rabbitTemplate() 
    {
        RabbitTemplate template = new RabbitTemplate(connectionFactory());
        template.setEncoding("gzip");
        template.setBeforePublishPostProcessors(new GZipPostProcessor());

        // Workaround in question: map the correlationId as a String
        DefaultMessagePropertiesConverter messageConverter = new DefaultMessagePropertiesConverter();
        messageConverter.setCorrelationIdAsString(DefaultMessagePropertiesConverter.CorrelationIdPolicy.STRING);
        template.setMessagePropertiesConverter(messageConverter);

        return template;
    }

    public static void main(String[] args) 
    {
        @SuppressWarnings("resource")
        ApplicationContext context = new AnnotationConfigApplicationContext(SimpleProducerGZIP.class);
        RabbitTemplate _rabbitTemplate = context.getBean(RabbitTemplate.class);
        int contador = 0;
        try {
            while(true) 
            {
                contador = contador + 1;
                int _nContador = contador;
                System.out.println("\nInicio envio : " + _nContador);
                Object _o = new String(("New Message : " + contador));
                try
                {
                    _rabbitTemplate.convertAndSend(queueName, _o,
                            new MessagePostProcessor() {
                                @SuppressWarnings("deprecation")
                                @Override
                                public Message postProcessMessage(Message msg) throws AmqpException {
                                    if(_nContador%2 == 0) {
                                        System.out.println("\t--- msg.getMessageProperties().setCorrelationId ");
                                        msg.getMessageProperties().setCorrelationId("NewCorrelation".getBytes(StandardCharsets.UTF_8));
                                    }
                                    return msg;
                                }
                            }
                    );   
                    System.out.println("\tOK");
                }catch (Exception e) {
                    System.err.println("\t\tError en envio : " + contador + " - " + e.getMessage());
                }

                System.out.println("Fin envio : " + contador);
                Thread.sleep(500);
            }
        }catch (Exception e) {
            System.err.println("Exception : " + e.getMessage());
        }
    }
}

The question is: if I change the rabbitTemplate configuration so that the error does not occur, can that have implications for clients that use Spring AMQP or other client libraries?

--- EDIT (28/03/2019) This is the complete stack trace produced by the code above:

org.springframework.amqp.AmqpUnsupportedEncodingException: java.io.UnsupportedEncodingException: gzip
    at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:211)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doSend(RabbitTemplate.java:1531)
    at org.springframework.amqp.rabbit.core.RabbitTemplate$3.doInRabbit(RabbitTemplate.java:716)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.doExecute(RabbitTemplate.java:1455)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.execute(RabbitTemplate.java:1411)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.send(RabbitTemplate.java:712)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:813)
    at org.springframework.amqp.rabbit.core.RabbitTemplate.convertAndSend(RabbitTemplate.java:791)
    at es.jab.example.SimpleProducerGZIP.main(SimpleProducerGZIP.java:79)
Caused by: java.io.UnsupportedEncodingException: gzip
    at java.lang.StringCoding.decode(Unknown Source)
    at java.lang.String.<init>(Unknown Source)
    at java.lang.String.<init>(Unknown Source)
    at org.springframework.amqp.rabbit.support.DefaultMessagePropertiesConverter.fromMessageProperties(DefaultMessagePropertiesConverter.java:208)
    ... 8 more

Solution

  • I'd be interested to see the complete stack trace for more information about the problem.

    The CorrelationIdPolicy setting was part of a transition from a byte[] correlationId to a String. It was needed to avoid a byte[]/String/byte[] conversion.

    When the policy is STRING, you should use the correlationIdString property instead of correlationId. Otherwise, the correlationId won't be mapped in outbound messages (we don't look at correlationId in that case). For inbound messages, the policy controls which property is populated.

    In 2.0 and later, correlationId is a String instead of a byte[], so this setting is no longer needed.

    EDIT

    Now that I see the stack trace, this...

    template.setEncoding("gzip");
    

    ...is wrong.

    /**
     * The encoding to use when inter-converting between byte arrays and Strings in message properties.
     *
     * @param encoding the encoding to set
     */
    public void setEncoding(String encoding) {
        this.encoding = encoding;
    }
    

    There is no such Charset as gzip. This property has nothing to do with the message content; it is only used when converting byte[] to/from String in message properties. It is UTF-8 by default.
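To see why the exception appears only when a byte[] correlationId is set, here is a minimal sketch that uses only the JDK. It reproduces the kind of byte[]-to-String conversion visible in the stack trace (`new String(bytes, encoding)`). The class `EncodingCheck` and its methods are hypothetical names made up for this illustration; they are not part of Spring AMQP.

```java
import java.io.UnsupportedEncodingException;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class for illustration only; not part of Spring AMQP.
class EncodingCheck {

    // Returns true if the JVM knows a charset with this name.
    static boolean isCharsetName(String name) {
        try {
            return Charset.isSupported(name);
        } catch (IllegalArgumentException e) {
            // Thrown for null or syntactically illegal charset names.
            return false;
        }
    }

    // Tries the same style of conversion as in the stack trace:
    // byte[] -> String using an encoding given by name.
    static boolean canDecode(byte[] bytes, String encoding) {
        try {
            new String(bytes, encoding);
            return true;
        } catch (UnsupportedEncodingException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] correlationId = "NewCorrelation".getBytes(StandardCharsets.UTF_8);

        System.out.println("UTF-8 is a charset: " + isCharsetName("UTF-8"));
        System.out.println("gzip is a charset:  " + isCharsetName("gzip"));
        System.out.println("decode with UTF-8:  " + canDecode(correlationId, "UTF-8"));
        System.out.println("decode with gzip:   " + canDecode(correlationId, "gzip"));
    }
}
```

If this matches what happens inside the converter, then removing the `template.setEncoding("gzip")` call (leaving the UTF-8 default) while keeping the `GZipPostProcessor` should be enough to avoid the exception, since the post-processor compresses the body independently of this property.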