
Ignite DataStreamer not loading data


I'm using Ignite version 2.0.0.

I am trying to load about 40 million rows from a database into my Ignite instances (2 in this case). I read about the Ignite DataStreamers from https://apacheignite.readme.io/v1.2/docs/data-streamers

Because of the very low data-loading speed of loadCache(), I replaced loadCache() with IgniteDataStreamer.addData().

Upon execution, I notice in the web console that the on-heap entries metric keeps increasing (i.e., it shows that data is being loaded), but when I query the Ignite cache the result is empty.
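
For context, the kind of query that comes back empty for me is a SQL query over the cache, roughly like the sketch below (the exact statement here is only illustrative; the field names come from the QueryEntity configured further down):

import java.util.List;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.query.SqlFieldsQuery;

public class QueryPersons {

    public static void main(String[] args) throws Exception {
        try (Ignite ignite = Ignition.start("RemoteIgniteCluster-client.xml")) {
            IgniteCache<Integer, Object> cache = ignite.cache("PersonsCache");

            // Field names (class_no, name, age) match the QueryEntity of PersonsCache.
            List<List<?>> rows = cache.query(
                new SqlFieldsQuery("select name, age from Persons where class_no >= ?").setArgs(0)
            ).getAll();

            // Comes back empty even though the web console shows on-heap entries growing.
            System.out.println(">>> Rows returned: " + rows.size());
        }
    }
}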

I also notice that the server logs show this exception:

[01:46:19,541][ERROR][flusher-0-#66%RemoteIgniteCluster%][GridCacheWriteBehindStore] Unable to update underlying store: CacheJdbcPojoStore []
 javax.cache.CacheException: Failed to read property value from non binary object [class=class java.lang.Integer, property=class_no]
    at org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore.extractBinaryParameter(CacheJdbcPojoStore.java:122)
    at org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore.extractParameter(CacheJdbcPojoStore.java:69)
    at org.apache.ignite.cache.store.jdbc.CacheAbstractJdbcStore.fillValueParameters(CacheAbstractJdbcStore.java:1414)
    at org.apache.ignite.cache.store.jdbc.CacheAbstractJdbcStore.writeAll(CacheAbstractJdbcStore.java:1081)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.updateStore(GridCacheWriteBehindStore.java:804)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.applyBatch(GridCacheWriteBehindStore.java:720)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.access$2400(GridCacheWriteBehindStore.java:75)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.flushCacheCoalescing(GridCacheWriteBehindStore.java:1108)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.body(GridCacheWriteBehindStore.java:1006)
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
    at java.lang.Thread.run(Thread.java:745)
[01:50:00,945][ERROR][flusher-0-#66%RemoteIgniteCluster%][GridCacheWriteBehindStore] Unable to update underlying store: CacheJdbcPojoStore []
javax.cache.integration.CacheWriterException: Failed to write entries in database
    at org.apache.ignite.cache.store.jdbc.CacheAbstractJdbcStore.writeAll(CacheAbstractJdbcStore.java:1151)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.updateStore(GridCacheWriteBehindStore.java:804)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.applyBatch(GridCacheWriteBehindStore.java:720)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.access$2400(GridCacheWriteBehindStore.java:75)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.flushCacheCoalescing(GridCacheWriteBehindStore.java:1108)
    at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.body(GridCacheWriteBehindStore.java:1006)
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
    at java.lang.Thread.run(Thread.java:745)

My LoadCaches.java code is:

package load;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

public class LoadCaches {

    public static void main(String[] args) throws Exception {
        try (Ignite ignite = Ignition.start("RemoteIgniteCluster-client.xml");
             IgniteDataStreamer<Integer, Object> stmr = ignite.dataStreamer("PersonsCache")) {

            System.out.println(">>> Loading caches...");

            stmr.allowOverwrite(true);
            stmr.autoFlushFrequency(1000);

            System.out.println(">>> Loading cache: PersonsCache");

            for (int i = 0; i < 5000000; i++)
                stmr.addData(i, new Integer(i));

            /*ignite.cache("PersonsCache").loadCache(null);*/

            System.out.println(">>> All caches loaded!");
        }
    }
}

I tried replacing

IgniteDataStreamer<Integer, Object> stmr = ignite.dataStreamer("PersonsCache"))   

and

stmr.addData(i, new Integer(i));

with

IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("PersonsCache"))

and

stmr.addData(i, Integer.toString(i));

but I still get the same exception in the server logs, and querying the cache still returns empty results.
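
For reference, the cache store and QueryEntity below are configured for com.gmail.testcluster.model.Persons values, so a hypothetical variant that streams Persons objects instead of plain integers would look roughly like this (the Persons no-arg constructor and setter names are assumed here; the field names are taken from the configuration):

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

import com.gmail.testcluster.model.Persons;

public class LoadPersons {

    public static void main(String[] args) throws Exception {
        try (Ignite ignite = Ignition.start("RemoteIgniteCluster-client.xml");
             IgniteDataStreamer<Integer, Persons> stmr = ignite.dataStreamer("PersonsCache")) {

            stmr.allowOverwrite(true);

            for (int i = 0; i < 5000000; i++) {
                Persons p = new Persons();   // assumed no-arg constructor
                p.setClassNo(i % 10);        // assumed setter names for the
                p.setName("name-" + i);      // class_no, name, age, rAmt fields
                p.setAge(20 + i % 50);
                p.setRAmt(100.0d);
                stmr.addData(i, p);
            }
        }
    }
}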

My Spring XML configuration looks like this:

<?xml version="1.0" encoding="UTF-8"?>
 <beans xmlns="http://www.springframework.org/schema/beans"
   xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
   xmlns:util="http://www.springframework.org/schema/util"
   xsi:schemaLocation="http://www.springframework.org/schema/beans
                       http://www.springframework.org/schema/beans/spring-beans.xsd
                       http://www.springframework.org/schema/util
                       http://www.springframework.org/schema/util/spring-util.xsd">
<!-- Load external properties file. -->
<bean id="placeholderConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
    <property name="location" value="classpath:secret.properties"/>
</bean>

<!-- Data source beans will be initialized from external properties file. -->
<bean id="dsMySQL_DB" class="com.mysql.jdbc.jdbc2.optional.MysqlDataSource">
    <property name="URL" value="${dsMySQL_DB.jdbc.url}"/>
    <property name="user" value="${dsMySQL_DB.jdbc.username}"/>
    <property name="password" value="${dsMySQL_DB.jdbc.password}"/>
</bean>

<bean class="org.apache.ignite.configuration.IgniteConfiguration">
    <property name="igniteInstanceName" value="testcluster"/>

    <property name="peerClassLoadingEnabled" value="false" />

    <property name="includeEventTypes">
        <list>
            <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED" />
            <util:constant
                static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED" />
            <util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED" />
        </list>
    </property>
    <!-- Configure internal thread pool. -->
    <property name="publicThreadPoolSize" value="64" />
    <!-- Configure system thread pool. -->
    <property name="systemThreadPoolSize" value="32" />

    <property name="discoverySpi">
        <bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
            <property name="ipFinder">
                 <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder"> 
                <!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> -->
                    <property name="addresses">
                        <list>
                            <value>127.0.0.1:47500..47510</value>
                        </list>
                    </property>
                </bean>
            </property>
        </bean>
    </property>

    <property name="atomicConfiguration">
        <bean class="org.apache.ignite.configuration.AtomicConfiguration">
            <property name="backups" value="0"/>
        </bean>
    </property>

    <property name="cacheKeyConfiguration">
        <list>
            <bean class="org.apache.ignite.cache.CacheKeyConfiguration">
                <constructor-arg value="com.gmail.testcluster.model.Persons"/>
                <constructor-arg value="age"/>
            </bean>
        </list>
    </property>

    <property name="cacheConfiguration">
        <list>
            <bean class="org.apache.ignite.configuration.CacheConfiguration">

                <!-- Set rebalance batch size to 1 MB. -->
                <property name="rebalanceBatchSize" value="#{1024 * 1024}" />
                <!-- Explicitly disable rebalance throttling. -->
                <property name="rebalanceThrottle" value="0" />
                <!-- Set 4 threads for rebalancing. -->
                <property name="rebalanceThreadPoolSize" value="4" />

                <property name="name" value="PersonsCache"/>
                <property name="cacheMode" value="PARTITIONED"/>
                <property name="atomicityMode" value="ATOMIC"/>
                <property name="backups" value="0"/>

                <property name="affinity">
                    <bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
                        <property name="partitions" value="2048"/>
                    </bean>
                </property>

                <!-- <property name="queryDetailMetricsSize" value="50"/> -->

                <property name="cacheStoreFactory">
                    <bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
                        <property name="dataSourceBean" value="dsMySQL_DB"/>
                        <property name="dialect">
                            <bean class="org.apache.ignite.cache.store.jdbc.dialect.MySQLDialect">
                            </bean>
                        </property>

                        <property name="types">
                            <list>
                                <bean class="org.apache.ignite.cache.store.jdbc.JdbcType">
                                    <property name="cacheName" value="PersonsCache"/>
                                    <property name="keyType" value="java.lang.Integer"/>
                                    <property name="valueType" value="com.gmail.testcluster.model.Persons"/>
                                    <property name="databaseSchema" value="MY_DB"/>
                                    <property name="databaseTable" value="PERSONS"/>

                                    <property name="keyFields">
                                        <list>
                                            <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                <constructor-arg>
                                                    <util:constant static-field="java.sql.Types.INTEGER"/>
                                                </constructor-arg>
                                                <constructor-arg value="ID"/>
                                                <constructor-arg value="int"/>
                                                <constructor-arg value="id"/>
                                            </bean>
                                        </list>
                                    </property>

                                    <property name="valueFields">
                                        <list>
                                            <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                <constructor-arg>
                                                    <util:constant static-field="java.sql.Types.INTEGER"/>
                                                </constructor-arg>
                                                <constructor-arg value="CLASS_NO"/>
                                                <constructor-arg value="java.lang.Integer"/>
                                                <constructor-arg value="class_no"/>
                                            </bean>

                                            <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                <constructor-arg>
                                                    <util:constant static-field="java.sql.Types.VARCHAR"/>
                                                </constructor-arg>
                                                <constructor-arg value="NAME"/>
                                                <constructor-arg value="java.lang.String"/>
                                                <constructor-arg value="name"/>
                                            </bean>

                                            <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                <constructor-arg>
                                                    <util:constant static-field="java.sql.Types.INTEGER"/>
                                                </constructor-arg>
                                                <constructor-arg value="AGE"/>
                                                <constructor-arg value="java.lang.Integer"/>
                                                <constructor-arg value="age"/>
                                            </bean>

                                            <bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
                                                <constructor-arg>
                                                    <util:constant static-field="java.sql.Types.DOUBLE"/>
                                                </constructor-arg>
                                                <constructor-arg value="R_AMT"/>
                                                <constructor-arg value="java.lang.Double"/>
                                                <constructor-arg value="rAmt"/>
                                            </bean>
                                        </list>
                                    </property>
                                </bean>
                            </list>
                        </property>
                    </bean>
                </property>

                <property name="readThrough" value="true"/>
                <property name="writeThrough" value="true"/>
                <property name="writeBehindEnabled" value="true"/>
                <property name="writeBehindBatchSize" value="500"/>
                <property name="writeBehindFlushSize" value="1000"/>
                <property name="writeBehindFlushFrequency" value="60000"/>

                <property name="queryEntities">
                    <list>
                        <bean class="org.apache.ignite.cache.QueryEntity">
                            <property name="keyType" value="java.lang.Integer"/>
                            <property name="valueType" value="com.gmail.testcluster.model.Persons"/>

                            <property name="fields">
                                <map>
                                    <entry key="class_no" value="java.lang.Integer"/>
                                    <entry key="name" value="java.lang.String"/>
                                    <entry key="age" value="java.lang.Integer"/>
                                    <entry key="rAmt" value="java.lang.Double"/>
                                </map>
                            </property>

                            <property name="aliases">
                                <map>
                                    <entry key="rAmt" value="R_AMT"/>
                                </map>
                            </property>

                            <!-- <property name="indexes">
                                <list>
                                    <bean class="org.apache.ignite.cache.QueryIndex">
                                        <property name="name" value="queryindexing"/>
                                        <property name="indexType" value="SORTED"/>

                                        <property name="fields">
                                            <map>
                                                <entry key="class_no" value="true"/>
                                            </map>
                                        </property>
                                    </bean>
                                </list>
                            </property> -->
                        </bean>
                    </list>
                </property>
            </bean>
        </list>
    </property>
</bean>
</beans>

I am not sure where the problem is. Can someone take a look? Thanks!


Solution

  • When the DataStreamer's allowOverwrite property is false (the default), the persistent store is skipped.

    The cache store is invoked only when allowOverwrite is true (see the Ignite DataStreamers documentation and the sketch below).

    If you want to use readThrough, also enable writeThrough. You can read about these modes in the Ignite persistent store documentation.
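
A minimal sketch of that point, assuming the same "PersonsCache" and client configuration from the question:

// Sketch only: with allowOverwrite(false) (the default) the streamer bypasses the
// configured CacheJdbcPojoStore, so nothing reaches the database. Setting it to true
// routes streamed entries through the normal cache update path, which invokes the
// store (write-through / write-behind).
try (Ignite ignite = Ignition.start("RemoteIgniteCluster-client.xml");
     IgniteDataStreamer<Integer, Object> stmr = ignite.dataStreamer("PersonsCache")) {

    stmr.allowOverwrite(true); // required for the cache store to see streamed entries

    // ... addData(...) calls as in the question ...

} // close() flushes any remaining buffered entries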