I'm using Ignite version 2.0.0.
I am trying to load about 40M records from a database into my Ignite instances (2 in this case). I read about Ignite data streamers at https://apacheignite.readme.io/v1.2/docs/data-streamers
Because loadCache() was loading data very slowly, I replaced loadCache() with IgniteDataStreamer.addData().
When I run the loader, the web console shows the on-heap entries metric increasing (i.e., data appears to be loading). But when I query the Ignite cache, the result is empty.
I also notice this exception in the server logs:
[01:46:19,541][ERROR][flusher-0-#66%RemoteIgniteCluster%][GridCacheWriteBehindStore] Unable to update underlying store: CacheJdbcPojoStore []
javax.cache.CacheException: Failed to read property value from non binary object [class=class java.lang.Integer, property=class_no]
at org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore.extractBinaryParameter(CacheJdbcPojoStore.java:122)
at org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStore.extractParameter(CacheJdbcPojoStore.java:69)
at org.apache.ignite.cache.store.jdbc.CacheAbstractJdbcStore.fillValueParameters(CacheAbstractJdbcStore.java:1414)
at org.apache.ignite.cache.store.jdbc.CacheAbstractJdbcStore.writeAll(CacheAbstractJdbcStore.java:1081)
at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.updateStore(GridCacheWriteBehindStore.java:804)
at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.applyBatch(GridCacheWriteBehindStore.java:720)
at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.access$2400(GridCacheWriteBehindStore.java:75)
at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.flushCacheCoalescing(GridCacheWriteBehindStore.java:1108)
at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.body(GridCacheWriteBehindStore.java:1006)
at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
at java.lang.Thread.run(Thread.java:745)
[01:50:00,945][ERROR][flusher-0-#66%RemoteIgniteCluster%][GridCacheWriteBehindStore] Unable to update underlying store: CacheJdbcPojoStore []
javax.cache.integration.CacheWriterException: Failed to write entries in database
at org.apache.ignite.cache.store.jdbc.CacheAbstractJdbcStore.writeAll(CacheAbstractJdbcStore.java:1151)
at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.updateStore(GridCacheWriteBehindStore.java:804)
at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.applyBatch(GridCacheWriteBehindStore.java:720)
at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore.access$2400(GridCacheWriteBehindStore.java:75)
at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.flushCacheCoalescing(GridCacheWriteBehindStore.java:1108)
at org.apache.ignite.internal.processors.cache.store.GridCacheWriteBehindStore$Flusher.body(GridCacheWriteBehindStore.java:1006)
at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
at java.lang.Thread.run(Thread.java:745)
My LoadCaches.java code is:
package load;

import javax.naming.event.ObjectChangeListener;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.Ignition;

public class LoadCaches {
    public static void main(String[] args) throws Exception {
        try (Ignite ignite = Ignition.start("RemoteIgniteCluster-client.xml");
             IgniteDataStreamer<Integer, Object> stmr = ignite.dataStreamer("PersonsCache")) {
            System.out.println(">>> Loading caches...");

            stmr.allowOverwrite(true);
            stmr.autoFlushFrequency(1000);

            System.out.println(">>> Loading cache: PersonsCache");

            for (int i = 0; i < 5000000; i++)
                stmr.addData(i, new Integer(i));

            /*ignite.cache("PersonsCache").loadCache(null);*/

            System.out.println(">>> All caches loaded!");
        }
    }
}
I tried replacing
IgniteDataStreamer<Integer, Object> stmr = ignite.dataStreamer("PersonsCache"))
and
stmr.addData(i, new Integer(i));
with
IgniteDataStreamer<Integer, String> stmr = ignite.dataStreamer("PersonsCache"))
and
stmr.addData(i, Integer.toString(i));
but I still get the same exception in the server logs, and when I query the cache I get empty results.
My Spring XML configuration looks like this:
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:util="http://www.springframework.org/schema/util"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/util
http://www.springframework.org/schema/util/spring-util.xsd">
<!-- Load external properties file. -->
<bean id="placeholderConfig" class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer">
<property name="location" value="classpath:secret.properties"/>
</bean>
<!-- Data source beans will be initialized from external properties file. -->
<bean id="dsMySQL_DB" class="com.mysql.jdbc.jdbc2.optional.MysqlDataSource">
<property name="URL" value="${dsMySQL_DB.jdbc.url}"/>
<property name="user" value="${dsMySQL_DB.jdbc.username}"/>
<property name="password" value="${dsMySQL_DB.jdbc.password}"/>
</bean>
<bean class="org.apache.ignite.configuration.IgniteConfiguration">
<property name="igniteInstanceName" value="testcluster"/>
<property name="peerClassLoadingEnabled" value="false" />
<property name="includeEventTypes">
<list>
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_STARTED" />
<util:constant
static-field="org.apache.ignite.events.EventType.EVT_TASK_FINISHED" />
<util:constant static-field="org.apache.ignite.events.EventType.EVT_TASK_FAILED" />
</list>
</property>
<!-- Configure internal thread pool. -->
<property name="publicThreadPoolSize" value="64" />
<!-- Configure system thread pool. -->
<property name="systemThreadPoolSize" value="32" />
<property name="discoverySpi">
<bean class="org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi">
<property name="ipFinder">
<bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.multicast.TcpDiscoveryMulticastIpFinder">
<!-- <bean class="org.apache.ignite.spi.discovery.tcp.ipfinder.vm.TcpDiscoveryVmIpFinder"> -->
<property name="addresses">
<list>
<value>127.0.0.1:47500..47510</value>
</list>
</property>
</bean>
</property>
</bean>
</property>
<property name="atomicConfiguration">
<bean class="org.apache.ignite.configuration.AtomicConfiguration">
<property name="backups" value="0"/>
</bean>
</property>
<property name="cacheKeyConfiguration">
<list>
<bean class="org.apache.ignite.cache.CacheKeyConfiguration">
<constructor-arg value="com.gmail.testcluster.model.Persons"/>
<constructor-arg value="age"/>
</bean>
</list>
</property>
<property name="cacheConfiguration">
<list>
<bean class="org.apache.ignite.configuration.CacheConfiguration">
<!-- Set rebalance batch size to 1 MB. -->
<property name="rebalanceBatchSize" value="#{1024 * 1024}" />
<!-- Explicitly disable rebalance throttling. -->
<property name="rebalanceThrottle" value="0" />
<!-- Set 4 threads for rebalancing. -->
<property name="rebalanceThreadPoolSize" value="4" />
<property name="name" value="PersonsCache"/>
<property name="cacheMode" value="PARTITIONED"/>
<property name="atomicityMode" value="ATOMIC"/>
<property name="backups" value="0"/>
<property name="affinity">
<bean class="org.apache.ignite.cache.affinity.rendezvous.RendezvousAffinityFunction">
<property name="partitions" value="2048"/>
</bean>
</property>
<!-- <property name="queryDetailMetricsSize" value="50"/> -->
<property name="cacheStoreFactory">
<bean class="org.apache.ignite.cache.store.jdbc.CacheJdbcPojoStoreFactory">
<property name="dataSourceBean" value="dsMySQL_DB"/>
<property name="dialect">
<bean class="org.apache.ignite.cache.store.jdbc.dialect.MySQLDialect">
</bean>
</property>
<property name="types">
<list>
<bean class="org.apache.ignite.cache.store.jdbc.JdbcType">
<property name="cacheName" value="PersonsCache"/>
<property name="keyType" value="java.lang.Integer"/>
<property name="valueType" value="com.gmail.testcluster.model.Persons"/>
<property name="databaseSchema" value="MY_DB"/>
<property name="databaseTable" value="PERSONS"/>
<property name="keyFields">
<list>
<bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
<constructor-arg>
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
<constructor-arg value="ID"/>
<constructor-arg value="int"/>
<constructor-arg value="id"/>
</bean>
</list>
</property>
<property name="valueFields">
<list>
<bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
<constructor-arg>
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
<constructor-arg value="CLASS_NO"/>
<constructor-arg value="java.lang.Integer"/>
<constructor-arg value="class_no"/>
</bean>
<bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
<constructor-arg>
<util:constant static-field="java.sql.Types.VARCHAR"/>
</constructor-arg>
<constructor-arg value="NAME"/>
<constructor-arg value="java.lang.String"/>
<constructor-arg value="name"/>
</bean>
<bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
<constructor-arg>
<util:constant static-field="java.sql.Types.INTEGER"/>
</constructor-arg>
<constructor-arg value="AGE"/>
<constructor-arg value="java.lang.Integer"/>
<constructor-arg value="age"/>
</bean>
<bean class="org.apache.ignite.cache.store.jdbc.JdbcTypeField">
<constructor-arg>
<util:constant static-field="java.sql.Types.DOUBLE"/>
</constructor-arg>
<constructor-arg value="R_AMT"/>
<constructor-arg value="java.lang.Double"/>
<constructor-arg value="rAmt"/>
</bean>
</list>
</property>
</bean>
</list>
</property>
</bean>
</property>
<property name="readThrough" value="true"/>
<property name="writeThrough" value="true"/>
<property name="writeBehindEnabled" value="true"/>
<property name="writeBehindBatchSize" value="500"/>
<property name="writeBehindFlushSize" value="1000"/>
<property name="writeBehindFlushFrequency" value="60000"/>
<property name="queryEntities">
<list>
<bean class="org.apache.ignite.cache.QueryEntity">
<property name="keyType" value="java.lang.Integer"/>
<property name="valueType" value="com.gmail.testcluster.model.Persons"/>
<property name="fields">
<map>
<entry key="class_no" value="java.lang.Integer"/>
<entry key="name" value="java.lang.String"/>
<entry key="age" value="java.lang.Integer"/>
<entry key="rAmt" value="java.lang.Double"/>
</map>
</property>
<property name="aliases">
<map>
<entry key="rAmt" value="R_AMT"/>
</map>
</property>
<!-- <property name="indexes">
<list>
<bean class="org.apache.ignite.cache.QueryIndex">
<property name="name" value="queryindexing"/>
<property name="indexType" value="SORTED"/>
<property name="fields">
<map>
<entry key="class_no" value="true"/>
</map>
</property>
</bean>
</list>
</property> -->
</bean>
</list>
</property>
</bean>
</list>
</property>
</bean>
</beans>
I am not sure where the problem is. Can someone take a look? Thanks!
When the data streamer's allowOverwrite property is false (the default), the persistent store is skipped.
The cache store is invoked only when allowOverwrite is true; see the data streamer documentation.
If you want to use readThrough, then also use writeThrough. Both modes are described in the Ignite documentation.
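Since the loader sets allowOverwrite(true), the store is invoked with whatever values were streamed. The exception message reports class=java.lang.Integer and property=class_no, which suggests the store is looking for a class_no property on a plain Integer value. The sketch below illustrates that kind of mismatch with plain Java reflection. It is a simplified stand-in, not Ignite's actual implementation: the nested Persons class is a hypothetical model whose field names are taken from the javaFieldName values in the JdbcType configuration, and readProperty and canRead are illustrative helpers.

```java
import java.lang.reflect.Field;

public class PropertyReadSketch {
    // Hypothetical stand-in for com.gmail.testcluster.model.Persons.
    // Field names mirror the javaFieldName values in the JdbcType config.
    public static class Persons {
        Integer class_no;
        String name;
        Integer age;
        Double rAmt;

        public Persons(Integer classNo, String name, Integer age, Double rAmt) {
            this.class_no = classNo;
            this.name = name;
            this.age = age;
            this.rAmt = rAmt;
        }
    }

    // Simplified illustration (an assumption, not Ignite code):
    // read a named field from a value object, as a POJO-style store must do
    // before it can bind the value to a SQL parameter.
    public static Object readProperty(Object value, String property) {
        try {
            Field field = value.getClass().getDeclaredField(property);
            field.setAccessible(true);
            return field.get(value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(
                "Failed to read property value [class=" + value.getClass().getName()
                    + ", property=" + property + "]", e);
        }
    }

    // Returns true when the value object exposes the requested property.
    public static boolean canRead(Object value, String property) {
        try {
            readProperty(value, property);
            return true;
        } catch (IllegalStateException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // An Integer value has no class_no field, so the read fails.
        System.out.println(canRead(Integer.valueOf(1), "class_no"));
        // A Persons value exposes class_no, so the read succeeds.
        System.out.println(canRead(new Persons(7, "Ann", 30, 12.5), "class_no"));
    }
}
```

If this reading of the stack trace is right, streaming values of the type named in valueType (com.gmail.testcluster.model.Persons) instead of Integer or String would likely avoid the store error. It may also explain the empty query results, since the configured query entity describes Persons values.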