Search code examples
javaignite

Ignite thread exception of "sun.misc.Unsafe.park" and stop handle request anymore


I have 2 ignite client(2 IDEA project running in one laptop) + 2 ignite server(2 linux vm)

when i try to send some request to get info from ignite cache, it dumps with bellow exceptions:

[02:45:24,915][WARNING][grid-timeout-worker-#23][diagnostic] Found long running cache future [startTime=02:44:12.514, curTime=02:45:24.904, fut=GridNearAtomicSingleUpdateFuture [reqState=Primary [id=d42484f4-0e66-4e59-8258-7b10c6c41695, opRes=false, expCnt=-1, rcvdCnt=0, primaryRes=false, done=false, waitFor=null, rcvd=null], super=GridNearAtomicAbstractUpdateFuture [remapCnt=100, topVer=AffinityTopologyVersion [topVer=4, minorTopVer=0], remapTopVer=null, err=null, futId=92, super=GridFutureAdapter [ignoreInterrupts=false, state=INIT, res=null, hash=1812798386]]]]
[02:45:25,004][WARNING][grid-timeout-worker-#23][G] >>> Possible starvation in striped pool.
    Thread name: sys-stripe-7-#8
    Queue: [Message closure [msg=GridIoMessage [plc=2, topic=TOPIC_CACHE, topicOrd=8, ordered=false, timeout=0, skipOnTimeout=false, msg=GridDhtAtomicSingleUpdateRequest [key=KeyCacheObjectImpl [part=239, val=null, hasValBytes=true], val=BinaryObjectImpl [arr= true, ctx=false, start=0], prevVal=null, super=GridDhtAtomicAbstractUpdateRequest [onRes=false, nearNodeId=null, nearFutId=0, flags=]]]], Message closure [msg=GridIoMessage [plc=2, topic=TOPIC_CACHE, topicOrd=8, ordered=false, timeout=0, skipOnTimeout=false, msg=GridNearAtomicUpdateResponse [nodeId=null, futId=92, errs=null, ret=GridCacheReturn [v=null, cacheObj=null, success=true, invokeRes=false, loc=true, cacheId=0], remapTopVer=null, nearUpdates=null, partId=239, mapping=null, nodeLeft=false, super=GridCacheIdMessage [cacheId=708740962]]]], Message closure [msg=GridIoMessage [plc=2, topic=TOPIC_CACHE, topicOrd=8, ordered=false, timeout=0, skipOnTimeout=false, msg=GridNearAtomicSingleUpdateRequest [key=KeyCacheObjectImpl [part=47, val=null, hasValBytes=true], super=GridNearAtomicSingleUpdateRequest [key=KeyCacheObjectImpl [part=47, val=null, hasValBytes=true], parent=GridNearAtomicAbstractSingleUpdateRequest [nodeId=null, futId=65569, topVer=AffinityTopologyVersion [topVer=4, minorTopVer=0], parent=GridNearAtomicAbstractUpdateRequest [res=null, flags=]]]]]], o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$DeferredUpdateTimeout@3f4991e7, Message closure [msg=GridIoMessage [plc=2, topic=TOPIC_CACHE, topicOrd=8, ordered=false, timeout=0, skipOnTimeout=false, msg=GridNearSingleGetRequest [futId=1524335645779, key=KeyCacheObjectImpl [part=119, val=null, hasValBytes=true], flags=1, topVer=AffinityTopologyVersion [topVer=4, minorTopVer=0], subjId=de46e275-9054-45ed-8e00-6ab2643a5904, taskNameHash=0, createTtl=-1, accessTtl=-1]]], Message closure [msg=GridIoMessage [plc=2, topic=TOPIC_CACHE, topicOrd=8, ordered=false, timeout=0, skipOnTimeout=false, msg=GridNearAtomicSingleUpdateRequest [key=KeyCacheObjectImpl [part=47, val=null, hasValBytes=true], super=GridNearAtomicSingleUpdateRequest [key=KeyCacheObjectImpl [part=47, val=null, hasValBytes=true], parent=GridNearAtomicAbstractSingleUpdateRequest [nodeId=null, futId=16417, topVer=AffinityTopologyVersion [topVer=4, minorTopVer=0], parent=GridNearAtomicAbstractUpdateRequest [res=null, flags=]]]]]]]
    Deadlock: false
    Completed: 73
Thread [name="sys-stripe-7-#8", id=19, state=WAITING, blockCnt=0, waitCnt=108]
        at sun.misc.Unsafe.park(Native Method)
        at java.util.concurrent.locks.LockSupport.park(LockSupport.java:304)
        at o.a.i.i.util.future.GridFutureAdapter.get0(GridFutureAdapter.java:177)
        at o.a.i.i.util.future.GridFutureAdapter.get(GridFutureAdapter.java:140)
        at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.put0(GridDhtAtomicCache.java:613)
        at o.a.i.i.processors.cache.GridCacheAdapter.put(GridCacheAdapter.java:2369)
        at o.a.i.i.processors.cache.GridCacheAdapter.put(GridCacheAdapter.java:2346)
        at o.a.i.i.processors.cache.IgniteCacheProxyImpl.put(IgniteCacheProxyImpl.java:1026)
        at o.a.i.i.processors.cache.GatewayProtectedCacheProxy.put(GatewayProtectedCacheProxy.java:886)
        at com.test.match.MarketEventHandler.genReport(MarketEventHandler.java:67)
        at com.test.match.TestMatchBasicExecutor.matchCurrentLevel(TestMatchBasicExecutor.java:225)
        at com.test.match.TestMatchBasicExecutor$matchEntryProcessor.process(TestMatchBasicExecutor.java:469)
        at com.test.match.TestMatchBasicExecutor$matchEntryProcessor.process(TestMatchBasicExecutor.java:408)
        at o.a.i.i.processors.cache.GridCacheMapEntry$AtomicCacheUpdateClosure.runEntryProcessor(GridCacheMapEntry.java:5142)
        at o.a.i.i.processors.cache.GridCacheMapEntry$AtomicCacheUpdateClosure.call(GridCacheMapEntry.java:4550)
        at o.a.i.i.processors.cache.GridCacheMapEntry$AtomicCacheUpdateClosure.call(GridCacheMapEntry.java:4367)
        at o.a.i.i.processors.cache.persistence.tree.BPlusTree$Invoke.invokeClosure(BPlusTree.java:3051)
        at o.a.i.i.processors.cache.persistence.tree.BPlusTree$Invoke.access$6200(BPlusTree.java:2945)
        at o.a.i.i.processors.cache.persistence.tree.BPlusTree.invokeDown(BPlusTree.java:1717)
        at o.a.i.i.processors.cache.persistence.tree.BPlusTree.invoke(BPlusTree.java:1600)
        at o.a.i.i.processors.cache.IgniteCacheOffheapManagerImpl$CacheDataStoreImpl.invoke(IgniteCacheOffheapManagerImpl.java:1199)
        at o.a.i.i.processors.cache.IgniteCacheOffheapManagerImpl.invoke(IgniteCacheOffheapManagerImpl.java:345)
        at o.a.i.i.processors.cache.GridCacheMapEntry.innerUpdate(GridCacheMapEntry.java:1767)
        at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateSingle(GridDhtAtomicCache.java:2420)
        at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.update(GridDhtAtomicCache.java:1883)
        at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal0(GridDhtAtomicCache.java:1736)
        at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.updateAllAsyncInternal(GridDhtAtomicCache.java:1628)
        at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.processNearAtomicUpdateRequest(GridDhtAtomicCache.java:3055)
        at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache.access$400(GridDhtAtomicCache.java:130)
        at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$5.apply(GridDhtAtomicCache.java:266)
        at o.a.i.i.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$5.apply(GridDhtAtomicCache.java:261)
        at o.a.i.i.processors.cache.GridCacheIoManager.processMessage(GridCacheIoManager.java:1060)
        at o.a.i.i.processors.cache.GridCacheIoManager.onMessage0(GridCacheIoManager.java:579)
        at o.a.i.i.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:378)
        at o.a.i.i.processors.cache.GridCacheIoManager.handleMessage(GridCacheIoManager.java:304)
        at o.a.i.i.processors.cache.GridCacheIoManager.access$100(GridCacheIoManager.java:99)
        at o.a.i.i.processors.cache.GridCacheIoManager$1.onMessage(GridCacheIoManager.java:293)
        at o.a.i.i.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1555)
        at o.a.i.i.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:1183)
        at o.a.i.i.managers.communication.GridIoManager.access$4200(GridIoManager.java:126)
        at o.a.i.i.managers.communication.GridIoManager$9.run(GridIoManager.java:1090)
        at o.a.i.i.util.StripedExecutor$Stripe.run(StripedExecutor.java:505)
        at java.lang.Thread.run(Thread.java:748)

