mongodb, spring-boot, store, ignite

Deleting an element from the store after ExpiryPolicy expiration


Environment: I am running Apache Ignite v2.13.0 as the cache, with a cache store that persists to MongoDB v3.6.0. The application uses Spring Boot (Java).

Question: When an expiration policy is set, how do I remove an expired entry's data from my persistent database?

What I have tried: I implemented CacheEntryExpiredListener, but my print statement is never triggered. Is this the proper way to solve the problem?

Here is a sample bit of code:

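import javax.cache.event.CacheEntryEvent;
import javax.cache.event.CacheEntryExpiredListener;
import javax.cache.event.CacheEntryListenerException;
import org.springframework.stereotype.Service;

// Metrics is the application's own value class.
@Service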
public class CacheRemovalListener implements CacheEntryExpiredListener<Long, Metrics> {

    @Override
    public void onExpired(Iterable<CacheEntryEvent<? extends Long, ? extends Metrics>> events) throws CacheEntryListenerException {
        for (CacheEntryEvent<? extends Long, ? extends Metrics> event : events) {
            System.out.println("Received a " + event);
        }

    }

}


Solution

  • Use a ContinuousQuery to get notifications about Ignite data changes, including expirations.

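    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import javax.cache.event.CacheEntryEvent;
    import javax.cache.event.CacheEntryUpdatedListener;
    import javax.cache.event.EventType;
    import org.apache.ignite.cache.query.ContinuousQuery;
    
    // `cache` is the IgniteCache<Integer, Integer> being watched;
    // removeFromMongo(...) is your own method that deletes the document.
    ExecutorService mongoUpdateExecutor = Executors.newSingleThreadExecutor();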
    
    CacheEntryUpdatedListener<Integer, Integer> lsnr = new CacheEntryUpdatedListener<Integer, Integer>() {
        @Override
        public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends Integer>> evts) {
            for (CacheEntryEvent<? extends Integer, ? extends Integer> e : evts) {
                if (e.getEventType() == EventType.EXPIRED) {
                    // Use separate executor to avoid blocking Ignite threads
                    mongoUpdateExecutor.submit(() -> removeFromMongo(e.getKey()));
                }
            }
        }
    };
    
    
    var qry = new ContinuousQuery<Integer, Integer>()
            .setLocalListener(lsnr)
            .setIncludeExpired(true);
    
    // Start receiving updates.
    var cursor = cache.query(qry);
    
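    // Stop receiving updates. Closing the cursor cancels the query, so do this
    // only once notifications are no longer needed (e.g. at shutdown).
    cursor.close();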
    
    

    Note 1: EXPIRED events are not delivered by default; enable them explicitly with ContinuousQuery#setIncludeExpired(true).

    Note 2: Query listeners should not perform heavy or blocking operations. Offload that work to a separate thread or executor.
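
    To illustrate the offloading pattern from Note 2 without an Ignite cluster, here is a minimal JDK-only sketch. Everything in it is a hypothetical stand-in: EventType and Event mimic the JCache event types, and the in-memory map plays the role of the MongoDB collection. It shows only the shape of the pattern (filter EXPIRED events, hand the delete to a single-thread executor, drain the executor on shutdown), not the real Ignite or MongoDB API.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-ins only: none of these types come from Ignite or MongoDB.
class ExpiryOffloadSketch {
    // Mimics javax.cache.event.EventType for the purpose of this sketch.
    public enum EventType { CREATED, UPDATED, REMOVED, EXPIRED }

    // Mimics the part of CacheEntryEvent the listener needs: a type and a key.
    public static final class Event {
        final EventType type;
        final Long key;

        public Event(EventType type, Long key) {
            this.type = type;
            this.key = key;
        }
    }

    // Stand-in for the persistent store (the MongoDB collection in the question).
    private final Map<Long, String> store = new ConcurrentHashMap<>();

    // A single worker thread keeps deletes ordered and off the caller's thread.
    private final ExecutorService mongoUpdateExecutor = Executors.newSingleThreadExecutor();

    public void put(Long key, String value) {
        store.put(key, value);
    }

    public boolean contains(Long key) {
        return store.containsKey(key);
    }

    // Same role as the listener's onUpdated: react only to EXPIRED events
    // and submit the (potentially slow) delete to the executor.
    public void onUpdated(Iterable<Event> events) {
        for (Event e : events) {
            if (e.type == EventType.EXPIRED) {
                mongoUpdateExecutor.submit(() -> store.remove(e.key));
            }
        }
    }

    // Stops accepting new work and waits for queued deletes to finish.
    // Returns true if everything completed within the timeout.
    public boolean shutdown(long timeoutSeconds) {
        mongoUpdateExecutor.shutdown();
        try {
            return mongoUpdateExecutor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        ExpiryOffloadSketch sketch = new ExpiryOffloadSketch();
        sketch.put(1L, "metrics-1");
        sketch.put(2L, "metrics-2");

        // Key 1 expires, key 2 is merely updated.
        sketch.onUpdated(List.of(
                new Event(EventType.EXPIRED, 1L),
                new Event(EventType.UPDATED, 2L)));

        boolean drained = sketch.shutdown(5);
        System.out.println("drained=" + drained
                + ", key 1 present=" + sketch.contains(1L)
                + ", key 2 present=" + sketch.contains(2L));
    }
}
```

    In a real application the executor should be shut down together with the continuous query cursor, so that deletes already queued are not lost when the service stops.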