Search code examples
javaeventsconfigurationlistenerignite

apache ignite local cache events not sent to java listener class


I have a 2 node apache ignite (v2.9.0) cluster with a single distributed cache called customer_cache. I am trying to create a local event listener that receives EventType.EVT_CACHE_OBJECT_PUT events. I am following the ignite documentation (https://ignite.apache.org/docs/2.9.0/events/listening-to-events) but the local listener never "fires" when I add an entry to the cache. The listener class is running on each of the 2 ignite nodes.

Here is my ignite configuration XML

<bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">

    <property name="peerClassLoadingEnabled" value="true"/>
    <property name="rebalanceThreadPoolSize" value="2"/>
    <property name="metricsLogFrequency" value="#{60 * 10 * 1000}"/>

    <property name="cacheConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <property name="name" value="customer_cache"/>
            </bean>
        </list>
    </property>

    <property name="dataStorageConfiguration">
        <bean class="org.apache.ignite.configuration.DataStorageConfiguration">
            <property name="defaultDataRegionConfiguration">
                <bean class="org.apache.ignite.configuration.DataRegionConfiguration">
                    <property name="name" value="Default_Region" />
                    <property name="initialSize" value="#{1L * 1024 * 1024 * 1024}" />
                    <property name="maxSize" value="#{6L * 1024 * 1024 * 1024}" />
                    <property name="lazyMemoryAllocation" value="false" />
                    <property name="persistenceEnabled" value="false" />
                </bean>
            </property>
        </bean>
    </property>

    <!-- cache events -->
    <property name="includeEventTypes">
        <list>
            <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
        </list>
    </property>

    <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
    <property name="discoverySpi">
        <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
                <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
                    <property name="addresses">
                        <list>
                            <value>99.18.35.13:47500..47509</value>
                            <value>99.18.35.37:47500..47509</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>
</bean>

Here is my java listener class:

package com.glib.mystuff.ignite;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteEvents;
import org.apache.ignite.events.CacheEvent;
import org.apache.ignite.events.EventType;
import org.apache.ignite.lang.IgnitePredicate;

public class LocalCachePutListener extends AbstractIgniteTask {
    
    public static void main(String[] args) {
        new LocalCachePutListener().go();
    }

    private void go() {

        // Start an ignite client using parent class helper method
        Ignite ignite = getIgniteClient();
        
        IgniteEvents events = ignite.events(ignite.cluster().forCacheNodes("customer_cache"));

        // Local listener that listens to local events.
        IgnitePredicate<CacheEvent> localListener = evt -> {
            System.out.println("-----------> Received event name=" + 
                               evt.name() + 
                               ", key=" + 
                               evt.key() + 
                               ", oldVal=" + 
                               evt.oldValue() + 
                               ", newVal=" + 
                               evt.newValue());

             // Continue listening.
             return true; 
         };

         // (for some reason this is needed although it is in the ignite config)
         events.enableLocal(EventType.EVT_CACHE_OBJECT_PUT);
         
         // Subscribe to the cache events that are triggered on the local node.
         events.localListen(localListener, EventType.EVT_CACHE_OBJECT_PUT);
    }
}

I then have a simple java class that adds an entry to the customer_cache. I know the entry is added by using the ignitevisor.sh utility to see extended details of my cache via cache -a. As far as I can tell from the ignite documentation I have all the required components & configuration for listening to local EventType.EVT_CACHE_OBJECT_PUT events. What am I doing wrong???

Many thanks for any help provided. If I had any hair, I would have pulled it all out by now.


Solution

  • Local listener for EVT_CACHE_OBJECT_PUT event will only fire on server nodes where the key is stored. However, you're trying to catch it on client:

    Ignite ignite = getIgniteClient();
    

    Maybe you can use a continuous query instead?