The code above exception mentioned is:

// TestMatchBasicExecutor.java:469
private static class matchEntryProcessor implements EntryProcessor<
    String, TestTradeCoinInfo, TestTradeCoinInfo> {

@Override
public TestTradeCoinInfo process(
        MutableEntry<String, TestTradeCoinInfo> entry, Object... args) {

    Ignite ignite = (Ignite) args[0];
    TestMatchEntrustInfo testMatchEntrustInfo = (TestMatchEntrustInfo) args[1];
    TestMatchEventHandler handler = (TestMatchEventHandler) args[2];

    TestTradeCoinInfo testTradeCoinInfo = entry.getValue();

...
    // TestMatchBasicExecutor.java:469
    if (false == matchTest(testMatchEntrustInfo, priceLevel, handler)) { 
        return null;
    }

...

    entry.setValue(testTradeCoinInfo);

    return null;
}
}



public static boolean matchTest(TestMatchEntrustInfo testInfo,
                                    MatchPriceLevel matchPriceLevel,
                                    TestMatchEventHandler eventHandler) {
...

eventHandler.genReport(testInfo,
        pendingInfo,
        amount,
        price);

...

}


public void genReport(TestMatchEntrustInfo primary, TestMatchEntrustInfo pending,
                  BigDecimal amount, BigDecimal price) {

String txCoinType = primary.getTxCoinType();

TestMatchResultRecord record = new TestMatchResultRecord();
record.genMatchResultRecord(primary, pending, amount, price);

// Add to another queue, and exit quickly to avoid dead lock
IgniteQueue<TestMatchResultRecord> reportQueue =
        ignite.queue("ResultRecordQueue_" + txCoinType, 0, null);
reportQueue.put(record);

}

I have use ignite EntryProcessor to process my data, previously in genReport it still has another cache use EntryProcessor, it always fails, so i use a ignite queue to save the record to queue and process the record in another service thread, but after the change, it give the abvoe exception, and refuse to process any other request anymore


Solution

  • You should not do a synchronous put() within the EntryProcessor. Processor is already invoked within a lock, so this can cause a deadlock and/or thread starvation.

    If you need to update the processed entry, use MutableEntry#setValue(..).