Search code examples
Tags: java, ignite

Apache Ignite Availability Issue w/Custom CacheStoreAdapter


I'm doing a PoC using apache ignite. Here is the scenario I'm testing:

  1. Start a cluster of 3 nodes and a client.
  2. Call get(key). The node that caches this key logs the load.
  3. Call get(key) again. I verify that it returns the stored value.
  4. Do a loadCache(). All nodes report successfully Loading cache.
  5. Kill node that originally loaded key
  6. Restart node that I just killed.
  7. Query for key again.

Steps 6 and 7 cause trouble. If I wait long enough between the two, everything works as it should. However, if I do 6 and 7 too close together, I get an error on the client and a corresponding error on the node (stack traces not included here).

I see the error `IgniteClientDisconnectedException: Failed to wait for topology update, client disconnected`. However, is there a way to avoid this issue? Setting a longer time to wait for a topology update isn't really an option, because a client may try to connect at any time. Is it related to my cluster configuration? I saw documentation that suggests retrying the connection indefinitely, which seems like it would just keep failing.

Also, we would need to be able to dynamically grow and shrink the cluster. Is this possible? Would having in-memory backups fix the functionality?

Note, if I omit step 6 I've not seen it fail.

Cluster Node Config

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xsi:schemaLocation="
       http://www.springframework.org/schema/beans
       http://www.springframework.org/schema/beans/spring-beans.xsd">
    <!--
        Spring configuration for the Ignite server (cluster) nodes. It defines a single cache,
        "recordData", backed by RecordDataStore with read-through and write-through enabled.
    -->
    <!--<import resource="./cache.xml"/>-->
    <bean id="grid.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Peer class loading is enabled here and in the client config below as well. -->
        <property name="peerClassLoadingEnabled" value="true"/>

        <property name="cacheConfiguration">
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <!-- Set a cache name. -->
                <property name="name" value="recordData"/>
                <!--<property name="rebalanceMode" value="SYNC"/>-->
                <!-- Set cache mode. -->
                <!--
                    NOTE(review): no "backups" property is set, so Ignite's default applies
                    (documented as 0 backups for partitioned caches; confirm for your version).
                    Without backups, entries held only by a node that is killed are no longer in
                    memory and have to be read through from the store again.
                -->
                <property name="cacheMode" value="PARTITIONED"/>

                <!--
                    The cache store is created from its class name via FactoryBuilder.factoryOf(String).
                    NOTE(review): the client config below names a different class
                    (com.digitaslbi.idiom.util.RecordDataStore); confirm both refer to the same
                    fully qualified class.
                -->
                <property name="cacheStoreFactory">
                    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                        <constructor-arg value="Application.RecordDataStore"/>
                    </bean>
                </property>
                <!-- Cache misses are loaded through the store; cache updates are written through to it. -->
                <property name="readThrough" value="true"/>
                <property name="writeThrough"  value="true"/>

            </bean>
        </property>

        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <!-- Override local port. -->
                <!--
                    No ipFinder is configured here, so TcpDiscoverySpi falls back to its default
                    IP finder (presumably multicast; confirm for your Ignite version). The client
                    config below looks for nodes at localhost:8000..8099.
                -->
                <property name="localPort" value="8000"/>
            </bean>
        </property>

        <property name="communicationSpi">
            <bean class="org.apache.ignite.spi.communication.tcp.TcpCommunicationSpi">
                <!-- Override local port (kept separate from the discovery port 8000 above). -->
                <property name="localPort" value="8100"/>
            </bean>
        </property>
    </bean>
</beans>

Client Config

