Search code examples
javahazelcast-jet

Serialization in the hazelcast jet


I'm getting the following error when I tried to processing the data from hazelcast jet.

Caused by: java.lang.IllegalArgumentException: Invalid lambda deserialization
    at com.example.LearnJet.joins.LeftJoins.$deserializeLambda$(LeftJoins.java:1)
    ... 59 more

Here is the code:- AddToCart1 instance

public class AddToCart1 implements Serializable {
    private int number;
    private String cart;

    public AddToCart1() {
        super();
        // TODO Auto-generated constructor stub
    }

    public int getNumber() {
        return number;
    }

    public AddToCart1(int number, String cart) {
        super();
        this.number = number;
        this.cart = cart;
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + ((cart == null) ? 0 : cart.hashCode());
        result = prime * result + number;
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        AddToCart1 other = (AddToCart1) obj;
        if (cart == null) {
            if (other.cart != null)
                return false;
        } else if (!cart.equals(other.cart))
            return false;
        if (number != other.number)
            return false;
        return true;
    }

    public void setNumber(int number) {
        this.number = number;
    }

    public String getCart() {
        return cart;
    }

    public void setCart(String cart) {
        this.cart = cart;
    }

}

PageVisit1 instance

public class PageVisit1 implements Serializable {

    /**
     * 
     */
    private int number;
    private String pageName;

    public PageVisit1() {
    }

    @Override
    public int hashCode() {
        final int prime = 31;
        int result = 1;
        result = prime * result + number;
        result = prime * result + ((pageName == null) ? 0 : pageName.hashCode());
        return result;
    }

    @Override
    public boolean equals(Object obj) {
        if (this == obj)
            return true;
        if (obj == null)
            return false;
        if (getClass() != obj.getClass())
            return false;
        PageVisit1 other = (PageVisit1) obj;
        if (number != other.number)
            return false;
        if (pageName == null) {
            if (other.pageName != null)
                return false;
        } else if (!pageName.equals(other.pageName))
            return false;
        return true;
    }

    public PageVisit1(int number, String pageName) {
        super();
        this.number = number;
        this.pageName = pageName;
    }

    /**
     * @return the number
     */
    public int getNumber() {
        return number;
    }

    /**
     * @param number the number to set
     */
    public void setNumber(int number) {
        this.number = number;
    }

    /**
     * @return the pageName
     */
    public String getPageName() {
        return pageName;
    }

    /**
     * @param pageName the pageName to set
     */
    public void setPageName(String pageName) {
        this.pageName = pageName;
    }

}

Here is the main class


public class LeftJoins {

    public static void main(String[] args) throws InvocationTargetException {

        JetInstance jet = Jet.bootstrappedInstance();

        IList<AddToCart1> addToCartList = jet.getList("cart");
        IList<PageVisit1> paymentList = jet.getList("page");

        // AddToCartData
        AddToCart1 ad1 = new AddToCart1();
        ad1.setNumber(1);
        ad1.setCart("lulu bazar");

        AddToCart1 ad2 = new AddToCart1();
        ad2.setNumber(2);
        ad2.setCart("krishna bazar");

        AddToCart1 ad3 = new AddToCart1();
        ad3.setNumber(3);
        ad3.setCart("ram bazar");

        addToCartList.add(ad1);
        addToCartList.add(ad2);
        addToCartList.add(ad3);

        // Page Data
        PageVisit1 pg1 = new PageVisit1();
        pg1.setNumber(1);
        pg1.setPageName("k login");

        PageVisit1 pg2 = new PageVisit1();
        pg2.setNumber(2);
        pg2.setPageName("plogin");

        paymentList.add(pg1);
        paymentList.add(pg2);

        // creating a piple-line here
        Pipeline p = Pipeline.create();
        BatchStageWithKey<AddToCart1, Object> cart = p.readFrom(Sources.<AddToCart1>list("cart"))
                .groupingKey(cart1 -> cart1.getNumber());
        BatchStageWithKey<PageVisit1, Object> page = p.readFrom(Sources.<PageVisit1>list("page"))
                .groupingKey(page1 -> page1.getNumber());

        BatchStage<Tuple2<List<PageVisit1>, List<AddToCart1>>> joinedLists1 = page.aggregate2(toList(), cart, toList())
                .map(Entry::getValue);

        BatchStage<Tuple2<List<PageVisit1>, List<AddToCart1>>> m = joinedLists1.filter(pair -> !pair.f0().isEmpty());

        m.writeTo(Sinks.logger());

        jet.newJob(p).join();
        // joinedLists.filter(pair -> !pair.isEmpty());
    }


Solution

  • The code is obviously incomplete, because there's no page variable so that page.aggregate2(...) shouldn't compile.

    For this reason, I'm unable to point you to the exact line where the issue occurs. However, the error message tells you're using a "standard" lambda from the JDK that isn't Serializable, whereas you should use the ones from Jet, which are.

    Please check this package.

    EDIT:

    I've created a dedicated GitHub project with the above code.

    Everything works as expected. The 2 tuples are displayed correctly in the log:

    09:33:59.974 [ INFO] [c.h.j.i.c.WriteLoggerP] [05d8-7d0a-e3c0-0001/loggerSink#0] ([ch.frankel.so.PageVisit1@c544b8f8], [ch.frankel.so.AddToCart1@41c722ed])
    09:33:59.974 [ INFO] [c.h.j.i.c.WriteLoggerP] [05d8-7d0a-e3c0-0001/loggerSink#0] ([ch.frankel.so.PageVisit1@58fbcb54], [ch.frankel.so.AddToCart1@c666a2c4])