Hazelcast Jet "mapFn" must be Serializable Error

Good day. I'm having a problem integrating some source code from GitHub into my project. I'm trying to build a pipeline, then create a StreamStage to read from the source and build a queue sink using the methods inside the project, and I always get the same error: Exception in thread "main" java.lang.IllegalArgumentException: "mapFn" must be serializable

I have read the docs about serialization in Hazelcast Jet and everything seems correct; I just don't know what the problem inside the project is.

These are the attributes and constants:

public class FraudDetectionRun implements Serializable{

private final static ILogger log = Logger.getLogger(FraudDetectionRun.class);

private static final String TXN_QUEUE_ID = PropertiesLoader.TXN_QUEUE_ID;

private static final String ACCOUNT_MAP = PropertiesLoader.ACCOUNT_MAP;
private static final String MERCHANT_MAP = PropertiesLoader.MERCHANT_MAP;
private static final String RULESRESULT_MAP = PropertiesLoader.RULESRESULT_MAP;

private HazelcastInstance clientInstance;
private JetInstance jet;

private static MerchantRuleEngine merchantRuleEngine;
private static HistoricalDataRuleEngine historicalRuleEngine;

private IMap<String, Merchant> merchantMap;
private IMap<String, Account> accountMap;

public static void main(String[] args) {
    new FraudDetectionRun().start();

This is the main code:

private void start() {
    init();

    Pipeline p = buildPipeline();

    JobConfig jobConfig = new JobConfig();
    jobConfig.setName("Fraud Detection Job");

    jet.newJobIfAbsent(p, jobConfig);

private Pipeline buildPipeline() {
    Pipeline p = Pipeline.create();
    StreamStage<Transaction> transaction = p.readFrom(buildQueueSource())
            .map(restValue -> transformToTransaction(restValue))
            .setName("This is where we transfor the RestValue into a Transaction");
    StreamStage<Transaction> appliyingMerchantRules = -> applyMerchantRules(index))
            .setName("We search for the method Apply Merchant to apply the rules to the transaction");
    StreamStage<Transaction> rulesIntoTransaction = -> applyHistoricalTxnRules(index))
            .setName("Apply Historical transactions rules");
    rulesIntoTransaction.writeTo(, Transaction::getTransactionId, Transaction::getRulesResult));
    return p;

private String transformResultsToString(Transaction txn) {
    RulesResult result = txn.getRulesResult();
    return "txnID: "+txn.getTransactionId()+" "+result.getMerchantRisk()+" "+result.getTransactionRisk();

private Transaction applyHistoricalTxnRules(Transaction txn) {"Applying rules on historical data");
    historicalRuleEngine.apply(txn, accountMap.get(txn.getAccountNumber()).getHistoricalTransactions());
    return txn;

private Transaction applyMerchantRules(Transaction txn) {"Applying merchant rules");
    merchantRuleEngine.apply(txn, merchantMap.get(txn.getMerchantId()));
    return txn;

private static Transaction transformToTransaction(RestValue restValue) {"Applying transformToTransaction");
    return TransactionBuilderUtil.transformToTransaction(new String(restValue.getValue()));

private Sink<? super Transaction> buildQueueSink() {
     return SinkBuilder.sinkBuilder("queueSink",
             jet -> jet.jetInstance().getHazelcastInstance().<String>getQueue("sink-queue"))
             .<Transaction>receiveFn( (queue, txn)-> queue.add(transformResultsToString(txn)))

private StreamSource<RestValue> buildQueueSource() {
    StreamSource<RestValue> source = SourceBuilder.<QueueContext<RestValue>>stream(TXN_QUEUE_ID, c -> new QueueContext<>(c.jetInstance().getHazelcastInstance().getQueue(TXN_QUEUE_ID)))

    return source;

static class QueueContext<T> extends AbstractCollection<T> {
    static final int MAX_ELEMENTS = 1024;
    IQueue<T> queue;
    SourceBuilder.SourceBuffer<T> buf;
    QueueContext(IQueue<T> queue) {
        this.queue = queue;

    void fillBuffer(SourceBuilder.SourceBuffer<T> buf) {
        this.buf = buf;
        queue.drainTo(this, MAX_ELEMENTS);
    public boolean add(T item) {
        return true;
    public Iterator<T> iterator() {
        throw new UnsupportedOperationException();
    public int size() {
        throw new UnsupportedOperationException();

The program always throws this error:

Exception in thread "main" java.lang.IllegalArgumentException: "mapFn" must be serializable
    at com.hazelcast.jet.impl.util.Util.checkSerializable(
    at com.hazelcast.jet.impl.pipeline.ComputeStageImplBase.attachMap(
    at com.hazelcast.certification.control.FraudDetectionRun.buildPipeline(
    at com.hazelcast.certification.control.FraudDetectionRun.start(
    at com.hazelcast.certification.control.FraudDetectionRun.main(
Caused by: com.hazelcast.client.impl.proxy.ClientMapProxy
    at java.base/
    at java.base/
    at java.base/
    at java.base/
    at java.base/
    at java.base/
    at java.base/
    at java.base/
    at java.base/
    at java.base/
    at java.base/
    at java.base/
    at com.hazelcast.jet.impl.util.Util.checkSerializable(
    ... 5 more

It seems the problem is here:

StreamStage<Transaction> appliyingMerchantRules = -> applyMerchantRules(index))

for some reason. Can you tell me what I am doing wrong?


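  • Your applyMerchantRules method is non-static, so the lambda that calls it captures the enclosing this instance, and Jet then has to serialize the whole FraudDetectionRun object. It seems you made FraudDetectionRun Serializable in an attempt to fix this, but that is not enough: the "Caused by" line in the stack trace names ClientMapProxy, which points at the IMap fields held by that instance. You should instead make applyMerchantRules static. The same applies to applyHistoricalTxnRules, which is also non-static.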
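
The capture behaviour described in the answer can be reproduced without Hazelcast. The following is only a minimal sketch using plain Java serialization: CaptureDemo, SerFn, resource, instanceRule, staticRule and canSerialize are hypothetical names invented for illustration, with SerFn standing in for Jet's serializable function type and resource standing in for a non-serializable field such as an IMap proxy.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.function.Function;

class CaptureDemo implements Serializable {

    // Hypothetical stand-in for a serializable function interface.
    interface SerFn<T, R> extends Function<T, R>, Serializable { }

    // Stand-in for a non-serializable field (plain Object is not Serializable).
    private final Object resource = new Object();

    private String instanceRule(String s) { return s + "!"; }

    private static String staticRule(String s) { return s + "!"; }

    // Calls an instance method, so the lambda captures `this`.
    SerFn<String, String> capturing() { return s -> instanceRule(s); }

    // Calls only a static method, so nothing is captured.
    SerFn<String, String> nonCapturing() { return s -> staticRule(s); }

    // Returns true if Java serialization accepts the object.
    static boolean canSerialize(Object o) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(o);
            return true;
        } catch (IOException e) {
            // NotSerializableException is a subclass of IOException.
            return false;
        }
    }

    public static void main(String[] args) {
        CaptureDemo demo = new CaptureDemo();
        System.out.println(canSerialize(demo.capturing()));    // false
        System.out.println(canSerialize(demo.nonCapturing())); // true
    }
}
```

Marking the enclosing class Serializable does not help in the first case, because serializing the captured instance still has to serialize every non-transient field it holds. In the question's code, a static applyMerchantRules would also need some way to reach the merchant map other than the instance field; how best to do that depends on the Jet version in use.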