<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:util="http://www.springframework.org/schema/util"
       xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/util
        http://www.springframework.org/schema/util/spring-util.xsd">
    <!--
        Spring configuration for the Ignite client node.
        NOTE(review): this bean is declared abstract="true", so Spring will not instantiate it
        on its own; presumably another config declares a concrete bean with
        parent="ignite.cfg". Confirm before loading this file directly.
    -->
    <bean abstract="true" id="ignite.cfg" class="org.apache.ignite.configuration.IgniteConfiguration">
        <!-- Set to true to enable distributed class loading for examples, default is false. -->
        <property name="peerClassLoadingEnabled" value="true"/>
        <!-- Start this node in client mode. -->
        <property name="clientMode" value="true"/>

        <property name="cacheConfiguration">
            <bean class="org.apache.ignite.configuration.CacheConfiguration">
                <!-- Set a cache name. -->
                <property name="name" value="recordData"/>
                <!--<property name="rebalanceMode" value="SYNC"/>-->

                <!-- Set cache mode. -->
                <property name="cacheMode" value="PARTITIONED"/>

                <!--
                    NOTE(review): the server config above uses "Application.RecordDataStore" as
                    the store class name; confirm both names refer to the same class.
                -->
                <property name="cacheStoreFactory">
                    <bean class="javax.cache.configuration.FactoryBuilder" factory-method="factoryOf">
                        <constructor-arg value="com.digitaslbi.idiom.util.RecordDataStore"/>
                    </bean>
                </property>
                <property name="readThrough" value="true"/>
                <property name="writeThrough"  value="true"/>

            </bean>
        </property>

        <!-- Enable task execution events for examples. -->
        <property name="includeEventTypes">
            <list>
                <!--Task execution events-->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_TIMEDOUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_SESSION_ATTR_SET"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_REDUCED"/>
                <!--Cache events-->
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_PUT"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_READ"/>
                <util:constant static-field="org.apache.ignite.events.EventType.EVT_CACHE_OBJECT_REMOVED"/>
            </list>
        </property>

        <!-- Explicitly configure TCP discovery SPI to provide list of initial nodes. -->
        <property name="discoverySpi">
            <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
                <property name="ipFinder">
                    <!--
                        Ignite provides several options for automatic discovery that can be used
                        instead of static IP based discovery. For information on all options refer
                        to our documentation: http://apacheignite.readme.io/docs/cluster-config
                    -->
                    <!-- Static IP finder used to discover the initial nodes (the next line is a leftover commented-out copy). -->
                    <!--<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">-->
                    <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder">
                        <property name="addresses">
                            <list>
                                <!-- In distributed environment, replace with actual host IP address. -->
                                <!-- The range 8000..8099 covers the server nodes' discovery localPort of 8000. -->
                                <value>localhost:8000..8099</value>
                                <!--<value>127.0.0.1:47500..47509</value>-->
                            </list>
                        </property>
                    </bean>
                </property>
            </bean>
        </property>
    </bean>
</beans>

Implemented methods of the CacheStoreAdapter subclass

public class RecordDataStore extends CacheStoreAdapter<Long, List<Record>> {

