I am using Ignite 1.7.0 and was testing the write behind feature of Apache Ignite. The motive of asking this question is to get a better understanding of what is happening behind the scenes when write behind feature is enabled in Apache Ignite.
I have a Ignite Client Program which will insert 20 entries into a test cache ( call it "test_cache" ).
The Ignite Server is running on the same machine, but on a different JVM.
The Ignite Cache has the following configurations setup :
All the other properties are set to default.
In addition, to this there is a cache store configured for the cache and the code is as follows :
package com.ignite.genericpoc;
import java.util.Collection;
import java.util.Map;
import javax.cache.Cache.Entry;
import javax.cache.integration.CacheLoaderException;
import javax.cache.integration.CacheWriterException;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.cache.store.CacheStore;
import org.apache.ignite.lang.IgniteBiInClosure;
import org.apache.ignite.resources.CacheNameResource;
import org.apache.ignite.resources.IgniteInstanceResource;
public class IgniteStoreTest implements CacheStore<String, String> {
@IgniteInstanceResource
Ignite gridReference;
@CacheNameResource
String cacheName;
@Override
public String load(String key) throws CacheLoaderException {
System.out.println("load method called for the key [ " + key + " ] and cache [ " + cacheName + " ] ");
return null;
}
@Override
public Map<String, String> loadAll(Iterable<? extends String> keys) throws CacheLoaderException {
IgniteCache<String, String> ic = gridReference.cache(cacheName);
int currentKeyNo = 0;
for (String key : keys) {
ic.put(key, "Value:" + currentKeyNo);
currentKeyNo++;
}
System.out.println("Got " + currentKeyNo + " entries");
return null;
}
@Override
public void write(Entry<? extends String, ? extends String> entry) throws CacheWriterException {
System.out.println("Write method called");
}
@Override
public void writeAll(Collection<Entry<? extends String, ? extends String>> entries) throws CacheWriterException {
System.out.println("Write all method called for [ " + entries.size() + " ] entries in the thread "
+ Thread.currentThread().getName());
System.out.println("Entries recieved by " + Thread.currentThread().getName() + " : " + entries.toString());
try {
Thread.sleep(60000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
@Override
public void delete(Object key) throws CacheWriterException {
System.out.println("Delete method called");
}
@Override
public void deleteAll(Collection<?> keys) throws CacheWriterException {
System.out.println("Delete All method called");
}
@Override
public void loadCache(IgniteBiInClosure<String, String> clo, Object... args) throws CacheLoaderException {
System.out.println("Load cache method called with " + args[0].toString());
}
@Override
public void sessionEnd(boolean commit) throws CacheWriterException {
System.out.println("Session End called");
}
}
I have purposefully called Thread.sleep() method in writeAll() method in order, to simulate a slow database writes.
The code for the Ignite Client which is loading the data into the cache is as follows :
package com.ignite.genericpoc;
import java.util.ArrayList;
import java.util.List;
import javax.cache.configuration.FactoryBuilder;
import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.Ignition;
import org.apache.ignite.configuration.CacheConfiguration;
public class IgnitePersistentStoreClientTest {
public static void main(String[] args) throws InterruptedException {
List<String> addressess = new ArrayList<>();
addressess.add("*.*.*.*:47500"); // Hiding the IP
Ignition.setClientMode(true);
Ignite i = IgniteConfigurationUtil.startIgniteServer(
IgniteConfigurationUtil.getIgniteConfiguration(false, IgniteTestConstants.GRID_NAME, addressess));
System.out.println("Client Started");
CacheConfiguration<String, String> ccfg = new CacheConfiguration<>();
ccfg.setName("Persistent_Store_Test_Cache");
ccfg.setCacheStoreFactory(FactoryBuilder.factoryOf(IgniteStoreTest.class));
ccfg.setReadThrough(true);
ccfg.setWriteThrough(true);
ccfg.setWriteBehindEnabled(true);
ccfg.setWriteBehindFlushSize(13);
ccfg.setWriteBehindFlushThreadCount(1);
System.out.println(ccfg.getWriteBehindBatchSize());
IgniteCache<String, String> ic = i.getOrCreateCache(ccfg);
System.out.println("Cache Created");
for (int t = 1; t <= 20; t++) {
System.out.println("Loading key "+t);
ic.put("Key:" + t,"Value: "+t);
System.out.println("Key "+ t + " loaded ");
}
System.out.println("Cache Loaded");
i.close();
}
}
The execution happens as follows :
The Ignite Server is started first.
The Ignite Client which loads the data is started after the server.
Since, there is a sleep of 60 seconds defined on the writeAll() method, the Ignite Client gets stuck while writing the 20th entry.
Also, I can see in the server logs that the writeAll() method is called for two threads, and among them the Flush thread has recieved 15 entries to write to the store and a System thread has recieved 1 entry to write to the store. The Ignite Server logs are as follows :
Write all method called for [ 15 ] entries in the thread flusher-0-#66%test_grid%
Write all method called for [ 1 ] entries in the thread sys-#22%test_grid%
I can understand that the Ignite Client put is stuck at writing the 20 the entry because the Write Behind cache is full and all Flush threads are also busy writing data.
The following are the points on which I need to have a clear understanding :
Why is the client being blocked at inserting the 20th entry, it should have been blocked while inserting the 14th entry ( based on the max cache size of 13 entries )
Why was the Flush thread called only with 15 entries and not all the 19 entries, as I have not set the batch size and it defaults to 512.
Is the System thread that was called with writeAll() method the same thread which was handling the request from Ignite Client to put the 20th entry.
Considering my Cache has write behind enabled and Write Order Mode is PRIMARY_SYNC ( default ) and there are no backups in the cache, any put call to the cache should be blocked until the primary node is able to commit the write. Does this also mean able to put the entry in the Write Behind cache.
In case of storing an entry in the server, does Ignite Server makes two copies of the entry one for storage and one for the write behind cache. Or is the same entry's reference used.
Thank you for your patience in reading the question. I apologize if the question was too lengthy, but the content was essential in elaborating the situation to the concerned audience.
The write-behind store has backpressure control under the hood. It means that async operations can be converted into the sync on the fly if the system cannot handle all of them.
In case of the size of underlying write-behind cache exceeds the critical size (flushSize * 1.5), thread, which is executing write operation, will be used instead of flusherThread.
That is the reason why you see those threads in your log:
Considering my Cache has write behind enabled and Write Order Mode is PRIMARY_SYNC ( default ) and there are no backups in the cache, any put call to the cache should be blocked until the primary node is able to commit the write. Does this also mean able to put the entry in the Write Behind cache.
Yes, it does.
In case of storing an entry in the server, does Ignite Server makes two copies of the entry one for storage and one for the write behind cache. Or is the same entry's reference used.
The same entry's reference should be used.
Let's consider this scenario step by step:
client thread has uploaded 14 entries. GridCacheWriteBehindStore
detects that a number of entries into the underlying cache exceeds the flush size and sends a signal in order to wake up flusher thread.
please see GridCacheWriteBehindStore#updateCache()
flusher thread wakes up and tries to acquire the data from write-behind cache (which is ConcurrentLinkedHashMap
) via write-behind-cache.entrySet().iterator()
.
This iterator provides weakly consistent traversal, i.e. it is not guaranteed that it reflects any modifications subsequent to construction.
The important thing is that client thread puts new entries in parallel.
client thread puts the last value [key=Key:20, val=Value: 20]
. At the same time, the flusher thread is blocked by Thread.sleep()
in writeAll()
method.
GridCacheWriteBehindStore
detects that current size of write-behind cache exceeds critical size (flush size * 1.5) and so, the back-pressure mechanism should be used.
GridCacheWriteBehindStore
calls flushSingleValue()
method in order to flush the oldest value from write-behind cache (of course, this value should not be acquired by flusher thread before).
flushSingleValue()
method is called in the context of the client thread.
After, flusher thread wakes up and processes remaining entries.
I hope it is helpful to understand write-behind store implementation.
Thanks!