Search code examples
javaignite

Apache Ignite event listener registration error


On our production cluster we are seeing that some events are not getting inserted or updated. There is no error reported from applications.

To investigate this I want to listen cache put events from the Ignite cluster. I followed what was mentioned here https://ignite.apache.org/docs/latest/events/listening-to-events. Also took reference from this example here. https://github.com/apache/ignite/blob/master/examples/src/main/java/org/apache/ignite/examples/datagrid/CacheEventsExample.java

I tried running exact same code as mentioned in example link. It is like below

try (Ignite ignite = Ignition.start("C:\\Users\\example\\ignite-client-config-local.xml")) {
        System.out.println();
        System.out.println(">>> Cache events example started.");

        EntityKey entityKey = new EntityKey("T123");

        Entity entity = new Entity(entityKey);
        entity.setProductType("prodct1");
        entity.setSubProductType("subproduct1");

        try (IgniteCache<EntityKey,Entity> cache = ignite.getOrCreateCache("ProductsCache")) {

            IgniteBiPredicate<UUID, CacheEvent> locLsnr = new IgniteBiPredicate<UUID, CacheEvent>() {
                @Override public boolean apply(UUID uuid, CacheEvent evt) {
                    System.out.println("Received event [evt=" + evt.name() + ", key=" + evt.key() +
                            ", oldVal=" + evt.oldValue() + ", newVal=" + evt.newValue());

                    return true; // Continue listening.
                }
            };


            IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
                @Override public boolean apply(CacheEvent evt) {
                    System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']');
                    ignite.affinity("ProductsCache").isPrimary(ignite.cluster().localNode(), key);
                    return true;
                }
            };

           
            ignite.events(ignite.cluster().forCacheNodes("ProductsCache")).remoteListen(locLsnr, rmtLsnr,
                    EVT_CACHE_OBJECT_PUT);

            // Generate cache events.
            cache.put(entityKey,entity);

            // Wait for a while while callback is notified about remaining puts.
            Thread.sleep(2000);
            System.out.println(cache.get(entityKey));
        }
        }

But listener functions don't work. Data gets pushed to the cache properly and gets read back too. This I am first trying on my local machine. I have also set peerClassLoadingEnabled to true in both server and client config xml. Also in both configs I have added CACHE_EVT_OBJECT_PUT. On Ignite server I can see below error.

[16:56:07,469][SEVERE][sys-#109%personal.local%][GridContinuousProcessor] Failed to unmarshal continuous routine handler [routineId=1c49245f-238c-49a0-ae48-c4da3085052a, srcNodeId=89b3117f-43aa-428d-a5cf-647a518d089e]
class org.apache.ignite.internal.IgniteDeploymentCheckedException: Failed to obtain deployment for class: com.igniteadmin.LocalIgniteEventFilter
    at org.apache.ignite.internal.GridEventConsumeHandler.p2pUnmarshal(GridEventConsumeHandler.java:418)
    at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor.lambda$null$2(GridContinuousProcessor.java:707)
    at org.apache.ignite.internal.util.IgniteUtils.wrapThreadLoader(IgniteUtils.java:7133)
    at org.apache.ignite.internal.processors.closure.GridClosureProcessor$1.body(GridClosureProcessor.java:827)
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:834)
[16:56:07,481][SEVERE][sys-#108%personal.local%][] JVM will be halted immediately due to the failure: [failureCtx=FailureContext [type=CRITICAL_ERROR, err=class o.a.i.i.IgniteDeploymentCheckedException: Failed to obtain deployment for class: com.igniteadmin.LocalIgniteEventFilter]]

Solution

  • Try to inject ignite instance via @IgniteInstanceResource in order to use it in your remote filter.

    IgnitePredicate<CacheEvent> rmtLsnr = new IgnitePredicate<CacheEvent>() {
        @IgniteInstanceResource
        private IgniteEx ignite;                
    
        @Override public boolean apply(CacheEvent evt) {
            System.out.println("Cache event [name=" + evt.name() + ", key=" + evt.key() + ']');
            ignite.affinity("ProductsCache").isPrimary(ignite.cluster().localNode(), key);
            return true;
        }
    };