  // This method is called whenever "get(...)" methods are called on IgniteCache.
    /**
     * Reads through to CouchDB for a single cache key.
     *
     * <p>Records are matched on {@code getPel()}, the same field {@link #loadCache} uses as the
     * cache key. As a result, read-through and bulk loading produce the same entry for a given key.
     *
     * @param key the pel value to look up
     * @return the distinct records whose pel equals {@code key}, or {@code null} when there are none.
     *     Returning {@code null} follows the JCache {@code CacheLoader#load} contract, so a missing
     *     key is not cached as an empty list.
     * @throws CacheLoaderException if the CouchDB connection URL is malformed
     */
    @Override public List<Record> load(Long key) {
        System.out.println("Load data for pel: " + key);
        try {
            CouchDbConnector db = RecordDataStore.getDb();
            ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all");
            // NOTE(review): this reads the entire "all" view on every cache miss. A view keyed by
            // pel, queried for just this key, would avoid the full scan if such a view exists.
            List<Record> list = db.queryView(viewQuery, Record.class);

            // Match on pel (not id) to agree with loadCache(). distinct() keeps the
            // de-duplication the previous HashMultimap-based grouping provided.
            LinkedList<Record> matches = list.stream()
                .filter(r -> key.equals(r.getPel()))
                .distinct()
                .collect(LinkedList::new, LinkedList::add, LinkedList::addAll);

            return matches.isEmpty() ? null : matches;
        } catch (MalformedURLException e) {
            throw new CacheLoaderException("Failed to load values from cache store.", e);
        }
    }
    ....
    /**
     * Bulk-loads records from CouchDB into the cache, grouped by pel.
     *
     * <p>Ignite calls this when {@code IgniteCache.loadCache(...)} is invoked. The extra arguments
     * passed there arrive in {@code args}.
     *
     * <p>NOTE(review): Ignite runs the load on every cache node, so each node reads the whole view
     * here. Confirm this is acceptable for the view's size.
     *
     * @param clo closure that receives each (pel, records) entry to be put into the cache
     * @param args {@code args[0]} must be a {@link Number} giving the maximum number of records to
     *     read from the view. The limit counts records, not keys. A value {@code <= 0} never
     *     triggers the limit, so every record is read.
     * @throws CacheLoaderException if {@code args[0]} is missing or not a number, or if the CouchDB
     *     connection URL is malformed
     */
    @Override public void loadCache(IgniteBiInClosure<Long, List<Record>> clo, Object... args) {
        if (args == null || args.length == 0 || args[0] == null) {
            throw new CacheLoaderException("Expected entry count parameter is not provided.");
        }
        // Accept any Number. cache.loadCache(null, 100) boxes the literal to Integer, which a
        // direct (Long) cast would reject with a ClassCastException.
        if (!(args[0] instanceof Number)) {
            throw new CacheLoaderException("Expected entry count parameter to be a number, but got: "
                + args[0].getClass().getName());
        }

        System.out.println("Loading Cache...");
        final long entryCnt = ((Number) args[0]).longValue();

        try {
            CouchDbConnector db = RecordDataStore.getDb();
            ViewQuery viewQuery = new ViewQuery().designDocId("_design/docs").viewName("all");
            List<Record> list = db.queryView(viewQuery, Record.class);
            HashMultimap<Long, Record> multimap = HashMultimap.create();

            long count = 0;
            for (Record r : list) {
                multimap.put(r.getPel(), r);
                count++;
                if (count == entryCnt) {
                    break;
                }
            }

            // asMap() yields each key together with its values, avoiding a second lookup per key.
            multimap.asMap().forEach((pel, records) -> clo.apply(pel, new LinkedList<>(records)));
        } catch (MalformedURLException e) {
            throw new CacheLoaderException("Failed to load values from cache store.", e);
        }

        System.out.println("Loaded Cache");
    }

    /**
     * Opens a connector to the {@code ignite} database on the CouchDB server.
     *
     * <p>NOTE(review): every call builds a new {@link HttpClient} that is never shut down, and this
     * method is called on every {@code load}/{@code loadCache}. Consider sharing a single connector
     * if connections accumulate.
     *
     * @return a connector bound to the {@code ignite} database
     * @throws MalformedURLException if the server URL cannot be parsed
     */
    public static CouchDbConnector getDb() throws MalformedURLException {
        // The scheme is required: without it, java.net.URL treats "server" as the protocol and
        // rejects it as unknown. Replace host and port with the real CouchDB endpoint.
        HttpClient httpClient = new StdHttpClient.Builder()
            .url("http://server:1111/")
            .build();

        CouchDbInstance dbInstance = new StdCouchDbInstance(httpClient);
        return new StdCouchDbConnector("ignite", dbInstance);
    }
}

Solution

  • http://apache-ignite-users.70518.x6.nabble.com/Ignite-cluster-recovery-after-network-partition-td2775.html stresses that the IgniteClientDisconnectedException provides an IgniteFuture, which can be obtained by calling

    IgniteFuture f = myException.reconnectFuture();
    

    That future has a get() method, which blocks until the client node has reconnected:

    Synchronously waits for completion of the computation and returns computation result.

    Thus, the following call returns once the client has reconnected, after which the failed cache operation can be retried:

    f.get();