Search code examples
ignite

"Unknown Pair" exception when using StreamTransformer for BinaryObject


I have a cache which store BinaryObject actually in a cluster(2 nodes). Ignite version is 2.1.0. If I don't use any StreamReceiver(include StreamTransformer), there is no problems when adding lots of BinaryObject data with following code:

IgniteDataStreamer<Long,BinaryObject> ds = ignite.dataStreamer(CACHE_NAME);
SecureRandom random = new SecureRandom();
long i = 0;
long count = 1000000;
while(i++<count){
    builder.setField("id", i);
    builder.setField("name", "Test"+i);
    builder.setField("age", random.nextInt(30));
    builder.setField("score", random.nextDouble()*100d);
    builder.setField("birthday", new Date());
    ds.addData(i, builder.build());
    if(i%10000==0){
        System.out.println(i+" added...");
    }
}

But now, I want to modify my BinaryObject data value before adding, so I tried StreamTransformer like this:

ds.receiver(new StreamTransformer<Long,BinaryObject>(){
    @Override
    public Object process(MutableEntry<Long, BinaryObject> entry, Object... arguments)
            throws EntryProcessorException {
        // TODO Auto-generated method stub
        Long key = entry.getKey();
        BinaryObject value = entry.getValue();
        BinaryObjectBuilder builder = value.toBuilder();
        //want to change the value of "name" field
        builder.setField("name", "Modify"+builder.getField("name"));
        entry.setValue(builder.build());
        return null;
    }
});
while(...){
    //... original code to build BinaryObject data and call ds.add method
}

Unluckily following exceptions occurred:

[09:52:36] Topology snapshot [ver=61, servers=2, clients=0, CPUs=8, heap=2.7GB]
10000 added...
20000 added...
30000 added...
40000 added...
[09:52:39,174][SEVERE][data-streamer-#54%null%][DataStreamerImpl] DataStreamer operation failed.
class org.apache.ignite.IgniteCheckedException: Failed to finish operation (too many remaps): 32
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5.apply(DataStreamerImpl.java:869)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$5.apply(DataStreamerImpl.java:834)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:382)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.unblock(GridFutureAdapter.java:346)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.unblockAll(GridFutureAdapter.java:334)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:494)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:473)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:461)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer$2.apply(DataStreamerImpl.java:1572)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerImpl$Buffer$2.apply(DataStreamerImpl.java:1562)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.notifyListener(GridFutureAdapter.java:382)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.unblock(GridFutureAdapter.java:346)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.unblockAll(GridFutureAdapter.java:334)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:494)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:473)
    at org.apache.ignite.internal.util.future.GridFutureAdapter.onDone(GridFutureAdapter.java:461)
    at org.apache.ignite.internal.processors.closure.GridClosureProcessor$2.body(GridClosureProcessor.java:967)
    at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:110)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
Caused by: class org.apache.ignite.IgniteCheckedException: Unknown pair [platformId=0, typeId=-1496463502]
    at org.apache.ignite.internal.util.IgniteUtils.cast(IgniteUtils.java:7229)
    ... 5 more
Caused by: java.lang.ClassNotFoundException: Unknown pair [platformId=0, typeId=-1496463502]
    at org.apache.ignite.internal.MarshallerContextImpl.getClassName(MarshallerContextImpl.java:392)
    at org.apache.ignite.internal.MarshallerContextImpl.getClass(MarshallerContextImpl.java:342)
    at org.apache.ignite.internal.binary.BinaryContext.descriptorForTypeId(BinaryContext.java:686)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize0(BinaryReaderExImpl.java:1755)
    at org.apache.ignite.internal.binary.BinaryReaderExImpl.deserialize(BinaryReaderExImpl.java:1714)
    at org.apache.ignite.internal.binary.BinaryObjectImpl.deserializeValue(BinaryObjectImpl.java:797)
    at org.apache.ignite.internal.binary.BinaryObjectImpl.value(BinaryObjectImpl.java:143)
    at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinary(CacheObjectUtils.java:161)
    at org.apache.ignite.internal.processors.cache.CacheObjectUtils.unwrapBinaryIfNeeded(CacheObjectUtils.java:41)
    at org.apache.ignite.internal.processors.cache.CacheObjectContext.unwrapBinaryIfNeeded(CacheObjectContext.java:125)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerEntry$1.getValue(DataStreamerEntry.java:96)
    at org.apache.ignite.stream.StreamTransformer.receive(StreamTransformer.java:45)
    at org.apache.ignite.internal.processors.datastreamer.DataStreamerUpdateJob.call(DataStreamerUpdateJob.java:137)
    at org.apache.ignite.internal.util.IgniteUtils.wrapThreadLoader(IgniteUtils.java:6608)
    at org.apache.ignite.internal.processors.closure.GridClosureProcessor$2.body(GridClosureProcessor.java:959)
    ... 4 more

What should I do to fix it?


Solution

  • You get ClassNotFoundException because DataStreamer internally tries to deserialize stored BinaryObject. To make it use BinaryObjects directly, you should invoke ds.keepBinary(true) before using it.

    Another problem you have in your code is the way you use result of entry.getValue(). Actually, entry, passed to process method represents a record previously stored in cache, so you'll most likely get null value there. If you want to get a newly-assigned value, you should use arguments[0] value.