Search code examples
java, distributed-computing, hazelcast, hazelcast-imap

Hazelcast - Why are EntryProcessors not using Portable?


I need to implement a cross-platform map using Portable serialization.

I need to insert or update a large number of values, so I use an EntryProcessor for the insert-or-update, as shown below:

package com.mycompany.common;

import java.io.IOException;
import java.util.Map.Entry;

import com.hazelcast.core.Offloadable;
import com.hazelcast.map.AbstractEntryProcessor;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Portable;
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;

/**
 * Insert-or-update entry processor that carries a {@link MapValue} payload.
 *
 * <p>When the target entry has no value, the payload itself is stored. Otherwise the
 * payload's data is copied onto the existing value, which is then written back to the entry.
 *
 * <p>The processor declares itself as a Portable with factory id 1 and class id 2. The payload
 * is written into the Portable raw data section, preceded by a boolean field telling the reader
 * whether a payload is present.
 */
public class MyEntryProcessor extends AbstractEntryProcessor<String, MapValue> implements Offloadable, Portable {

    private static final long serialVersionUID = 1L;

    /** Name of the boolean Portable field that flags whether a payload follows as raw data. */
    private static final String PAYLOAD_FLAG = "_has__mapValue";

    /** Value to insert, or source of the data copied onto an existing value. */
    private MapValue mapValue;

    /** No-argument constructor; leaves the payload unset. */
    public MyEntryProcessor() {
    }

    /**
     * Creates a processor for the given payload.
     *
     * @param mapValue value to insert or whose data is applied to an existing value
     */
    public MyEntryProcessor(MapValue mapValue) {
        this.mapValue = mapValue;
    }

    /**
     * Stores the payload when the entry is empty; otherwise copies the payload's data onto
     * the current value and writes that value back.
     *
     * @param entry the map entry being processed
     * @return always {@code null}
     */
    @Override
    public Object process(Entry<String, MapValue> entry) {
        final MapValue current = entry.getValue();

        if (current == null) {
            // Nothing stored yet: the payload becomes the value.
            entry.setValue(mapValue);
            return null;
        }

        // A value exists: refresh its data and write it back so the change is registered.
        current.setData(mapValue.getData());
        entry.setValue(current);
        return null;
    }

    /**
     * @return {@code OFFLOADABLE_EXECUTOR}; {@code NO_OFFLOADING} is the alternative that is
     *         deliberately not used here
     */
    @Override
    public String getExecutorName() {
        return OFFLOADABLE_EXECUTOR;
    }

    /** @return the Portable factory id, {@code 1} */
    @Override
    public int getFactoryId() {
        return 1;
    }

    /** @return the Portable class id, {@code 2} */
    @Override
    public int getClassId() {
        return 2;
    }

    /**
     * Writes the presence flag as a Portable field and, when a payload exists, the payload
     * itself into the raw data section.
     *
     * @param writer the Portable writer to write to
     * @throws IOException if writing fails
     */
    @Override
    public void writePortable(PortableWriter writer) throws IOException {
        if (mapValue == null) {
            writer.writeBoolean(PAYLOAD_FLAG, false);
            return;
        }

        writer.writeBoolean(PAYLOAD_FLAG, true);
        ObjectDataOutput rawOut = writer.getRawDataOutput();
        rawOut.writeObject(mapValue);
    }

    /**
     * Reads the presence flag and, when it is set, the payload from the raw data section.
     * The payload field is left untouched when the flag is not set.
     *
     * @param reader the Portable reader to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readPortable(PortableReader reader) throws IOException {
        final boolean payloadPresent = reader.readBoolean(PAYLOAD_FLAG);
        if (!payloadPresent) {
            return;
        }

        ObjectDataInput rawIn = reader.getRawDataInput();
        mapValue = rawIn.readObject();
    }
}

MyEntryProcessor implements Portable. When there is only one Hazelcast member, there is no problem. When I send elements for processing using hzlMap.executeOnKeys(), no error occurs.

But when there is more than one member, I get the exception below from the members:

 Exception in thread "main" com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'com.hazelcast.spi.impl.operationservice.impl.operations.Backup'
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleSerializeException(SerializationUtil.java:75)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:155)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:128)
    at com.hazelcast.spi.impl.operationservice.impl.OutboundOperationHandler.send(OutboundOperationHandler.java:51)
    at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendSingleBackup(OperationBackupHandler.java:217)
    at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.makeBackups(OperationBackupHandler.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendBackups0(OperationBackupHandler.java:108)
    at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendBackups(OperationBackupHandler.java:74)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleResponse(OperationRunnerImpl.java:272)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:195)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:120)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.run(OperationThread.java:100)
    at ------ submitted from ------.(Unknown Source)
    at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolve(InvocationFuture.java:127)
    at com.hazelcast.spi.impl.AbstractInvocationFuture$1.run(AbstractInvocationFuture.java:243)
    at com.hazelcast.client.impl.protocol.task.AbstractPartitionMessageTask.execute(AbstractPartitionMessageTask.java:78)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.unblock(AbstractInvocationFuture.java:239)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:225)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:229)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.complete(AbstractInvocationFuture.java:363)
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.complete(Invocation.java:632)
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyNormalResponse(Invocation.java:314)
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:273)
    at com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:196)
    at com.hazelcast.map.impl.operation.EntryOperation$3.handleResponse(EntryOperation.java:337)
    at com.hazelcast.map.impl.operation.EntryOperation$3.sendResponse(EntryOperation.java:310)
    at com.hazelcast.spi.Operation.sendResponse(Operation.java:353)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.sendResponseAfterOperationError(OperationRunnerImpl.java:367)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleOperationError(OperationRunnerImpl.java:361)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:198)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:120)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.run(OperationThread.java:100)
    at ------ submitted from ------.(Unknown Source)
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveAndThrowIfException(ClientInvocationFuture.java:96)
    at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveAndThrowIfException(ClientInvocationFuture.java:33)
    at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:155)
    at com.hazelcast.client.spi.ClientProxy.invokeOnPartition(ClientProxy.java:204)
    at com.hazelcast.client.spi.ClientProxy.invoke(ClientProxy.java:198)
    at com.hazelcast.client.proxy.ClientMapProxy.executeOnKeyInternal(ClientMapProxy.java:1294)
    at com.hazelcast.client.proxy.ClientMapProxy.executeOnKey(ClientMapProxy.java:1287)
    at com.mycompany.client.service.MapRepo.putEntriesUsingEntryProcessor(MapRepo.java:31)
    at com.mycompany.client.ClientAppMain.main(ClientAppMain.java:25)
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'com.hazelcast.map.impl.operation.EntryBackupOperation'
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleSerializeException(SerializationUtil.java:75)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.writeObject(AbstractSerializationService.java:252)
    at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:370)
    at com.hazelcast.spi.impl.operationservice.impl.operations.Backup.writeInternal(Backup.java:233)
    at com.hazelcast.spi.Operation.writeData(Operation.java:565)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.write(DataSerializableSerializer.java:201)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.write(DataSerializableSerializer.java:50)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.write(StreamSerializerAdapter.java:43)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:152)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:128)
    at com.hazelcast.spi.impl.operationservice.impl.OutboundOperationHandler.send(OutboundOperationHandler.java:51)
    at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendSingleBackup(OperationBackupHandler.java:217)
    at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.makeBackups(OperationBackupHandler.java:189)
    at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendBackups0(OperationBackupHandler.java:108)
    at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendBackups(OperationBackupHandler.java:74)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleResponse(OperationRunnerImpl.java:272)
    at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:195)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:120)
    at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.run(OperationThread.java:100)
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'com.hazelcast.map.AbstractEntryProcessor$EntryBackupProcessorImpl'
    at com.hazelcast.internal.serialization.impl.SerializationUtil.handleSerializeException(SerializationUtil.java:75)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.writeObject(AbstractSerializationService.java:252)
    at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:370)
    at com.hazelcast.map.impl.operation.EntryBackupOperation.writeInternal(EntryBackupOperation.java:72)
    at com.hazelcast.spi.Operation.writeData(Operation.java:565)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.write(DataSerializableSerializer.java:201)
    at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.write(DataSerializableSerializer.java:50)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.write(StreamSerializerAdapter.java:43)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.writeObject(AbstractSerializationService.java:250)
    ... 17 more
Caused by: java.io.NotSerializableException: com.mycompany.common.MapValue
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at com.hazelcast.internal.serialization.impl.JavaDefaultSerializers$JavaSerializer.write(JavaDefaultSerializers.java:242)
    at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.write(StreamSerializerAdapter.java:43)
    at com.hazelcast.internal.serialization.impl.AbstractSerializationService.writeObject(AbstractSerializationService.java:250)
    ... 24 more

Is Hazelcast trying to serialize MyEntryProcessor using default Java serialization even though it implements Portable?

Why is this happening? Am I missing something?

How can it be corrected?

You can find a sample project demonstrating the problem at:

https://github.com/simpleusr/hazelcastproblem

Please note that, in order to reproduce the problem, you need to start two Hazelcast members using hazelcast-dev1.xml and hazelcast-dev2.xml. When there is only one member, there is no problem.


Solution

  • @simpleusr, I believe I found the cause. Please see the EP implementation below.

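    The problem seems to be related to the EntryBackupProcessorImpl used in AbstractEntryProcessor. It has a special precalculated serialVersionUID, as described in the source, and, as the stack trace shows, it is written with plain Java serialization (JavaSerializer) rather than Portable when the backup operation is sent to another member. Java serialization then reaches the MapValue object held by your EP, which it cannot convert, hence the NotSerializableException, even though the EP itself is Portable. When you implement both EntryProcessor and EntryBackupProcessor in your EP, the issue disappears.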

    /**
     * Insert-or-update entry processor that carries a {@link MapValue} payload and also acts
     * as its own backup processor.
     *
     * <p>When the target entry has no value, the payload itself is stored. Otherwise the
     * payload's data is copied onto the existing value, which is then written back to the entry.
     * Backup processing runs the very same logic.
     *
     * <p>The processor declares itself as a Portable with factory id 1 and class id 2. The
     * payload is written into the Portable raw data section, preceded by a boolean field telling
     * the reader whether a payload is present.
     */
    public class MyEntryProcessor implements EntryProcessor<String, MapValue>, EntryBackupProcessor<String, MapValue>, Offloadable, Portable {

        private static final long serialVersionUID = 1L;

        /** Name of the boolean Portable field that flags whether a payload follows as raw data. */
        private static final String PAYLOAD_FLAG = "_has__mapValue";

        /** Value to insert, or source of the data copied onto an existing value. */
        private MapValue mapValue;

        /** No-argument constructor; leaves the payload unset. */
        public MyEntryProcessor() {
        }

        /**
         * Creates a processor for the given payload.
         *
         * @param mapValue value to insert or whose data is applied to an existing value
         */
        public MyEntryProcessor(MapValue mapValue) {
            this.mapValue = mapValue;
        }

        /**
         * Stores the payload when the entry is empty; otherwise copies the payload's data onto
         * the current value and writes that value back.
         *
         * @param entry the map entry being processed
         * @return always {@code null}
         */
        @Override
        public Object process(Entry<String, MapValue> entry) {
            final MapValue current = entry.getValue();

            if (current == null) {
                // Nothing stored yet: the payload becomes the value.
                entry.setValue(mapValue);
                return null;
            }

            // A value exists: refresh its data and write it back so the change is registered.
            current.setData(mapValue.getData());
            entry.setValue(current);
            return null;
        }

        /**
         * Returns this same instance, so the backup is handled by {@link #processBackup}.
         *
         * @return {@code this}
         */
        @Override
        public EntryBackupProcessor<String, MapValue> getBackupProcessor() {
            return this;
        }

        /**
         * Applies exactly the same insert-or-update logic as {@link #process} to the backup entry.
         *
         * @param entry the backup map entry being processed
         */
        @Override
        public void processBackup(Entry<String, MapValue> entry) {
            process(entry);
        }

        /**
         * @return {@code OFFLOADABLE_EXECUTOR}; {@code NO_OFFLOADING} is the alternative that is
         *         deliberately not used here
         */
        @Override
        public String getExecutorName() {
            return OFFLOADABLE_EXECUTOR;
        }

        /** @return the Portable factory id, {@code 1} */
        @Override
        public int getFactoryId() {
            return 1;
        }

        /** @return the Portable class id, {@code 2} */
        @Override
        public int getClassId() {
            return 2;
        }

        /**
         * Writes the presence flag as a Portable field and, when a payload exists, the payload
         * itself into the raw data section.
         *
         * @param writer the Portable writer to write to
         * @throws IOException if writing fails
         */
        @Override
        public void writePortable(PortableWriter writer) throws IOException {
            if (mapValue == null) {
                writer.writeBoolean(PAYLOAD_FLAG, false);
                return;
            }

            writer.writeBoolean(PAYLOAD_FLAG, true);
            ObjectDataOutput rawOut = writer.getRawDataOutput();
            rawOut.writeObject(mapValue);
        }

        /**
         * Reads the presence flag and, when it is set, the payload from the raw data section.
         * The payload field is left untouched when the flag is not set.
         *
         * @param reader the Portable reader to read from
         * @throws IOException if reading fails
         */
        @Override
        public void readPortable(PortableReader reader) throws IOException {
            final boolean payloadPresent = reader.readBoolean(PAYLOAD_FLAG);
            if (!payloadPresent) {
                return;
            }

            ObjectDataInput rawIn = reader.getRawDataInput();
            mapValue = rawIn.readObject();
        }
    }