Tags: spring-boot, hibernate, infinispan, jgroups, second-level-cache

How to track down sudden traffic spikes in the Infinispan NAKACK protocol?


We are trying to move a cluster of 30+ servers from Ehcache 2.x to Infinispan 13.x. We use Spring Boot, Hibernate, etc.; it's a pretty common setup. We used Ehcache 2.x with UDP JGroups replication before and never had any problems.

This is our Infinispan config. It is pretty much the default. What we have changed:

  • We removed all query/timestamp caches from the default config, as we don't use any query cache.

  • We use "udp" instead of "tcp".

  • We set mode="ASYNC", as we prefer speed over consistency.

  • We reduced the lock timeout.

      <transport stack="hibernate-jgroups" cluster="infinispan-hibernate-cluster"/>
    
      <local-cache-configuration name="the-default-cache" statistics="false" />
    
      <invalidation-cache-configuration name="entity" mode="ASYNC" remote-timeout="20000" statistics="true" statistics-available="true">
          <encoding media-type="application/x-java-object"/>
          <locking concurrency-level="1000" acquire-timeout="1000"/>
          <transaction mode="NONE" />
          <expiration max-idle="-1" interval="-1" />
          <memory max-count="2000000"/>
      </invalidation-cache-configuration>
    
      <local-cache-configuration name="pending-puts" simple-cache="true" statistics="true" statistics-available="true">
          <encoding media-type="application/x-java-object"/>
          <transaction mode="NONE"/>
          <expiration max-idle="60000" />
      </local-cache-configuration>
    

The transport stack file is the default from the infinispan-hibernate library:

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups-4.2.xsd">
    <!-- jgroups.udp.address is deprecated and will be removed, see ISPN-11867 -->
    <UDP bind_addr="${jgroups.bind.address,jgroups.udp.address:SITE_LOCAL}"
         bind_port="${jgroups.bind.port,jgroups.udp.port:0}"
         mcast_addr="${jgroups.mcast_addr:228.6.7.8}"
         mcast_port="${jgroups.mcast_port:46655}"
         tos="0"
         ucast_send_buf_size="1m"
         mcast_send_buf_size="1m"
         ucast_recv_buf_size="20m"
         mcast_recv_buf_size="25m"
         ip_ttl="${jgroups.ip_ttl:2}"
         thread_naming_pattern="pl"
         enable_diagnostics="false"
         bundler_type="transfer-queue"
         max_bundle_size="8500"

         thread_pool.min_threads="${jgroups.thread_pool.min_threads:0}"
         thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}"
         thread_pool.keep_alive_time="60000"

         thread_dumps_threshold="${jgroups.thread_dumps_threshold:10000}"
    />
    <PING num_discovery_runs="3"/>
    <MERGE3 min_interval="10000"
            max_interval="30000"
    />
    <FD_SOCK/>
    <!-- Suspect node `timeout` to `timeout + timeout_check_interval` millis after the last heartbeat -->
    <FD_ALL timeout="10000"
            interval="2000"
            timeout_check_interval="1000"
    />
    <VERIFY_SUSPECT timeout="1000"/>
    <pbcast.NAKACK2 xmit_interval="100"
                    xmit_table_num_rows="50"
                    xmit_table_msgs_per_row="1024"
                    xmit_table_max_compaction_time="30000"
                    resend_last_seqno="true"
    />
    <UNICAST3 xmit_interval="100"
              xmit_table_num_rows="50"
              xmit_table_msgs_per_row="1024"
              xmit_table_max_compaction_time="30000"
    />
    <pbcast.STABLE desired_avg_gossip="5000"
                   max_bytes="1M"
    />
    <pbcast.GMS print_local_addr="false"
                join_timeout="${jgroups.join_timeout:2000}"
    />
    <UFC max_credits="4m"
         min_threshold="0.40"
    />
    <MFC max_credits="4m"
         min_threshold="0.40"
    />
    <FRAG3 frag_size="8000"/>
</config>

Additionally, we have some other caches besides the Hibernate 2LC, which are configured like this:

embeddedCacheManager.defineConfiguration(NAME, new ConfigurationBuilder()
  .memory().maxCount(10000).whenFull(EvictionStrategy.REMOVE)
  .statistics().enabled(true)
  .expiration().maxIdle(-1).lifespan(-1)
  .locking().concurrencyLevel(1000).lockAcquisitionTimeout(1000, TimeUnit.MILLISECONDS)
  .clustering().cacheMode(CacheMode.INVALIDATION_ASYNC)
  .build()
);

We have about 80 Hibernate 2LC caches and 80 other caches. Each cache can hold up to 2,000,000 entries. As we are a read-heavy application, we need this massive caching.

So far everything is working fine. But from time to time we get a sudden burst in network traffic which saturates a 10 Gbit link. The LAN is internal to our cluster, with a dedicated switch.

This usually happens when many cache entries are removed. Example: we have a job which calculates some results and puts them into the cache. This happens for 2 million users over a period of a few minutes.

The job starts at 21:15 and runs normally; the rise in traffic is as expected. Then at 21:17 it suddenly goes crazy: each member of the cluster is suddenly sending 4 Gb/s, so the network becomes saturated. The job is running on each server, but each server is only responsible for a small part of the data. All servers start sending large amounts of data at the same time.

(Image: network traffic graph)

Later on at 21:19 (two minutes later) we get a thread dump message because the JGroups thread pool is full. We can see our job is still running but waiting:

Full dump https://controlc.com/09bcd24c

Important part:

"pool-15-thread-1" #263 prio=0 tid=0x107 nid=NA timed_waiting
   java.lang.Thread.State: TIMED_WAITING
    at [email protected]/jdk.internal.misc.Unsafe.park(Native Method)
    - parking to wait for <0x37bfabdf> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
    at [email protected]/java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:252)
    at [email protected]/java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:1757)
    at org.jgroups.util.CreditMap.decrement(CreditMap.java:155)
    at org.jgroups.protocols.MFC.handleDownMessage(MFC.java:119)
    at org.jgroups.protocols.FlowControl.down(FlowControl.java:323)
    at org.jgroups.protocols.FRAG3.down(FRAG3.java:139)
    at org.jgroups.stack.ProtocolStack.down(ProtocolStack.java:927)
    at org.jgroups.JChannel.down(JChannel.java:645)
    at org.jgroups.JChannel.send(JChannel.java:484)
    at org.infinispan.remoting.transport.jgroups.JGroupsTransport.send(JGroupsTransport.java:1161)
    at org.infinispan.remoting.transport.jgroups.JGroupsTransport.sendCommandToAll(JGroupsTransport.java:1275)
    at org.infinispan.remoting.transport.jgroups.JGroupsTransport.sendToAll(JGroupsTransport.java:1265)
    at org.infinispan.remoting.rpc.RpcManagerImpl.sendToAll(RpcManagerImpl.java:402)
    at org.infinispan.hibernate.cache.commons.access.NonTxInvalidationInterceptor.invalidateAcrossCluster(NonTxInvalidationInterceptor.java:116)
    at org.infinispan.hibernate.cache.commons.access.NonTxInvalidationInterceptor.handleWriteReturn(NonTxInvalidationInterceptor.java:129)
    at org.infinispan.hibernate.cache.commons.access.NonTxInvalidationInterceptor$$Lambda$1598/0x000000080181c508.apply(Unknown Source)
    at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextThenApply(BaseAsyncInterceptor.java:86)
    at org.infinispan.hibernate.cache.commons.access.NonTxInvalidationInterceptor.visitRemoveCommand(NonTxInvalidationInterceptor.java:72)
    at org.infinispan.commands.write.RemoveCommand.acceptVisitor(RemoveCommand.java:53)
    at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNext(BaseAsyncInterceptor.java:59)
    at org.infinispan.interceptors.BaseAsyncInterceptor.asyncInvokeNext(BaseAsyncInterceptor.java:232)
    at org.infinispan.interceptors.impl.EntryWrappingInterceptor.setSkipRemoteGetsAndInvokeNextForDataCommand(EntryWrappingInterceptor.java:784)
    at org.infinispan.interceptors.impl.EntryWrappingInterceptor.visitRemoveCommand(EntryWrappingInterceptor.java:409)
    at org.infinispan.commands.write.RemoveCommand.acceptVisitor(RemoveCommand.java:53)
    at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNext(BaseAsyncInterceptor.java:59)
    at org.infinispan.interceptors.DDAsyncInterceptor.handleDefault(DDAsyncInterceptor.java:55)
    at org.infinispan.interceptors.DDAsyncInterceptor.visitRemoveCommand(DDAsyncInterceptor.java:66)
    at org.infinispan.commands.write.RemoveCommand.acceptVisitor(RemoveCommand.java:53)
    at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndHandle(BaseAsyncInterceptor.java:188)
    at org.infinispan.hibernate.cache.commons.access.LockingInterceptor.lambda$new$1(LockingInterceptor.java:74)
    at org.infinispan.hibernate.cache.commons.access.LockingInterceptor$$Lambda$1601/0x000000080181d090.apply(Unknown Source)
    at org.infinispan.interceptors.SyncInvocationStage.andHandle(SyncInvocationStage.java:69)
    at org.infinispan.hibernate.cache.commons.access.LockingInterceptor.visitDataWriteCommand(LockingInterceptor.java:91)
    at org.infinispan.interceptors.locking.AbstractLockingInterceptor.visitRemoveCommand(AbstractLockingInterceptor.java:102)
    at org.infinispan.commands.write.RemoveCommand.acceptVisitor(RemoveCommand.java:53)
    at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndFinally(BaseAsyncInterceptor.java:155)
    at org.infinispan.interceptors.impl.CacheMgmtInterceptor.visitRemoveCommand(CacheMgmtInterceptor.java:507)
    at org.infinispan.commands.write.RemoveCommand.acceptVisitor(RemoveCommand.java:53)
    at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNext(BaseAsyncInterceptor.java:59)
    at org.infinispan.interceptors.DDAsyncInterceptor.handleDefault(DDAsyncInterceptor.java:55)
    at org.infinispan.interceptors.DDAsyncInterceptor.visitRemoveCommand(DDAsyncInterceptor.java:66)
    at org.infinispan.commands.write.RemoveCommand.acceptVisitor(RemoveCommand.java:53)
    at org.infinispan.interceptors.BaseAsyncInterceptor.invokeNextAndExceptionally(BaseAsyncInterceptor.java:128)
    at org.infinispan.interceptors.impl.InvocationContextInterceptor.visitCommand(InvocationContextInterceptor.java:89)
    at org.infinispan.interceptors.impl.AsyncInterceptorChainImpl.invokeAsync(AsyncInterceptorChainImpl.java:220)
    at org.infinispan.hibernate.cache.v53.impl.NonTxInvalidationCacheAccessDelegate.invoke(NonTxInvalidationCacheAccessDelegate.java:48)
    at org.infinispan.hibernate.cache.commons.access.NonTxInvalidationCacheAccessDelegate.write(NonTxInvalidationCacheAccessDelegate.java:100)
    at org.infinispan.hibernate.cache.commons.access.NonTxInvalidationCacheAccessDelegate.update(NonTxInvalidationCacheAccessDelegate.java:67)
    at org.infinispan.hibernate.cache.v53.impl.ReadWriteEntityDataAccess.update(ReadWriteEntityDataAccess.java:16)
    at org.hibernate.action.internal.EntityUpdateAction.cacheUpdate(EntityUpdateAction.java:295)
    at org.hibernate.action.internal.EntityUpdateAction.execute(EntityUpdateAction.java:265)
    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:604)
    at org.hibernate.engine.spi.ActionQueue.lambda$executeActions$1(ActionQueue.java:478)
    at org.hibernate.engine.spi.ActionQueue$$Lambda$2832/0x000000080214cce0.accept(Unknown Source)
    at [email protected]/java.util.LinkedHashMap.forEach(LinkedHashMap.java:721)
    at org.hibernate.engine.spi.ActionQueue.executeActions(ActionQueue.java:475)
    at org.hibernate.event.internal.AbstractFlushingEventListener.performExecutions(AbstractFlushingEventListener.java:344)
    at org.hibernate.event.internal.DefaultFlushEventListener.onFlush(DefaultFlushEventListener.java:40)
    at org.hibernate.internal.SessionImpl$$Lambda$2711/0x0000000802027788.accept(Unknown Source)
    at org.hibernate.event.service.internal.EventListenerGroupImpl.fireEventOnEachListener(EventListenerGroupImpl.java:107)
    at org.hibernate.internal.SessionImpl.doFlush(SessionImpl.java:1407)
    at org.hibernate.internal.SessionImpl.managedFlush(SessionImpl.java:489)
    at org.hibernate.internal.SessionImpl.flushBeforeTransactionCompletion(SessionImpl.java:3303)
    at org.hibernate.internal.SessionImpl.beforeTransactionCompletion(SessionImpl.java:2438)
    at org.hibernate.engine.jdbc.internal.JdbcCoordinatorImpl.beforeTransactionCompletion(JdbcCoordinatorImpl.java:449)
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.beforeCompletionCallback(JdbcResourceLocalTransactionCoordinatorImpl.java:183)
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl.access$300(JdbcResourceLocalTransactionCoordinatorImpl.java:40)
    at org.hibernate.resource.transaction.backend.jdbc.internal.JdbcResourceLocalTransactionCoordinatorImpl$TransactionDriverControlImpl.commit(JdbcResourceLocalTransactionCoordinatorImpl.java:281)
    at org.hibernate.engine.transaction.internal.TransactionImpl.commit(TransactionImpl.java:101)
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:562)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:743)
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:711)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.commitTransactionAfterReturning(TransactionAspectSupport.java:654)
    at org.springframework.transaction.interceptor.TransactionAspectSupport.invokeWithinTransaction(TransactionAspectSupport.java:407)
    at org.springframework.transaction.aspectj.AbstractTransactionAspect.ajc$around$org_springframework_transaction_aspectj_AbstractTransactionAspect$1$2a73e96c(AbstractTransactionAspect.aj:71)
    at de.kicktipp.core.ranking.updatejob.RankingUpdateServiceGruppenwertung.beendeJob(RankingUpdateServiceGruppenwertung.java:67)
    at de.kicktipp.core.ranking.updatejob.RankingUpdateJobGruppenwertung.runUpdate(RankingUpdateJobGruppenwertung.java:98)
    at de.kicktipp.core.ranking.updatejob.RankingUpdateJobGruppenwertung.doWork(RankingUpdateJobGruppenwertung.java:77)
    at de.kicktipp.core.cron.utils.Job.startJob(Job.java:184)
    at de.kicktipp.core.cron.utils.Job.run(Job.java:157)
    at [email protected]/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at [email protected]/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at [email protected]/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at [email protected]/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at [email protected]/java.lang.Thread.run(Thread.java:833)


   Locked ownable synchronizers:
    - <0x422ca9ec> (a java.util.concurrent.ThreadPoolExecutor$Worker)

I guess the job is waiting for some remote UDP packet. Something is clearly going wrong, but I think the full thread pool is not the cause, just a consequence. So it wouldn't help to set the maximum pool size to 400 instead of 200.

When this happens we see a sudden burst of JGroups xmit messages (160,000 messages per second) from the NAKACK protocol. So something is suddenly retransmitting massive amounts of packets.

I have seen this post and it seems to be pretty similar. But we do not observe any errors at the time the network spike starts, and it seems strange to me that switching from UDP to TCP should help.


Update 2022-02-25

Because of this post I tried the standard TCP stack file from Infinispan, and everything works fine now.

<config xmlns="urn:org:jgroups"
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:schemaLocation="urn:org:jgroups http://www.jgroups.org/schema/jgroups-4.2.xsd">
   <!-- jgroups.tcp.address is deprecated and will be removed, see ISPN-11867 -->
   <TCP bind_addr="${jgroups.bind.address,jgroups.tcp.address:SITE_LOCAL}"
        bind_port="${jgroups.bind.port,jgroups.tcp.port:7800}"
        enable_diagnostics="false"
        thread_naming_pattern="pl"
        send_buf_size="640k"
        sock_conn_timeout="300"
        bundler_type="transfer-queue"

        thread_pool.min_threads="${jgroups.thread_pool.min_threads:0}"
        thread_pool.max_threads="${jgroups.thread_pool.max_threads:200}"
        thread_pool.keep_alive_time="60000"

        thread_dumps_threshold="${jgroups.thread_dumps_threshold:10000}"
   />
   <MPING mcast_addr="${jgroups.mcast_addr:228.6.7.8}"
          mcast_port="${jgroups.mcast_port:46655}"
          num_discovery_runs="3"
          ip_ttl="${jgroups.udp.ip_ttl:2}"
   />
   <MERGE3 min_interval="10000"
           max_interval="30000"
   />
   <FD_SOCK/>
   <!-- Suspect node `timeout` to `timeout + timeout_check_interval` millis after the last heartbeat -->
   <FD_ALL timeout="10000"
           interval="2000"
           timeout_check_interval="1000"
   />
   <VERIFY_SUSPECT timeout="1000"/>
   <pbcast.NAKACK2 use_mcast_xmit="false"
                   xmit_interval="200"
                   xmit_table_num_rows="50"
                   xmit_table_msgs_per_row="1024"
                   xmit_table_max_compaction_time="30000"
                   resend_last_seqno="true"
   />
   <UNICAST3 conn_close_timeout="5000"
             xmit_interval="200"
             xmit_table_num_rows="50"
             xmit_table_msgs_per_row="1024"
             xmit_table_max_compaction_time="30000"
   />
   <pbcast.STABLE desired_avg_gossip="5000"
                  max_bytes="1M"
   />
   <pbcast.GMS print_local_addr="false"
               join_timeout="${jgroups.join_timeout:2000}"
   />
   <UFC max_credits="4m"
        min_threshold="0.40"
   />
   <MFC max_credits="4m"
        min_threshold="0.40"
   />
   <FRAG3/>
</config>

How can I track down the reason for this so we can fix our program? How can I better analyze what is really happening?


Solution

  • Which JGroups configuration are you using? If you use a small value for xmit_interval in NAKACK2 (e.g. 100 ms), then the following can happen:

    • All members transmit some big message (stack trace, if I understand your example correctly)
    • If a member doesn't receive this in time, it will ask for retransmission
    • The retransmission requests swamp the nodes (retransmission storm); all threads busy with retransmission are blocked on TransferQueueBundler.send() (full send-queue).
    • As a result, the thread pool is full and no more requests can be accepted.

    If my assumption is correct, then this can be fixed by either

    • adding RED above the transport (UDP): this will drop requests when the send-queue is almost full
    • using NoBundler (bundler_type="no-bundler" in UDP). Not really recommended, as this will give you worse performance than transfer-queue.

    If this is reproducible, I suggest trying the recommendations (not both at the same time!) to see if this fixes the issue (a configuration sketch of both options follows below). Cheers,
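
For illustration, here is a minimal sketch of what the two suggested changes could look like in the UDP stack file from the question. RED and bundler_type="no-bundler" are the JGroups options named in the answer; the trimmed-down attribute lists and the exact placement of RED directly after the <UDP> element are my reading of "above the transport", so treat this as an untested illustration rather than a verified configuration.

<!-- Option 1 (sketch): insert RED directly above the transport, i.e. as the first
     protocol after <UDP> in the stack file. It drops messages early when the
     transport's send-queue fills up, instead of letting retransmission requests
     pile up until every thread is blocked. -->
<UDP bind_addr="${jgroups.bind.address,jgroups.udp.address:SITE_LOCAL}"
     mcast_addr="${jgroups.mcast_addr:228.6.7.8}"
     mcast_port="${jgroups.mcast_port:46655}"
     bundler_type="transfer-queue"/>
<RED/>
<PING num_discovery_runs="3"/>
<!-- remaining protocols unchanged -->

<!-- Option 2 (sketch, not combined with option 1): keep the stack as it is but
     disable bundling on the transport, so messages are sent directly instead of
     going through the transfer queue. Per the answer, expect worse performance
     than with transfer-queue. -->
<UDP bind_addr="${jgroups.bind.address,jgroups.udp.address:SITE_LOCAL}"
     mcast_addr="${jgroups.mcast_addr:228.6.7.8}"
     mcast_port="${jgroups.mcast_port:46655}"
     bundler_type="no-bundler"/>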