I'm trying to play with Ignite (v2.8.0) and ContinuousQuery-es on my local machine. My setup is one server node and two clients.
The problem is once server node started and first client connected and listening for updates with ContinuousQuery any other subsequent client failing with following exception:
[ERROR] 2020-04-18 20:00:18.436 [tcp-client-disco-msg-worker-#4] TcpDiscoverySpi - Failed to unmarshal discovery data for component: CONTINUOUS_PROC
I also spot it keeps failing until query cursor is closed in first client (once closed any other single client can connect).
Server node code:
public class IgniteServerCacheBootstrap {
final static Logger logger = LoggerFactory.getLogger(IgniteCacheClient.class);
public static void main(String[] args) throws IgniteCheckedException, InterruptedException {
IgniteConfiguration serverConfig = new IgniteConfiguration()
.setGridLogger(new Log4J2Logger("log4j2.xml"));
Ignite server = Ignition.start(serverConfig);
Thread.currentThread().join();
}
}
Client node code (mostly taken from Ignite samples) I'm trying to run two of such nodes in parallel:
public class IgniteCacheClient implements Serializable {
Logger logger = LoggerFactory.getLogger(IgniteCacheClient.class);
private IgniteCache igniteCache;
public IgniteCacheClient() throws IgniteCheckedException {
IgniteConfiguration clientConfig = new IgniteConfiguration()
.setGridLogger(new Log4J2Logger("log4j2.xml"))
.setClientMode(true);
Ignite client = Ignition.getOrStart(clientConfig);
igniteCache = client.getOrCreateCache("MY_CACHE");
}
public void run() throws InterruptedException {
// Create new continuous query.
ContinuousQuery<Integer, String> qry = new ContinuousQuery<>();
qry.setInitialQuery(new ScanQuery<>(new IgniteBiPredicate<Integer, String>() {
@Override
public boolean apply(Integer key, String val) {
return key > 10;
}
}));
// Callback that is called locally when update notifications are received.
qry.setLocalListener(new CacheEntryUpdatedListener<Integer, String>() {
@Override
public void onUpdated(Iterable<CacheEntryEvent<? extends Integer, ? extends String>> evts) {
for (CacheEntryEvent<? extends Integer, ? extends String> e : evts)
logger.info("Updated entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
}
});
// This filter will be evaluated remotely on all nodes.
// Entry that pass this filter will be sent to the caller.
qry.setRemoteFilterFactory(new Factory<CacheEntryEventFilter<Integer, String>>() {
@Override
public CacheEntryEventFilter<Integer, String> create() {
return new CacheEntryEventFilter<Integer, String>() {
@Override
public boolean evaluate(CacheEntryEvent<? extends Integer, ? extends String> e) {
return e.getKey() > 10;
}
};
}
});
// Execute query.
QueryCursor<Cache.Entry<Integer, String>> cur = igniteCache.query(qry);
// Iterate through existing data.
for (Cache.Entry<Integer, String> e : cur)
logger.info("Queried existing entry [key=" + e.getKey() + ", val=" + e.getValue() + ']');
Thread.currentThread().join();
}
}
Full exception
[ERROR] 2020-04-18 20:00:18.436 [tcp-client-disco-msg-worker-#4] TcpDiscoverySpi - Failed to unmarshal discovery data for component: CONTINUOUS_PROC
org.apache.ignite.IgniteCheckedException: Failed to deserialize object with given class loader: sun.misc.Launcher$AppClassLoader@18b4aac2
at org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:132) ~[ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.marshaller.AbstractNodeNameAwareMarshaller.unmarshal(AbstractNodeNameAwareMarshaller.java:93) ~[ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.internal.util.IgniteUtils.unmarshalZip(IgniteUtils.java:10248) [ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket.unmarshalData(DiscoveryDataPacket.java:340) [ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.spi.discovery.tcp.internal.DiscoveryDataPacket.unmarshalGridData(DiscoveryDataPacket.java:155) [ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.spi.discovery.tcp.TcpDiscoverySpi.onExchange(TcpDiscoverySpi.java:2069) [ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.spi.discovery.tcp.ClientImpl$MessageWorker.processNodeAddFinishedMessage(ClientImpl.java:2219) [ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.spi.discovery.tcp.ClientImpl$MessageWorker.processDiscoveryMessage(ClientImpl.java:2088) [ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.spi.discovery.tcp.ClientImpl$MessageWorker.body(ClientImpl.java:1930) [ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.internal.util.worker.GridWorker.run(GridWorker.java:120) [ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.spi.discovery.tcp.ClientImpl$1.body(ClientImpl.java:302) [ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.spi.IgniteSpiThread.run(IgniteSpiThread.java:61) [ignite-core-2.8.0.jar:2.8.0]
Caused by: java.io.InvalidObjectException: Failed to find cache for name: MY_CACHE
at org.apache.ignite.internal.processors.cache.GridCacheContext.readResolve(GridCacheContext.java:2376) ~[ignite-core-2.8.0.jar:2.8.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_172]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_172]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.readExternal(IgniteCacheProxyImpl.java:2192) ~[ignite-core-2.8.0.jar:2.8.0]
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.readExternal(GatewayProtectedCacheProxy.java:1706) ~[ignite-core-2.8.0.jar:2.8.0]
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV2.readExternal(CacheContinuousQueryHandlerV2.java:179) ~[ignite-core-2.8.0.jar:2.8.0]
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at java.util.HashMap.readObject(HashMap.java:1409) ~[?:1.8.0_172]
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at org.apache.ignite.internal.util.IgniteUtils.readMap(IgniteUtils.java:5409) [ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryData.readExternal(GridContinuousProcessor.java:2446) ~[ignite-core-2.8.0.jar:2.8.0]
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:124) ~[ignite-core-2.8.0.jar:2.8.0]
... 11 more
Caused by: java.lang.IllegalStateException: Failed to find cache for name: MY_CACHE
at org.apache.ignite.internal.processors.cache.GridCacheContext.readResolve(GridCacheContext.java:2371) ~[ignite-core-2.8.0.jar:2.8.0]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[?:1.8.0_172]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[?:1.8.0_172]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1248) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2076) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at org.apache.ignite.internal.processors.cache.IgniteCacheProxyImpl.readExternal(IgniteCacheProxyImpl.java:2192) ~[ignite-core-2.8.0.jar:2.8.0]
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at org.apache.ignite.internal.processors.cache.GatewayProtectedCacheProxy.readExternal(GatewayProtectedCacheProxy.java:1706) ~[ignite-core-2.8.0.jar:2.8.0]
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at org.apache.ignite.internal.processors.cache.query.continuous.CacheContinuousQueryHandlerV2.readExternal(CacheContinuousQueryHandlerV2.java:179) ~[ignite-core-2.8.0.jar:2.8.0]
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2285) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2209) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at java.util.HashMap.readObject(HashMap.java:1409) ~[?:1.8.0_172]
at sun.reflect.GeneratedMethodAccessor3.invoke(Unknown Source) ~[?:?]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[?:1.8.0_172]
at java.lang.reflect.Method.invoke(Method.java:498) ~[?:1.8.0_172]
at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1158) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2176) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2067) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at org.apache.ignite.internal.util.IgniteUtils.readMap(IgniteUtils.java:5409) ~[ignite-core-2.8.0.jar:2.8.0]
at org.apache.ignite.internal.processors.continuous.GridContinuousProcessor$DiscoveryData.readExternal(GridContinuousProcessor.java:2446) ~[ignite-core-2.8.0.jar:2.8.0]
at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:2116) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2065) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1571) ~[?:1.8.0_172]
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:431) ~[?:1.8.0_172]
at org.apache.ignite.marshaller.jdk.JdkMarshaller.unmarshal0(JdkMarshaller.java:124) ~[ignite-core-2.8.0.jar:2.8.0]
... 11 more
"Failed to find cache for name: MY_CACHE"
I think that your private IgniteCache igniteCache
is being transferred with your lambdas. Please try converting all of your anonymous inner classes to named static inner classes. I'm talking about predicates, listeners and factories.