spring-integration spring-jdbc spring-integration-dsl

Spring Integration JdbcMessageStore casting error


I am trying to create a service that reads data from a remote server and processes it using Spring Integration.

I have a class that extends ArrayList because I need to keep a pointer to the next page, so that I can read it in the next remote call. I set up a release strategy that collects all of these pages until there is no pointer to a next page. Here is the class definition:

public class CustomList extends ArrayList<DataInfo>
{

    private String nextCursor;

    // Methods omitted for readability
}

Everything worked fine until I set up a JdbcMessageStore in the aggregator so that messages are kept if the service shuts down. The problem is in my release strategy class, where I cast the payload to my list class (because the message group does not define a payload type). Although the source and target class names are identical, this exception is raised:

java.lang.ClassCastException: com.example.CustomList cannot be cast to com.example.CustomList

This is my release strategy class:

@Component
public class CursorReleaseStrategy implements ReleaseStrategy
{
    @Override
    public boolean canRelease(MessageGroup group)
    {
        return group.getMessages().stream()
                .anyMatch(message -> ((CustomList) message.getPayload()).getNextCursor() == null);
    }
}

If I remove the message store, everything works fine, but I need the message store.

I am using Spring Boot 2.1.6 and the Spring Integration DSL to create this flow. From what I have read, this error happens when the same class is loaded by different class loaders, but here everything runs in the same application. Is there anything more that I need to configure for this to work?
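
One way to test the class-loader explanation is to compare the class of the payload with the class named in the cast. The following is a minimal sketch using only the JDK; `ClassLoaderCheck` and its methods are hypothetical helpers, not part of Spring Integration, and the idea is simply to log the result of `describe` inside `canRelease` before the cast.

```java
// Hypothetical diagnostic helper (not a Spring Integration API).
final class ClassLoaderCheck {

    private ClassLoaderCheck() {
    }

    // True when the payload's class has the same fully qualified name as the
    // expected class but is a different Class object, which is the situation
    // behind "X cannot be cast to X".
    static boolean sameNameDifferentLoader(Object payload, Class<?> expected) {
        Class<?> actual = payload.getClass();
        return actual.getName().equals(expected.getName()) && actual != expected;
    }

    // Builds a one-line description suitable for a log statement.
    static String describe(Object payload, Class<?> expected) {
        Class<?> actual = payload.getClass();
        return "payload class=" + actual.getName()
                + "; payload loader=" + actual.getClassLoader()
                + "; expected loader=" + expected.getClassLoader()
                + "; assignable=" + expected.isInstance(payload);
    }
}
```

If `sameNameDifferentLoader(message.getPayload(), CustomList.class)` returns true, the payload was deserialized through a class loader other than the one that loaded the release strategy.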


Solution

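  • When the application was packaged as a jar, there was no such error, so the workaround is only needed outside production. To fix the problem I created two beans, selected by profile: outside "prod" the store deserializes messages through the thread context class loader, and in "prod" it is a plain JdbcMessageStore. For example: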

    @Profile("!prod")
    @Bean
    public MessageGroupStore messageStore(DataSource dataSource)
    {
        JdbcMessageStore jdbcMessageStore = new JdbcMessageStore(dataSource);
        // Resolve payload classes through the thread context class loader, so that
        // deserialized objects use the same class definitions as the running application.
        jdbcMessageStore.setDeserializer(inputStream -> {
            ConfigurableObjectInputStream objectInputStream =
                    new ConfigurableObjectInputStream(inputStream, Thread.currentThread().getContextClassLoader());
            try {
                return (Message<?>) objectInputStream.readObject();
            } catch (ClassNotFoundException e) {
                throw new NestedIOException("Failed to deserialize object type", e);
            }
        });
        return jdbcMessageStore;
    }
    
    @Profile("prod")
    @Bean
    public MessageGroupStore prodMessageStore(DataSource dataSource)
    {
        return new JdbcMessageStore(dataSource);
    }