Tags: java, classloader, gridgain, ignite

Apache Ignite: Serialization error related to Data Streaming


I am trying to understand how Apache Ignite streaming works. I have a two-node cluster (both nodes on localhost), and I start a client node that runs streaming code with a StreamTransformer and an EntryProcessor. As a result, one of my nodes throws a "cannot deserialize" exception. My code is a simplified version of the WordCount example from the Ignite documentation:

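package gridgaingames;

import java.io.IOException;

import javax.cache.processor.EntryProcessorException;
import javax.cache.processor.MutableEntry;

import org.apache.ignite.Ignite;
import org.apache.ignite.IgniteCache;
import org.apache.ignite.IgniteDataStreamer;
import org.apache.ignite.IgniteException;
import org.apache.ignite.Ignition;
import org.apache.ignite.cache.CacheEntryProcessor;
import org.apache.ignite.stream.StreamTransformer;

public class StreamingExample {
    // Increments the counter stored for a word (a new word starts at 1).
    public static class StreamingExampleCacheEntryProcessor implements CacheEntryProcessor<String, Long, Object> {
        @Override
        public Object process(MutableEntry<String, Long> e, Object... arg) throws EntryProcessorException {
            Long val = e.getValue();
            e.setValue(val == null ? 1L : val + 1);
            return null;
        }
    }

    public static void main(String[] args) throws IgniteException, IOException {
        // Start this node as a client: it does not store cache data itself.
        Ignition.setClientMode(true);
        try (Ignite ignite = Ignition.start("examples/config/example-ignite.xml")) {
            IgniteCache<String, Long> stmCache = ignite.getOrCreateCache("mycache");
            try (IgniteDataStreamer<String, Long> stmr = ignite.dataStreamer(stmCache.getName())) {
                // Allow updates of existing entries.
                stmr.allowOverwrite(true);
                // Apply the entry processor to each streamed entry.
                stmr.receiver(StreamTransformer.from(new StreamingExampleCacheEntryProcessor()));
                stmr.addData("word", 1L);
                System.out.println("Finished");
            }
        }
    }
}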

The exception I get on ONE of the two nodes is:

[23:38:23] Topology snapshot [ver=5, servers=2, clients=1, CPUs=4, heap=3.3GB]
Exception in thread "pub-#9%null%" class org.apache.ignite.binary.BinaryObjectException: Failed to unmarshal object with optimized marshaller
    at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1595)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1663)
    at org.apache.ignite.internal.binary.GridBinaryMarshaller.deserialize(GridBinaryMarshaller.java:298)
    at org.apache.ignite.internal.binary.BinaryMarshaller.unmarshal(BinaryMarshaller.java:109)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.processRequest(DataStreamProcessor.java:278)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor.access$000(DataStreamProcessor.java:50)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamProcessor$1.onMessage(DataStreamProcessor.java:80)
    at org.apache.ignite.internal.managers.communication.GridIoManager.invokeListener(GridIoManager.java:1238)
    at org.apache.ignite.internal.managers.communication.GridIoManager.processRegularMessage0(GridIoManager.java:866)
    at org.apache.ignite.internal.managers.communication.GridIoManager.access$1700(GridIoManager.java:106)
    at org.apache.ignite.internal.managers.communication.GridIoManager$5.run(GridIoManager.java:829)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:745)
Caused by: class org.apache.ignite.IgniteCheckedException: Failed to find class with given class loader for unmarshalling (make sure same versions of all classes are available on all nodes or enable peer-class-loading): sun.misc.Launcher$AppClassLoader@4e857327
    at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:224)
    at org.apache.ignite.internal.binary.BinaryUtils.doReadOptimized(BinaryUtils.java:1592)
    ... 13 more
Caused by: java.lang.ClassNotFoundException: gridgaingames.StreamingExample$StreamingExampleCacheEntryProcessor
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:308)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)
    at org.apache.ignite.internal.util.IgniteUtils.forName(IgniteUtils.java:8350)
    at org.apache.ignite.internal.MarshallerContextAdapter.getClass(MarshallerContextAdapter.java:185)
    at org.apache.ignite.marshaller.optimized.OptimizedMarshallerUtils.classDescriptor(OptimizedMarshallerUtils.java:266)
    at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:318)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
    at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readFields(OptimizedObjectInputStream.java:491)
    at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readSerializable(OptimizedObjectInputStream.java:579)
    at org.apache.ignite.marshaller.optimized.OptimizedClassDescriptor.read(OptimizedClassDescriptor.java:841)
    at org.apache.ignite.marshaller.optimized.OptimizedObjectInputStream.readObjectOverride(OptimizedObjectInputStream.java:324)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:364)
    at org.apache.ignite.marshaller.optimized.OptimizedMarshaller.unmarshal(OptimizedMarshaller.java:218)
    ... 14 more

There are several things I don't understand:

1) How can I fix it?

2) Since this is not a "broadcast" or anything similar, I assumed Ignite runs the streaming code on the calling node only. It looks like I was wrong. So where is my streaming code executed?

3) After printing the "Finished" line, my program doesn't stop. Why? It looks like some non-daemon thread is still running. Is it the streaming code that prevents my client node from exiting?

PS

Peer class loading is enabled. If I run an example that uses broadcast to execute code on many nodes, it works fine.


Solution

  • Basically, IgniteDataStreamer prepares data batches on the sender side (the client, in your example) and sends them right away to the destination nodes that should store the specific key-value tuples. With this in mind, the answers to your questions are as follows:

    1. Transformers are executed on the destination nodes (server nodes) before an entry is placed into the cache. This means that each server node must have the transformer's class on its classpath or, alternatively, you have to enable peer-class-loading. Personally, I find the latter the more flexible and preferable solution.
    2. As explained above, the sender simply prepares batches and sends them to the servers where the cache is deployed. Each server receives only the batches containing tuples for which it is the primary or a backup node.
    3. Batches are flushed in the background because IgniteDataStreamer is designed for fast data pre-loading and complex event processing (CEP). Several parameters let you tune the flushing, for example autoFlushFrequency and perNodeBufferSize.
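The routing described in points 1 to 3 can be illustrated with a toy, single-JVM model. This is a sketch under stated assumptions, not Ignite code: ToyStreamer, ToyNode and ownerOf are hypothetical names, the "nodes" are plain maps, the key-to-node mapping is a simple hash modulo standing in for Ignite's real affinity function, and perNodeBufferSize only mimics the real setting. What it shows is that addData on the sender merely buffers a key, while the transformer (the same increment logic as StreamingExampleCacheEntryProcessor) runs on the node that owns the key when that node's buffer is flushed.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Toy model only, NOT Ignite code: "nodes" are plain maps in one JVM and the
// key-to-node mapping is a hypothetical hash modulo, not Ignite's affinity.
class ToyStreamer {
    /** One toy "server node": its own slice of the cache. */
    static final class ToyNode {
        final Map<String, Long> store = new HashMap<>();
        int batchesReceived = 0;

        // The transformer runs HERE, on the node owning the keys. In a real
        // cluster this is another JVM, so it must be able to load the class.
        void receive(List<String> keys, BiFunction<String, Long, Long> transformer) {
            batchesReceived++;
            for (String k : keys) {
                store.put(k, transformer.apply(k, store.get(k)));
            }
        }
    }

    final List<ToyNode> nodes = new ArrayList<>();
    final Map<Integer, List<String>> buffers = new HashMap<>(); // per-node buffers on the sender
    final int perNodeBufferSize; // hypothetical stand-in for the real setting
    final BiFunction<String, Long, Long> transformer;

    ToyStreamer(int nodeCount, int perNodeBufferSize, BiFunction<String, Long, Long> transformer) {
        for (int i = 0; i < nodeCount; i++) nodes.add(new ToyNode());
        this.perNodeBufferSize = perNodeBufferSize;
        this.transformer = transformer;
    }

    // Hypothetical affinity: hash modulo node count.
    int ownerOf(String key) {
        return Math.floorMod(key.hashCode(), nodes.size());
    }

    // Sender side: only buffers the key, nothing is executed locally.
    void addData(String key) {
        int owner = ownerOf(key);
        List<String> buf = buffers.computeIfAbsent(owner, n -> new ArrayList<>());
        buf.add(key);
        if (buf.size() >= perNodeBufferSize) flushNode(owner); // buffer full: send the batch
    }

    void flushNode(int owner) {
        List<String> buf = buffers.remove(owner);
        if (buf != null && !buf.isEmpty()) nodes.get(owner).receive(buf, transformer);
    }

    // Sends whatever is still buffered (the real streamer also flushes on close()).
    void flush() {
        for (Integer owner : new ArrayList<>(buffers.keySet())) flushNode(owner);
    }

    public static void main(String[] args) {
        // Same logic as StreamingExampleCacheEntryProcessor: 1 for a new word, else +1.
        ToyStreamer s = new ToyStreamer(2, 3, (k, v) -> v == null ? 1L : v + 1);
        for (String w : "a b a c a b".split(" ")) s.addData(w);
        s.flush();
        System.out.println("count(a) = " + s.nodes.get(s.ownerOf("a")).store.get("a"));
    }
}
```

Running main prints `count(a) = 3`. The count exists only in the owning node's map, never on the sender, which mirrors why the server node in the question needs the entry processor class.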

    Lastly, for pre-loading (when caches are empty and you need to fill them up), I would suggest setting allowOverwrite to false, which allows the streamer to prepare and send batches to primary and backup nodes separately. If this parameter is set to true, the batches are sent to primary nodes only; each primary node then applies the data with basic cache.put operations, updating its own copy of the data and the corresponding backups. This approach is slower if you only need to pre-load your caches.
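The two allowOverwrite paths can be compared with another toy model that counts "network messages" for one batch whose keys are owned by one primary and one backup node. This is a hypothetical sketch, not Ignite internals: OverwriteModes, streamNoOverwrite and streamOverwrite are made-up names, and the per-entry primary-to-backup update in the allowOverwrite=true path simply follows the description above. The putIfAbsent in the allowOverwrite=false path is an assumption based on the flag's name (existing values are not overwritten), which is also why that mode fits empty caches.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model only, NOT Ignite internals: one primary and one backup "node"
// (plain maps) owning the same keys, plus a counter of "network messages".
class OverwriteModes {
    final Map<String, Long> primary = new HashMap<>();
    final Map<String, Long> backup = new HashMap<>();
    int messages = 0;

    // allowOverwrite=false: the sender ships the same batch to the primary and
    // to the backup separately, and each node stores the entries itself.
    void streamNoOverwrite(Map<String, Long> batch) {
        storeBatch(primary, batch);
        storeBatch(backup, batch);
    }

    private void storeBatch(Map<String, Long> node, Map<String, Long> batch) {
        messages++; // one batch message from the sender to this owner node
        // Assumption: existing values are kept, as in an initial load.
        batch.forEach(node::putIfAbsent);
    }

    // allowOverwrite=true: the batch goes to the primary only, which applies a
    // regular put per entry and then pushes each update to the backup.
    void streamOverwrite(Map<String, Long> batch) {
        messages++; // one batch message from the sender to the primary
        for (Map.Entry<String, Long> e : batch.entrySet()) {
            primary.put(e.getKey(), e.getValue()); // regular put on the primary
            backup.put(e.getKey(), e.getValue());  // primary updates the backup...
            messages++;                            // ...costing one extra message
        }
    }

    public static void main(String[] args) {
        Map<String, Long> batch = new HashMap<>();
        for (int i = 0; i < 100; i++) batch.put("key" + i, (long) i);

        OverwriteModes preload = new OverwriteModes();
        preload.streamNoOverwrite(batch);

        OverwriteModes overwrite = new OverwriteModes();
        overwrite.streamOverwrite(batch);

        System.out.println("allowOverwrite=false: " + preload.messages + " messages");
        System.out.println("allowOverwrite=true:  " + overwrite.messages + " messages");
    }
}
```

With a 100-entry batch, main reports 2 messages for allowOverwrite=false and 101 for allowOverwrite=true, while both paths leave identical data on the backup. The model only illustrates why the overwrite path does more work per entry; it says nothing about actual timings in a real cluster.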