Search code examples
javaapache-sparkapache-spark-2.0

How to create encoder for custom Java objects?


I am using following class to create bean from Spark Encoders

Class OuterClass implements Serializable {
    int id;
    ArrayList<InnerClass> listofInner;
    
    public int getId() {
        return id;
    }
    
    public void setId (int num) {
        this.id = num;
    }

    public ArrayList<InnerClass> getListofInner() {
        return listofInner;
    }
    
    public void setListofInner(ArrayList<InnerClass> list) {
        this.listofInner = list;
    }
}

public static class InnerClass implements Serializable {
    String streetno;
    
    public void setStreetno(String streetno) {
        this.streetno= streetno;
    }

    public String getStreetno() {
        return streetno;
    }
}

Encoder<OuterClass> outerClassEncoder = Encoders.bean(OuterClass.class);
Dataset<OuterClass> ds = spark.createDataset(Collections.singeltonList(outerclassList), outerClassEncoder)

And I am getting the following error

Exception in thread "main" java.lang.UnsupportedOperationException: Cannot infer type for class OuterClass$InnerClass because it is not bean-compliant

How can I implement this type of use case for Spark in Java? This worked fine if I remove the inner class. But I need to have an inner class for my use case.


Solution

  • Your JavaBean class should have a public no-argument constructor, getter and setters and it should implement Serializable interface. Spark SQL works on valid JavaBean class.

    EDIT : Adding working sample with inner class

    OuterInnerDF.java

    package com.abaghel.examples;
    
    import java.util.ArrayList;
    import java.util.Collections;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Encoder;
    import org.apache.spark.sql.Encoders;
    import org.apache.spark.sql.SparkSession;
    import com.abaghel.examples.OuterClass.InnerClass;
    
    public class OuterInnerDF {
        public static void main(String[] args) {
            SparkSession spark = SparkSession
                .builder()
                .appName("OuterInnerDF")
                .config("spark.sql.warehouse.dir", "/file:C:/temp")
                .master("local[2]")
                .getOrCreate();
    
            System.out.println("====> Create DataFrame");
            //Outer
            OuterClass us = new OuterClass();
            us.setId(111);      
            //Inner
            OuterClass.InnerClass ic = new OuterClass.InnerClass();
            ic.setStreetno("My Street");
            //list
            ArrayList<InnerClass> ar = new ArrayList<InnerClass>();
            ar.add(ic);      
            us.setListofInner(ar);   
            //DF
            Encoder<OuterClass> outerClassEncoder = Encoders.bean(OuterClass.class);         
            Dataset<OuterClass> ds = spark.createDataset(Collections.singletonList(us), outerClassEncoder);
            ds.show();
        }
    }
    

    OuterClass.java

    package com.abaghel.examples;
    
    import java.io.Serializable;
    import java.util.ArrayList;
    
    public class OuterClass implements Serializable {
        int id;
        ArrayList<InnerClass> listofInner;
    
        public int getId() {
            return id;
        }
    
        public void setId(int num) {
            this.id = num;
        }
    
        public ArrayList<InnerClass> getListofInner() {
            return listofInner;
        }
    
        public void setListofInner(ArrayList<InnerClass> list) {
            this.listofInner = list;
        }
    
        public static class InnerClass implements Serializable {
            String streetno;
    
            public void setStreetno(String streetno) {
                this.streetno = streetno;
            }
    
            public String getStreetno() {
                return streetno;
            }
        }
    }
    

    Console Output

    ====> Create DataFrame
    16/08/28 18:02:55 INFO CodeGenerator: Code generated in 32.516369 ms
    +---+-------------+
    | id|  listofInner|
    +---+-------------+
    |111|[[My Street]]|
    +---+-------------+