Search code examples
ignite

Ignite : Evict data from Cache and store into Database


I want to evict data from the cache every minute and store it in a third-party database.

I know about write-through operations.

Is there any way to write data into a third-party database by using an eviction policy?


Solution

  • Here is the code, implemented per @Dmitriy's suggestion, for catching the events on cache eviction (done here with an expiry policy plus an expired-entry listener):

    import org.apache.ignite.Ignite;
    import org.apache.ignite.IgniteCache;
    import org.apache.ignite.IgniteException;
    import org.apache.ignite.Ignition;
    import org.apache.ignite.cache.CacheMode;
    import org.apache.ignite.configuration.CacheConfiguration;
    import org.apache.ignite.configuration.IgniteConfiguration;
    import org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi;
    import org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder;
    
    import javax.cache.configuration.Factory;
    import javax.cache.configuration.MutableCacheEntryListenerConfiguration;
    import javax.cache.expiry.CreatedExpiryPolicy;
    import javax.cache.expiry.Duration;
    import java.util.Arrays;
    import java.util.concurrent.TimeUnit;
    
    /**
     * Demonstrates how to intercept expired cache entries in Apache Ignite.
     *
     * <p>The example starts an Ignite node that discovers peers on localhost, creates a
     * {@code Person} cache whose entries expire {@link #EXPIRY_SECONDS} seconds after creation,
     * registers a JCache entry listener together with an event filter, puts two entries and
     * then waits long enough for the expiry events to be delivered to the listener.
     */
    public class CacheEvictExample {

        /** Name of the demo cache, derived from this class' simple name. */
        private static final String CACHE_NAME = CacheEvictExample.class.getSimpleName();

        /** Lifetime of a cache entry, counted from its creation, in seconds. */
        private static final long EXPIRY_SECONDS = 10;

        /**
         * Runs the example.
         *
         * @param args command-line arguments; not used
         * @throws IgniteException if the node or the cache cannot be started
         * @throws InterruptedException if the thread is interrupted while waiting for expiry
         */
        public static void main(String[] args) throws IgniteException, InterruptedException {
            final IgniteConfiguration cfg = new IgniteConfiguration();
            // Was IgniteMain.class, a class that does not exist in this example.
            cfg.setIgniteInstanceName("ignite-" + CacheEvictExample.class.getSimpleName());

            // Restrict discovery to the local host's default discovery port range.
            TcpDiscoverySpi discovery = new TcpDiscoverySpi();
            TcpDiscoveryMulticastIpFinder ipFinder = new TcpDiscoveryMulticastIpFinder();
            ipFinder.setAddresses(Arrays.asList("127.0.0.1:47500..47509"));
            discovery.setIpFinder(ipFinder);
            cfg.setDiscoverySpi(discovery);

            try (Ignite ignite = Ignition.start(cfg)) {
                System.out.println(">>> Cache events example started.");

                CacheConfiguration<Long, Person> personCacheCfg = new CacheConfiguration<>(CACHE_NAME);
                personCacheCfg.setCacheMode(CacheMode.PARTITIONED); // Default.
                personCacheCfg.setIndexedTypes(Long.class, Person.class);
                // Passed inline so that no raw Factory variable (and no unchecked call) is needed.
                personCacheCfg.setExpiryPolicyFactory(
                        CreatedExpiryPolicy.factoryOf(new Duration(TimeUnit.SECONDS, EXPIRY_SECONDS)));

                try (IgniteCache<Long, Person> cache = ignite.getOrCreateCache(personCacheCfg)) {
                    // Explicit type arguments instead of raw handler types.
                    // Third argument: old value is required; fourth: listener is invoked synchronously.
                    cache.registerCacheEntryListener(
                            new MutableCacheEntryListenerConfiguration<Long, Person>(
                                    () -> new CacheEntryListenerHandler<Long, Person>(),
                                    () -> new CacheEntryEventFilterHandler<Long, Person>(),
                                    true,
                                    true));

                    cache.put(Long.valueOf(0), new Person(Long.valueOf(0), "Pappu", "Pyare", 2000, "Pappu Pass Ho Gaya"));
                    cache.put(Long.valueOf(1), new Person(Long.valueOf(1), "Pappu", "Pyare", 2000, "Pappu Dubara Pass Ho Gaya"));

                    // Wait slightly longer than the expiry duration so both entries have expired.
                    Thread.sleep(11000);
                    System.out.println("11 Seconds are Over. Pappu beta ooth ja!!! ---- Expired Event should trigger now");
                    // Give the listener a moment to receive the expiry events before shutting down.
                    Thread.sleep(2000);
                } finally {
                    ignite.destroyCache(CACHE_NAME);
                }
            }
        }
    }
    

    Listener

    import javax.cache.event.*;
    import java.io.Serializable;
    
    /**
     * Cache entry listener that reacts to expired entries.
     *
     * <p>Each expired-entry event is printed to standard output; the body of the callback
     * is the place to persist the entry into a third-party database.
     *
     * @param <K> cache key type
     * @param <V> cache value type
     */
    public class CacheEntryListenerHandler<K, V>
            implements CacheEntryExpiredListener<K, V>, Serializable {
        private static final long serialVersionUID = 1L;

        /**
         * Called with the batch of entries that have expired.
         *
         * @param events the expired-entry events to process
         * @throws CacheEntryListenerException declared by the listener contract
         */
        @Override
        public void onExpired(Iterable<CacheEntryEvent<? extends K, ? extends V>> events) throws CacheEntryListenerException {
            events.forEach(expired -> {
                System.out.println("Received a " + expired);
                // Persist in third party database
            });
        }

    }
    

    EventFilter

    import java.io.Serializable;
    import java.util.Arrays;
    import java.util.List;
    import javax.cache.event.CacheEntryEvent;
    import javax.cache.event.CacheEntryEventFilter;
    import javax.cache.event.CacheEntryListenerException;
    import javax.cache.event.EventType;
    
    public class CacheEntryEventFilterHandler<K, V> implements CacheEntryEventFilter<K, V>, Serializable {
        private static final long serialVersionUID = 1L;
        final List<EventType> VALID_EVENTS = Arrays.asList(EventType.REMOVED, EventType.EXPIRED, EventType.CREATED);
    
        public boolean evaluate(CacheEntryEvent<? extends K, ? extends V> event)
                throws CacheEntryListenerException {
            boolean result = false;
            if (VALID_EVENTS.contains(event.getEventType())) {
                System.out.println("filter event=" + event + " filter result=" + result);
                result = true;
            }
            return result;
        }
    }
    

    Model Class:

    import org.apache.ignite.cache.query.annotations.QuerySqlField;
    import org.apache.ignite.cache.query.annotations.QueryTextField;
    
    import java.io.Serializable;
    
    /**
     * Cache value describing a person. Fields are public and annotated so that
     * Ignite can expose them to SQL and text queries.
     */
    public class Person implements Serializable {

        /** Person identifier; SQL-queryable and indexed. */
        @QuerySqlField(index = true)
        public Long id;

        /** First name; SQL-queryable. */
        @QuerySqlField
        public String firstName;

        /** Last name; SQL-queryable. */
        @QuerySqlField
        public String lastName;

        /** Resume text; available to text queries. */
        @QueryTextField
        public String resume;

        /** Salary; SQL-queryable and indexed. */
        @QuerySqlField(index = true)
        public double salary;

        /** Creates an empty person with all fields at their default values. */
        public Person() {
        }

        /**
         * Creates a fully populated person.
         *
         * @param id person identifier
         * @param firstName first name
         * @param lastName last name
         * @param salary salary
         * @param resume resume text
         */
        public Person(Long id, String firstName, String lastName, double salary, String resume) {
            this.id = id;
            this.firstName = firstName;
            this.lastName = lastName;
            this.salary = salary;
            this.resume = resume;
        }

        /**
         * Returns a bracketed, comma-separated listing of all fields.
         *
         * @return textual representation of this person
         */
        @Override
        public String toString() {
            StringBuilder text = new StringBuilder("Person [id=");
            text.append(id);
            text.append(", lastName=").append(lastName);
            text.append(", firstName=").append(firstName);
            text.append(", salary=").append(salary);
            text.append(", resume=").append(resume);
            text.append(']');
            return text.toString();
        }
    }
    

    Hope this will help.