
Hazelcast Jet - how to use a non-static method in a Jet pipeline


I have the basic pipeline shown below. In one of the steps I want to transform an object by calling a method on a service. Jet throws an error saying that the mapFn is not serializable. How can I fix this? The same pipeline works fine when the method is static.

p.readFrom(source)
     .map(r -> dataTransformer.transformRecord(r)) // dataTransformer is a service
     .writeTo(Sinks.filesBuilder(userHome).build());

Solution

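  • Use mapUsingService and create the service with a ServiceFactory, so that the service is created on each cluster member instead of being serialized with the lambda. Note that sharedService shares one instance among all parallel processors on a member, so the service must be thread-safe; if it is not, ServiceFactories.nonSharedService is the alternative.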

    p.readFrom(source)
     .mapUsingService(
        ServiceFactories.sharedService(pctx -> new DataTransformer()),
        (dataTransformer, r) -> dataTransformer.transformRecord(r))
     ...
    

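    Alternatively, if your service is serializable and stateless, you can copy it to a local variable. A lambda that reads a field captures the enclosing object (this), which is usually not serializable; after the copy, the lambda captures only the service itself: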

    DataTransformer dataTransformerLocal = dataTransformer;
    p.readFrom(source)
     .map(r -> dataTransformerLocal.transformRecord(r))
     .writeTo(Sinks.filesBuilder(userHome).build());
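
The capture behaviour behind the local-variable trick can be reproduced without Jet. The following is only a sketch using plain Java serialization: SerializableFn is a hypothetical stand-in for Jet's serializable function types, and PipelineBuilder, DataTransformer and CaptureDemo are made-up names, with DataTransformer assumed to be Serializable and stateless as the answer requires.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a serializable function type such as Jet's FunctionEx.
interface SerializableFn<T, R> extends Serializable {
    R apply(T t);
}

// Hypothetical service; assumed to be Serializable and stateless.
class DataTransformer implements Serializable {
    String transformRecord(String r) {
        return r.toUpperCase();
    }
}

// Hypothetical enclosing class that is NOT Serializable,
// like a typical class that builds and submits the job.
class PipelineBuilder {
    private final DataTransformer dataTransformer = new DataTransformer();

    // Reads the field, so the lambda captures `this` (the whole PipelineBuilder).
    SerializableFn<String, String> capturingThis() {
        return r -> dataTransformer.transformRecord(r);
    }

    // Copies the field to a local first, so the lambda captures only the DataTransformer.
    SerializableFn<String, String> capturingLocal() {
        DataTransformer dataTransformerLocal = dataTransformer;
        return r -> dataTransformerLocal.transformRecord(r);
    }
}

final class CaptureDemo {
    // Returns true if plain Java serialization of the object succeeds.
    static boolean isSerializable(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (NotSerializableException e) {
            return false;
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        PipelineBuilder b = new PipelineBuilder();
        System.out.println("captures this:  " + isSerializable(b.capturingThis()));
        System.out.println("captures local: " + isSerializable(b.capturingLocal()));
    }
}
```

Under these assumptions, the first lambda fails to serialize because it drags the non-serializable PipelineBuilder along, while the second one serializes because it holds only the DataTransformer. If the service itself cannot be made serializable, the mapUsingService approach above is the one that applies.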