Search code examples
hazelcast-jet

Hazelcast-jet: got error when enriching stream using direct lookup


I am following Doc to try out how to enrich an unbounded stream by directly looking up from a IMap. I have two Maps:

  1. Product: Map<String, Product> (ProductId as key)
  2. Seller: Map<String, Seller> (SellerId as key)

Both Product and Seller are very simple classes:

public class Product implements DataSerializable {
    String productId;
    String sellerId;
    int price;
...
public class Seller implements DataSerializable {
    String sellerId;
    int revenue;
...

I have two data generators keep pushing data to the two maps. The event-journal are enabled for both maps. I have verified the event-journal works fine.

I want to enrich the stream event of Product map with Seller map. Here is a snippet of my code:

IMap<String, Seller> sellerIMap = jetClient.getMap(SellerDataGenerator.SELLER_MAP);
StreamSource<Product> productStreamSource = Sources.mapJournal(ProductDataGenerator.PRODUCT_MAP, Util.mapPutEvents(), Util.mapEventNewValue(), START_FROM_CURRENT);
p.drawFrom(productStreamSource)
            .withoutTimestamps()
            .groupingKey(Product::getSellerId)
            .mapUsingIMap(sellerIMap, (product, seller) -> new EnrichedProduct(product, seller))
            .drainTo(getSink());
try {
        JobConfig jobConfig = new JobConfig();
        jobConfig.addClass(TaskSubmitter.class).addClass(Seller.class).addClass(Product.class).addClass(ExtendedProduct.class);
        jobConfig.setName(Constants.BASIC_TASK);
        Job job = jetClient.newJob(p, jobConfig);
    } finally {
        jetClient.shutdown();
    }

When job was submitted, I got following error:

com.hazelcast.spi.impl.operationservice.impl.Invocation - [172.31.33.212]:80 [jet] [3.1] Failed asynchronous execution of execution callback: com.hazelcast.util.executor.DelegatingFuture$DelegatingExecutionCallback@77ac0407for call Invocation{op=com.hazelcast.map.impl.operation.GetOperation{serviceName='hz:impl:mapService', identityHash=1939050026, partitionId=70, replicaIndex=0, callId=-37944, invocationTime=1570410704479 (2019-10-07 01:11:44.479), waitTimeout=-1, callTimeout=60000, name=sellerMap}, tryCount=250, tryPauseMillis=500, invokeCount=1, callTimeoutMillis=60000, firstInvocationTimeMs=1570410704479, firstInvocationTime='2019-10-07 01:11:44.479', lastHeartbeatMillis=0, lastHeartbeatTime='1970-01-01 00:00:00.000', target=[172.31.33.212]:80, pendingResponse={VOID}, backupsAcksExpected=0, backupsAcksReceived=0, connection=null}

I tried to put one and two instances in my cluster and got the same error message. I couldn't figure out what was the root cause.


Solution

  • It seems that your problem is a ClassNotFoundException, even though you added the appropriate classes to the job. The objects you store in the IMap exist independent of your Jet job and when the event journal source asks for them, Jet's IMap code tries to deserialize them and fails because Jet doesn't have your domain model classes on its classpath.

    To move on, add a JAR with the classes you use in the IMap to Jet's classpath. We are looking for a solution that will remove this requirement.

    The reason you haven't got the exception stacktrace in the log output is due to the default java.util.logging setup you end up with when you don't explicitly add a more flexible logging module, such as Log4j.

    The next version of Jet's packaging will improve this aspect. Until that time you can follow these steps:

    1. Go to the lib directory of Jet's distribution package and download Log4j into it:

      $ cd lib
      $ wget https://repo1.maven.org/maven2/log4j/log4j/1.2.17/log4j-1.2.17.jar
      
    2. Edit bin/common.sh to add the module to the classpath. Towards the end of the file there is a line

      CLASSPATH="$JET_HOME/lib/hazelcast-jet-3.1.jar:$CLASSPATH"
      

      You can duplicate this line and replace hazelcast-jet-3.1 with log4j-1.2.17.

    3. At the end of commons.sh there is a multi-line command that constructs the JAVA_OPTS variable. Add "-Dhazelcast.logging.type=log4j" and "-Dlog4j.configuration=file:$JET_HOME/config/log4j.properties" to the list.

    4. Create a file log4j.properties in the config directory:

    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.Target=System.out
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d %5p [%c{1}] [%t] - %m%n
    
    # Change this level to debug to diagnose failed cluster formation:
    log4j.logger.com.hazelcast.internal.cluster=info
    
    log4j.logger.com.hazelcast.jet=info
    log4j.rootLogger=info, stdout