
Apache Ignite unexpectedly deletes IgniteSet


I am facing a problem where my Ignite instance unexpectedly closes an opened Ignite set after I try to save it in a map or return it from a function.

So I have a Java Spring application where Ignite is used under the hood of Spring Data (master node), and a Spark application where the same Ignite cluster is used as a database (client node). The set is created and filled in the Spark application; in the Java app I just want to access it and check set.contains(element).

The first part looks good - the set is created, and I can see in the logs that its size is correct:

def save(host: String, cacheName: String): Unit = {
    val ignite: Ignite = igniteClientNode(host)
    val igniteSetCache: IgniteSet[String] = createIgniteSetCache(ignite, cacheName)
    igniteSetCache.clear()

    instance.fittedUsers.collect().foreach { row =>
      igniteSetCache.add(row.mkString(","))
    }

    logger.debug("Size of IgniteSet: " + igniteSetCache.size()) // DEBUG: Size of IgniteSet: 7910
  }

In the Java app I have a corresponding Ignite bean, and I try to access the created set and store it in a map:

private IgniteSet<String> getSetByModelTag(String modelTag) {
    LOGGER.warning("HERE in getSetByModelTag " + openedIgniteSets); // instance wide map
    IgniteSet<String> alreadyOpenedSet = openedIgniteSets.getOrDefault(modelTag, null);

    if (alreadyOpenedSet == null) {
        try (IgniteSet<String> newSet = igniteInstance.set(modelTag, new CollectionConfiguration())) {
            if (newSet != null) {
                alreadyOpenedSet = newSet;
                openedIgniteSets.put(modelTag, alreadyOpenedSet);
                LOGGER.warning("Number of users in opened set for modelTag=`" +
                        modelTag + "` is " + alreadyOpenedSet.size());
                LOGGER.warning("HERE in if " + openedIgniteSets);
            } else {
                throw new IgniteException("`set()` method in Ignite component returned null.");
            }
        } catch (IgniteException e) {
            LOGGER.log(Level.SEVERE, "Ignite exception", e);
            throw e;
        }
    }

    return alreadyOpenedSet;
}

Later in the code I use this set to check whether it contains some element:

// in the bean component
private final Ignite igniteInstance;
private final HashMap<String, IgniteSet<String>> openedIgniteSets = new HashMap<>();

...
var setWithFittedUsers = getSetByModelTag(modelTag);

LOGGER.warning("HERE in processModelTag " + openedIgniteSets);
LOGGER.warning("Number of users in setWithFittedUsers is " + setWithFittedUsers.size());
if (setWithFittedUsers.contains(user)) {
    // do something;
}

On the .contains() line I get this error:

Request processing failed; nested exception is java.lang.IllegalStateException: Set has been removed from cache: GridCacheSetImpl [cache=GridDhtAtomicCache [defRes=org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$1@16b71d74, lockedEntriesInfo=org.apache.ignite.internal.processors.cache.LockedEntriesInfo@73230dca, near=null, super=GridDhtCacheAdapter [stopping=true, super=GridDistributedCacheAdapter [super=GridCacheAdapter [locMxBean=org.apache.ignite.internal.processors.cache.CacheLocalMetricsMXBeanImpl@59581355, clusterMxBean=org.apache.ignite.internal.processors.cache.CacheClusterMetricsMXBeanImpl@712fa2e8, aff=org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl@f53c3e7, asyncOpsSem=java.util.concurrent.Semaphore@35f2445b[Permits = 500], partPreloadBadVerWarned=false, active=true, name=datastructures_ATOMIC_PARTITIONED_0@default-ds-group#SET_als_on_all_data, size=0]]]], name=als_on_all_data, id=58c36079e71-40ec3dd2-ada4-4e1e-8b0c-15217439ad5d, collocated=false, separated=true, hdrPart=272, setKey=GridCacheSetHeaderKey [name=als_on_all_data], rmvd=true, compute=org.apache.ignite.internal.IgniteComputeImpl@597bcf59]

This is quite confusing, because the logs show that inside getSetByModelTag() the set is retrieved and its size matches the expected value. But after exiting the function Ignite reports that it stopped the cache, and of course after that I can't check anything and its size becomes 0 :(

Logs:

// everything looks as expected
2022-01-26 15:35:26,701 [http-nio-8000-exec-7] [ClientComponentImpl] WARN : HERE in getSetByModelTag {}
2022-01-26 15:35:26,745 [http-nio-8000-exec-7] [ClientComponentImpl] WARN : Number of users in opened set for modelTag=`als_on_all_data` is 7910
2022-01-26 15:35:26,747 [http-nio-8000-exec-7] [ClientComponentImpl] WARN : HERE in if {als_on_all_data=GridCacheSetImpl [cache=GridDhtAtomicCache [defRes=org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$1@16b71d74, lockedEntriesInfo=org.apache.ignite.internal.processors.cache.LockedEntriesInfo@73230dca, near=null, super=GridDhtCacheAdapter [stopping=false, super=GridDistributedCacheAdapter [super=GridCacheAdapter [locMxBean=org.apache.ignite.internal.processors.cache.CacheLocalMetricsMXBeanImpl@59581355, clusterMxBean=org.apache.ignite.internal.processors.cache.CacheClusterMetricsMXBeanImpl@712fa2e8, aff=org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl@f53c3e7, asyncOpsSem=java.util.concurrent.Semaphore@35f2445b[Permits = 500], partPreloadBadVerWarned=false, active=true, name=datastructures_ATOMIC_PARTITIONED_0@default-ds-group#SET_als_on_all_data, size=7910]]]], name=als_on_all_data, id=58c36079e71-40ec3dd2-ada4-4e1e-8b0c-15217439ad5d, collocated=false, separated=true, hdrPart=272, setKey=GridCacheSetHeaderKey [name=als_on_all_data], rmvd=false, compute=org.apache.ignite.internal.IgniteComputeImpl@597bcf59]}

// now after exiting the function Ignite stops it
2022-01-26 15:35:28,394 [exchange-worker-#66] [org.apache.ignite.logger.java.JavaLogger] INFO : Stopped cache [cacheName=datastructures_ATOMIC_PARTITIONED_0@default-ds-group#SET_als_on_all_data, group=default-ds-group]

// and now its size is 0
2022-01-26 15:35:28,404 [http-nio-8000-exec-7] [ClientComponentImpl] WARN : HERE in processModelTag {als_on_all_data=GridCacheSetImpl [cache=GridDhtAtomicCache [defRes=org.apache.ignite.internal.processors.cache.distributed.dht.atomic.GridDhtAtomicCache$1@16b71d74, lockedEntriesInfo=org.apache.ignite.internal.processors.cache.LockedEntriesInfo@73230dca, near=null, super=GridDhtCacheAdapter [stopping=true, super=GridDistributedCacheAdapter [super=GridCacheAdapter [locMxBean=org.apache.ignite.internal.processors.cache.CacheLocalMetricsMXBeanImpl@59581355, clusterMxBean=org.apache.ignite.internal.processors.cache.CacheClusterMetricsMXBeanImpl@712fa2e8, aff=org.apache.ignite.internal.processors.cache.affinity.GridCacheAffinityImpl@f53c3e7, asyncOpsSem=java.util.concurrent.Semaphore@35f2445b[Permits = 500], partPreloadBadVerWarned=false, active=true, name=datastructures_ATOMIC_PARTITIONED_0@default-ds-group#SET_als_on_all_data, size=0]]]], name=als_on_all_data, id=58c36079e71-40ec3dd2-ada4-4e1e-8b0c-15217439ad5d, collocated=false, separated=true, hdrPart=272, setKey=GridCacheSetHeaderKey [name=als_on_all_data], rmvd=true, compute=org.apache.ignite.internal.IgniteComputeImpl@597bcf59]}

Nowhere do I call ignite.remove() or ignite.destroy() :( The client node that created and filled the cache is not destroyed either (in the Spark app), and the master node (in the Java app) is running as well.

Why I need this in a separate function:

The set is connected each time the app has a request to process. So at >1000 RPS the line igniteInstance.set(modelTag, new CollectionConfiguration()) occasionally throws an NPE (in about 30% of requests). That's why I decided to open the set once, store it in a map keyed by set name, and access it from the map each time I need it.
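As an aside, since this method is hit concurrently at high RPS, the plain HashMap-plus-null-check pattern shown above is racy: two requests can both see null and open the set twice. A minimal sketch of the same open-once-and-reuse idea using ConcurrentHashMap.computeIfAbsent - note that openSet() here is a hypothetical stand-in for igniteInstance.set(...), so the sketch runs without an Ignite cluster:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

public class SetRegistry {
    private final ConcurrentHashMap<String, Set<String>> openedSets = new ConcurrentHashMap<>();
    final AtomicInteger loads = new AtomicInteger(); // counts how often the loader actually ran

    // Hypothetical stand-in for igniteInstance.set(modelTag, new CollectionConfiguration())
    private Set<String> openSet(String modelTag) {
        loads.incrementAndGet();
        return ConcurrentHashMap.newKeySet();
    }

    // computeIfAbsent runs the loader at most once per key, even under concurrent calls
    public Set<String> getSetByModelTag(String modelTag) {
        return openedSets.computeIfAbsent(modelTag, this::openSet);
    }

    public static void main(String[] args) {
        SetRegistry registry = new SetRegistry();
        Set<String> s1 = registry.getSetByModelTag("als_on_all_data");
        s1.add("user-1");
        Set<String> s2 = registry.getSetByModelTag("als_on_all_data");
        System.out.println(s1 == s2);             // same instance reused
        System.out.println(registry.loads.get()); // loader ran exactly once
    }
}
```

With a real IgniteSet the loader would call igniteInstance.set(...) instead, but the map-access pattern stays the same.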

So my guess:

  • this is expected behavior, which would be weird;
  • something causes set deletion which I don't know how to debug;
  • something else.

Please help to figure this out!


Solution

  • So finally, after hours of debugging, I found the reason and a solution.

    First of all, I logged the size of the set each time I opened it. Weirdly, after the first call its size became 0, so the set was erased after the first call to ignite.set(). I then switched to a plain cache (instead of a set) and simply checked cache.containsKey(user). Its size was persistent across getOrCreateCache() calls, but the NPE problem still occurred.

    Then I found a tiny answer on the Ignite mailing list which points out that Ignite caches implement the AutoCloseable interface. That means that at the end of a try-with-resources block, cache.close() is called automatically. And close() doesn't just close the "connection" to the cache - it stops the cache itself (for an IgniteSet, it removes the set, which matches the rmvd=true in the logs above).
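The difference is easy to reproduce with any AutoCloseable, no Ignite needed: a try-with-resources header closes the resource when the block exits (even on the success path), while a plain try/catch with the resource declared outside does not. TrackedResource below is a made-up stand-in for the Ignite set:

```java
public class AutoCloseDemo {
    // Stand-in for an Ignite data structure whose close() destroys it
    static class TrackedResource implements AutoCloseable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        // try-with-resources: close() is called automatically on block exit
        TrackedResource a = new TrackedResource();
        try (TrackedResource r = a) {
            // use r ...
        }
        System.out.println(a.closed); // true

        // plain try/catch: the resource stays open after the block
        TrackedResource b = new TrackedResource();
        try {
            // use b ...
        } catch (RuntimeException e) {
            throw e;
        }
        System.out.println(b.closed); // false
    }
}
```

So exceptions during set creation can still be caught with an ordinary try/catch; it is only the resource declaration in the try header that triggers the automatic close().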

    After this I changed my code to this:

    // No try-with-resources here: the cache must stay open after this block
    IgniteCache<String, String> cache = igniteInstance.getOrCreateCache(configuration);
    if (cache != null) {
        if (cache.containsKey(user)) {
            finalModelTag = modelTag;
        }
    } else {
        throw new CacheException("`getOrCreateCache()` method in Ignite component returned null.");
    }
    

    I also noticed in the Ignite logs that the partition map exchange (PME) process was being started all the time for the default cache group, and during PME the caches (and sets) are stopped. That was a possible cause of the NPE. I started putting my caches in a separate group, and PME was no longer triggered while the app was running:

    val cacheConfiguration = new CacheConfiguration[String, String]()
    cacheConfiguration.setBackups(2)
    cacheConfiguration.setGroupName("some-group-name")
    cacheConfiguration.setName(cacheName)
    
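For completeness, the equivalent configuration on the Java (master) side would look roughly like this. This is a sketch, not tested against a cluster: the group name is the same placeholder as above, cacheName is assumed to be in scope, and it assumes ignite-core on the classpath:

```java
import org.apache.ignite.configuration.CacheConfiguration;

// Hypothetical Java mirror of the Scala configuration above
CacheConfiguration<String, String> cacheConfiguration = new CacheConfiguration<>();
cacheConfiguration.setBackups(2);
// dedicated group, so the cache does not land in the default data-structure group
cacheConfiguration.setGroupName("some-group-name");
cacheConfiguration.setName(cacheName);
```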

    I don't know exactly which of these changes solved the initial problem, but now everything works as expected. It's a pity that I didn't figure out how to catch exceptions during cache creation without triggering the automatic cache.close().