Search code examples
springspring-integrationspring-dslspring-cloud-streamspring-cloud-dataflow

ConcurrentModificationException while using Spring Integration DSL Mail


I am trying to use Spring Integration Java DSL for recieving email (gmail). I am also using Redis. When I recieve mail I get ConcurrentModificationException . While searching I also found that people are getting the same error in Kafka as well. Below is my code and exception.

Thanks for help is advance.

    Properties javaMailProperties = new Properties();
    javaMailProperties.setProperty("mail.imap.socketFactory.class", "javax.net.ssl.SSLSocketFactory");
    javaMailProperties.setProperty("mail.imap.socketFactory.fallback", "false");
    javaMailProperties.setProperty("mail.store.protocol", "imaps");
    javaMailProperties.setProperty("mail.debug", "false");
    String imapUrl = "imaps://email%40gmail.com:password@imap.gmail.com:993/INBOX";



    IntegrationFlowBuilder flowBuilder = IntegrationFlows.from(Mail.imapInboundAdapter(imapUrl).javaMailProperties(javaMailProperties).shouldMarkMessagesAsRead(true)
            , new Consumer<SourcePollingChannelAdapterSpec>() {


        @Override
        public void accept(SourcePollingChannelAdapterSpec sourcePollingChannelAdapterSpec) {
            sourcePollingChannelAdapterSpec
                    .poller(defaultPoller);
        }
    });
    return flowBuilder.channel(source.output()).get();

Exception :

2016-04-16 09:19:51.449  INFO 11640 --- [ask-scheduler-7] o.s.integration.mail.ImapMailReceiver    : attempting to receive mail from folder [INBOX]
2016-04-16 09:21:31.463 ERROR 11640 --- [ask-scheduler-7] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder$SendingHandler@2f104438]; nested exception is com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
c (sun.security.ssl.AppInputStream)
in (com.sun.mail.util.TraceInputStream)
in (java.io.BufferedInputStream)
bin (com.sun.mail.iap.ResponseInputStream)
input (com.sun.mail.imap.protocol.IMAPProtocol)
authenticatedConnections (com.sun.mail.imap.IMAPStore$ConnectionPool)
pool (com.sun.mail.imap.IMAPSSLStore)
store (com.sun.mail.imap.IMAPFolder)
folder (com.sun.mail.imap.IMAPMessage)
source (org.springframework.integration.mail.AbstractMailReceiver$IntegrationMimeMessage)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:139)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:147)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:120)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:77)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:442)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:392)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105)
    at org.springframework.integration.endpoint.SourcePollingChannelAdapter.handleMessage(SourcePollingChannelAdapter.java:161)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.doPoll(AbstractPollingEndpoint.java:251)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint.access$000(AbstractPollingEndpoint.java:57)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:176)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$1.call(AbstractPollingEndpoint.java:173)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller$1.run(AbstractPollingEndpoint.java:330)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor$1.run(ErrorHandlingTaskExecutor.java:55)
    at org.springframework.core.task.SyncTaskExecutor.execute(SyncTaskExecutor.java:50)
    at org.springframework.integration.util.ErrorHandlingTaskExecutor.execute(ErrorHandlingTaskExecutor.java:51)
    at org.springframework.integration.endpoint.AbstractPollingEndpoint$Poller.run(AbstractPollingEndpoint.java:324)
    at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
    at org.springframework.scheduling.concurrent.ReschedulingRunnable.run(ReschedulingRunnable.java:81)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
    at java.util.concurrent.FutureTask.run(FutureTask.java:262)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: com.esotericsoftware.kryo.KryoException: java.util.ConcurrentModificationException
Serialization trace:
classes (sun.misc.Launcher$AppClassLoader)
classloader (java.security.ProtectionDomain)
context (java.security.AccessControlContext)
acc (sun.security.ssl.SSLSocketImpl)
c (sun.security.ssl.AppInputStream)
in (com.sun.mail.util.TraceInputStream)
in (java.io.BufferedInputStream)
bin (com.sun.mail.iap.ResponseInputStream)
input (com.sun.mail.imap.protocol.IMAPProtocol)
authenticatedConnections (com.sun.mail.imap.IMAPStore$ConnectionPool)
pool (com.sun.mail.imap.IMAPSSLStore)
store (com.sun.mail.imap.IMAPFolder)
folder (com.sun.mail.imap.IMAPMessage)
source (org.springframework.integration.mail.AbstractMailReceiver$IntegrationMimeMessage)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:101)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:366)
    at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:307)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObjectOrNull(Kryo.java:606)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:87)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:628)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:100)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.write(FieldSerializer.java:518)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:534)
    at org.springframework.integration.codec.kryo.PojoCodec.doEncode(PojoCodec.java:92)
    at org.springframework.integration.codec.kryo.AbstractKryoCodec$2.execute(AbstractKryoCodec.java:66)
    at com.esotericsoftware.kryo.pool.KryoPoolQueueImpl.run(KryoPoolQueueImpl.java:61)
    at org.springframework.integration.codec.kryo.AbstractKryoCodec.encode(AbstractKryoCodec.java:63)
    at org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary(AbstractBinder.java:230)
    at org.springframework.cloud.stream.binder.AbstractBinder.serializePayloadIfNecessary(AbstractBinder.java:210)
    at org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder.access$800(RedisMessageChannelBinder.java:71)
    at org.springframework.cloud.stream.binder.redis.RedisMessageChannelBinder$SendingHandler.handleMessageInternal(RedisMessageChannelBinder.java:295)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
    ... 28 more
Caused by: java.util.ConcurrentModificationException
    at java.util.Vector$Itr.checkForComodification(Vector.java:1156)
    at java.util.Vector$Itr.next(Vector.java:1133)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:92)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.write(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.writeObject(Kryo.java:552)
    at com.esotericsoftware.kryo.serializers.ObjectField.write(ObjectField.java:80)
    ... 84 more

2016-04-16 09:21:36.188  INFO 11640 --- [ask-scheduler-7] o.s.integration.mail.ImapMailReceiver    : attempting to receive mail from folder [INBOX]
2016-04-16 09:21:39.104  INFO 11640 --- [ask-scheduler-7] o.s.integration.mail.ImapMailReceiver    : attempting to receive mail from folder [INBOX]

Solution

  • It looks like we need a custom kryo serializer (used by Spring Cloud Stream) for MimeMessage. I opened a JIRA Issue against Spring Integration.

    However, when I tested it, I got a stack overflow error, not the exception you see.

    It would be helpful if you could attach the console output with the mail.debug javamail property set to true, with a message that exhibits your problem, to the JIRA issue so we can be sure we fix that problem too, because then we can reconstruct a similar MimeMessage.

    EDIT

    Initial analysis shows the problem is serializing a java.util.Logger within the session field of the mime message.

    Further, it turns out this object (AbstractMailReceiver.IntegrationMimeMessage) is not at all suitable for serialization because it has the entire application context in its scope since it's a nested class within the mail receiver.

    I therefore propose adding a transformer to the mail module to tranform this object into one that is suitable for any kind of serialization.

    EDIT2

    One workaround is to add .transform(mailToStringTransformer()) to the flow. The mail content will be the body of the message, the from, bcc, cc, to, replyTo and Subject will be added to headers.