I need to implement a cross-platform map using Portable serialization.
I need to insert/update a very large number of values, so I use an EntryProcessor for insert-or-update, as shown below:
package com.mycompany.common;
import java.io.IOException;
import java.util.Map.Entry;
import com.hazelcast.core.Offloadable;
import com.hazelcast.map.AbstractEntryProcessor;
import com.hazelcast.nio.ObjectDataInput;
import com.hazelcast.nio.ObjectDataOutput;
import com.hazelcast.nio.serialization.Portable;
import com.hazelcast.nio.serialization.PortableReader;
import com.hazelcast.nio.serialization.PortableWriter;
/**
 * Entry processor that performs an insert-or-update of a {@link MapValue}.
 *
 * <p>If the entry has no value, the supplied {@code MapValue} is stored as-is; otherwise the
 * data of the supplied value is copied into the existing one and the existing one is stored
 * back. The processor is registered as a {@link Portable} with factory id 1 and class id 2.
 *
 * <p>NOTE(review): this class does not override the backup processor, so whatever
 * {@link AbstractEntryProcessor} supplies is used for backups.
 */
public class MyEntryProcessor extends AbstractEntryProcessor<String, MapValue> implements Offloadable, Portable {

    private static final long serialVersionUID = 1L;

    /** Value to insert, or whose data is copied into an already-present value. */
    private MapValue mapValue;

    /** No-arg constructor; leaves {@code mapValue} unset until {@link #readPortable} fills it. */
    public MyEntryProcessor() {
    }

    /**
     * @param mapValue the value to insert or to take the new data from
     */
    public MyEntryProcessor(MapValue mapValue) {
        this.mapValue = mapValue;
    }

    /**
     * Inserts {@code mapValue} when the entry is empty, otherwise updates the existing value's
     * data from {@code mapValue} and writes the existing value back.
     *
     * @param entry the map entry being processed
     * @return always {@code null}
     */
    @Override
    public Object process(Entry<String, MapValue> entry) {
        MapValue current = entry.getValue();
        if (current == null) {
            // Nothing stored yet: insert the supplied value directly.
            entry.setValue(mapValue);
            return null;
        }
        // Something is stored: keep the existing object and only replace its data.
        current.setData(mapValue.getData());
        entry.setValue(current);
        return null;
    }

    /**
     * @return {@code OFFLOADABLE_EXECUTOR}; {@code NO_OFFLOADING} is the alternative that was
     *         considered here
     */
    @Override
    public String getExecutorName() {
        return OFFLOADABLE_EXECUTOR;
    }

    /** @return the Portable factory id, 1 */
    @Override
    public int getFactoryId() {
        return 1;
    }

    /** @return the Portable class id, 2 */
    @Override
    public int getClassId() {
        return 2;
    }

    /**
     * Writes a presence flag under the field {@code _has__mapValue} and, when the value is
     * present, the value itself to the raw data output.
     *
     * @param writer the portable writer to write to
     * @throws IOException if writing fails
     */
    @Override
    public void writePortable(PortableWriter writer) throws IOException {
        if (mapValue == null) {
            writer.writeBoolean("_has__mapValue", false);
            return;
        }
        writer.writeBoolean("_has__mapValue", true);
        writer.getRawDataOutput().writeObject(mapValue);
    }

    /**
     * Reads the presence flag {@code _has__mapValue} and, when it is set, reads the value from
     * the raw data input. When the flag is not set, {@code mapValue} is left untouched.
     *
     * @param reader the portable reader to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readPortable(PortableReader reader) throws IOException {
        if (!reader.readBoolean("_has__mapValue")) {
            return;
        }
        mapValue = reader.getRawDataInput().readObject();
    }
}
MyEntryProcessor implements Portable. When there is only one Hazelcast member, there is no problem: when I send elements for processing using hzlMap.executeOnKeys(), no error occurs.
But when there are more members, I get the exception below from the members:
Exception in thread "main" com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'com.hazelcast.spi.impl.operationservice.impl.operations.Backup'
at com.hazelcast.internal.serialization.impl.SerializationUtil.handleSerializeException(SerializationUtil.java:75)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:155)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:128)
at com.hazelcast.spi.impl.operationservice.impl.OutboundOperationHandler.send(OutboundOperationHandler.java:51)
at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendSingleBackup(OperationBackupHandler.java:217)
at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.makeBackups(OperationBackupHandler.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendBackups0(OperationBackupHandler.java:108)
at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendBackups(OperationBackupHandler.java:74)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleResponse(OperationRunnerImpl.java:272)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:195)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:120)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.run(OperationThread.java:100)
at ------ submitted from ------.(Unknown Source)
at com.hazelcast.spi.impl.operationservice.impl.InvocationFuture.resolve(InvocationFuture.java:127)
at com.hazelcast.spi.impl.AbstractInvocationFuture$1.run(AbstractInvocationFuture.java:243)
at com.hazelcast.client.impl.protocol.task.AbstractPartitionMessageTask.execute(AbstractPartitionMessageTask.java:78)
at com.hazelcast.spi.impl.AbstractInvocationFuture.unblock(AbstractInvocationFuture.java:239)
at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:225)
at com.hazelcast.spi.impl.AbstractInvocationFuture.unblockAll(AbstractInvocationFuture.java:229)
at com.hazelcast.spi.impl.AbstractInvocationFuture.complete(AbstractInvocationFuture.java:363)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.complete(Invocation.java:632)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyNormalResponse(Invocation.java:314)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.notifyError(Invocation.java:273)
at com.hazelcast.spi.impl.operationservice.impl.Invocation.sendResponse(Invocation.java:196)
at com.hazelcast.map.impl.operation.EntryOperation$3.handleResponse(EntryOperation.java:337)
at com.hazelcast.map.impl.operation.EntryOperation$3.sendResponse(EntryOperation.java:310)
at com.hazelcast.spi.Operation.sendResponse(Operation.java:353)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.sendResponseAfterOperationError(OperationRunnerImpl.java:367)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleOperationError(OperationRunnerImpl.java:361)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:198)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:120)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.run(OperationThread.java:100)
at ------ submitted from ------.(Unknown Source)
at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveAndThrowIfException(ClientInvocationFuture.java:96)
at com.hazelcast.client.spi.impl.ClientInvocationFuture.resolveAndThrowIfException(ClientInvocationFuture.java:33)
at com.hazelcast.spi.impl.AbstractInvocationFuture.get(AbstractInvocationFuture.java:155)
at com.hazelcast.client.spi.ClientProxy.invokeOnPartition(ClientProxy.java:204)
at com.hazelcast.client.spi.ClientProxy.invoke(ClientProxy.java:198)
at com.hazelcast.client.proxy.ClientMapProxy.executeOnKeyInternal(ClientMapProxy.java:1294)
at com.hazelcast.client.proxy.ClientMapProxy.executeOnKey(ClientMapProxy.java:1287)
at com.mycompany.client.service.MapRepo.putEntriesUsingEntryProcessor(MapRepo.java:31)
at com.mycompany.client.ClientAppMain.main(ClientAppMain.java:25)
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'com.hazelcast.map.impl.operation.EntryBackupOperation'
at com.hazelcast.internal.serialization.impl.SerializationUtil.handleSerializeException(SerializationUtil.java:75)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.writeObject(AbstractSerializationService.java:252)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:370)
at com.hazelcast.spi.impl.operationservice.impl.operations.Backup.writeInternal(Backup.java:233)
at com.hazelcast.spi.Operation.writeData(Operation.java:565)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.write(DataSerializableSerializer.java:201)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.write(DataSerializableSerializer.java:50)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.write(StreamSerializerAdapter.java:43)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:152)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.toBytes(AbstractSerializationService.java:128)
at com.hazelcast.spi.impl.operationservice.impl.OutboundOperationHandler.send(OutboundOperationHandler.java:51)
at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendSingleBackup(OperationBackupHandler.java:217)
at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.makeBackups(OperationBackupHandler.java:189)
at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendBackups0(OperationBackupHandler.java:108)
at com.hazelcast.spi.impl.operationservice.impl.OperationBackupHandler.sendBackups(OperationBackupHandler.java:74)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.handleResponse(OperationRunnerImpl.java:272)
at com.hazelcast.spi.impl.operationservice.impl.OperationRunnerImpl.run(OperationRunnerImpl.java:195)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.process(OperationThread.java:120)
at com.hazelcast.spi.impl.operationexecutor.impl.OperationThread.run(OperationThread.java:100)
Caused by: com.hazelcast.nio.serialization.HazelcastSerializationException: Failed to serialize 'com.hazelcast.map.AbstractEntryProcessor$EntryBackupProcessorImpl'
at com.hazelcast.internal.serialization.impl.SerializationUtil.handleSerializeException(SerializationUtil.java:75)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.writeObject(AbstractSerializationService.java:252)
at com.hazelcast.internal.serialization.impl.ByteArrayObjectDataOutput.writeObject(ByteArrayObjectDataOutput.java:370)
at com.hazelcast.map.impl.operation.EntryBackupOperation.writeInternal(EntryBackupOperation.java:72)
at com.hazelcast.spi.Operation.writeData(Operation.java:565)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.write(DataSerializableSerializer.java:201)
at com.hazelcast.internal.serialization.impl.DataSerializableSerializer.write(DataSerializableSerializer.java:50)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.write(StreamSerializerAdapter.java:43)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.writeObject(AbstractSerializationService.java:250)
... 17 more
Caused by: java.io.NotSerializableException: com.mycompany.common.MapValue
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at com.hazelcast.internal.serialization.impl.JavaDefaultSerializers$JavaSerializer.write(JavaDefaultSerializers.java:242)
at com.hazelcast.internal.serialization.impl.StreamSerializerAdapter.write(StreamSerializerAdapter.java:43)
at com.hazelcast.internal.serialization.impl.AbstractSerializationService.writeObject(AbstractSerializationService.java:250)
... 24 more
Is Hazelcast trying to serialize MyEntryProcessor using default Java serialization even though it implements Portable?
Why is this happening? Am I missing something?
How can it be corrected?
You can find sample project demonstrating the problem at :
https://github.com/simpleusr/hazelcastproblem
Please note that, in order to reproduce the problem, you need to start two Hazelcast members using hazelcast-dev1.xml and hazelcast-dev2.xml. When there is only one member, there is no problem.
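@simpleusr, I believe I found the cause. Please see the EP implementation below.
The problem seems to be related to the EntryBackupProcessorImpl used in AbstractEntryProcessor. It has a special precalculated serialVersionUID, as described in the source, so setting the EP as Portable causes this exception since it cannot convert the MapValue object. As the stack trace shows, the backup processor is written with plain Java serialization (JavaSerializer / ObjectOutputStream) rather than as a Portable, and Java serialization then reaches the MapValue field, which is not Serializable. When you implement both EntryProcessor and EntryBackupProcessor in your EP, the issue disappears.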
/**
 * Insert-or-update entry processor for {@link MapValue} that also acts as its own backup
 * processor.
 *
 * <p>It implements both {@link EntryProcessor} and {@link EntryBackupProcessor}:
 * {@link #getBackupProcessor()} returns this instance and {@link #processBackup} runs the same
 * logic as {@link #process}. The class is a {@link Portable} with factory id 1 and class id 2.
 */
public class MyEntryProcessor implements EntryProcessor<String, MapValue>, EntryBackupProcessor<String, MapValue>, Offloadable, Portable {

    private static final long serialVersionUID = 1L;

    /** Value to insert, or the source of the data written into an existing value. */
    private MapValue mapValue;

    /** No-arg constructor; {@code mapValue} stays unset until {@link #readPortable} fills it. */
    public MyEntryProcessor() {
    }

    /**
     * @param mapValue the value to insert or to take the new data from
     */
    public MyEntryProcessor(MapValue mapValue) {
        this.mapValue = mapValue;
    }

    /**
     * Stores {@code mapValue} when the entry holds no value; otherwise copies the data of
     * {@code mapValue} into the existing value and stores the existing value back.
     *
     * @param entry the map entry being processed
     * @return always {@code null}
     */
    @Override
    public Object process(Entry<String, MapValue> entry) {
        MapValue existing = entry.getValue();
        MapValue result;
        if (existing != null) {
            // Update path: mutate the stored object rather than replacing it.
            existing.setData(mapValue.getData());
            result = existing;
        } else {
            // Insert path: nothing stored yet.
            result = mapValue;
        }
        entry.setValue(result);
        return null;
    }

    /**
     * Applies the same insert-or-update logic as {@link #process}, discarding its result.
     *
     * @param entry the backup entry being processed
     */
    @Override
    public void processBackup(Entry<String, MapValue> entry) {
        process(entry);
    }

    /** @return this instance, which serves as its own backup processor */
    @Override
    public EntryBackupProcessor<String, MapValue> getBackupProcessor() {
        return this;
    }

    /**
     * @return {@code OFFLOADABLE_EXECUTOR}; {@code NO_OFFLOADING} is the alternative that was
     *         considered here
     */
    @Override
    public String getExecutorName() {
        return OFFLOADABLE_EXECUTOR;
    }

    /** @return the Portable factory id, 1 */
    @Override
    public int getFactoryId() {
        return 1;
    }

    /** @return the Portable class id, 2 */
    @Override
    public int getClassId() {
        return 2;
    }

    /**
     * Writes the presence flag {@code _has__mapValue}; when the value is present, it is then
     * written to the raw data output.
     *
     * @param writer the portable writer to write to
     * @throws IOException if writing fails
     */
    @Override
    public void writePortable(PortableWriter writer) throws IOException {
        final boolean present = mapValue != null;
        writer.writeBoolean("_has__mapValue", present);
        if (!present) {
            return;
        }
        writer.getRawDataOutput().writeObject(mapValue);
    }

    /**
     * Reads the presence flag {@code _has__mapValue}; when it is set, the value is read from
     * the raw data input, otherwise {@code mapValue} is left untouched.
     *
     * @param reader the portable reader to read from
     * @throws IOException if reading fails
     */
    @Override
    public void readPortable(PortableReader reader) throws IOException {
        final boolean present = reader.readBoolean("_has__mapValue");
        if (present) {
            mapValue = reader.getRawDataInput().readObject();
        }
    }
}