Tags: java, apache-spark, apache-spark-dataset, apache-spark-encoders

How to join two Spark Datasets into one with Java objects?


I have a little problem joining two Datasets in Spark. I have this:

SparkConf conf = new SparkConf()
    .setAppName("MyFunnyApp")
    .setMaster("local[*]");

SparkSession spark = SparkSession
    .builder()
    .config(conf)
    .config("spark.debug.maxToStringFields", 150)
    .getOrCreate();
//...
//Do stuff
//...
Encoder<MyOwnObject1> encoderObject1 = Encoders.bean(MyOwnObject1.class);
Encoder<MyOwnObject2> encoderObject2 = Encoders.bean(MyOwnObject2.class);

Dataset<MyOwnObject1> object1DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile1)
    .as(encoderObject1);

Dataset<MyOwnObject2> object2DS = spark.read()
    .option("header","true")
    .option("delimiter",";")
    .option("inferSchema","true")
    .csv(pathToFile2)
    .as(encoderObject2);

I can print the schema and show the data correctly.
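My objects are plain JavaBeans; roughly like this sketch (column01 is the join key used below, the other field is just a placeholder to show the shape):

public class MyOwnObject1 implements java.io.Serializable {
    private String column01;   // join key read from the CSV
    private double column02;   // placeholder for the remaining vars

    public MyOwnObject1() { }  // Encoders.bean needs a public no-arg constructor

    public String getColumn01() { return column01; }
    public void setColumn01(String column01) { this.column01 = column01; }

    public double getColumn02() { return column02; }
    public void setColumn02(double column02) { this.column02 = column02; }
}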

//Here starts the problem
Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = 
    object1DS.join(object2DS, object1DS.col("column01")
    .equalTo(object2DS.col("column01")))
    .as(Encoders.tuple(encoderObject1, encoderObject2));

The last line can't make the join and gives me this error:

Exception in thread "main" org.apache.spark.sql.AnalysisException: Try to map struct<"LIST WITH ALL VARS FROM TWO OBJECT"> to Tuple2, but failed as the number of fields does not line up.;

That's true: the join flattens everything into a single struct with all the vars from both objects, so it can't be mapped onto a Tuple2...

Then I tried this:

 Dataset<Tuple2<MyOwnObject1, MyOwnObject2>> joinObjectDS = object1DS
    .joinWith(object2DS, object1DS
        .col("column01")
        .equalTo(object2DS.col("column01")));

And it works fine! But I need a new Dataset without the tuple. I have an object3 that has some vars from object1 and object2, and then I hit this problem:

Encoder<MyOwnObject3> encoderObject3 = Encoders.bean(MyOwnObject3.class);
Dataset<MyOwnObject3> object3DS = joinObjectDS.map(tupleObject1Object2 -> {
    MyOwnObject1 myOwnObject1 = tupleObject1Object2._1();
    MyOwnObject2 myOwnObject2 = tupleObject1Object2._2();
    MyOwnObject3 myOwnObject3 = new MyOwnObject3(); //Sets all vars with start values
    //...
    //Sets data from object 1 and 2 to 3.
    //...
    return myOwnObject3;
}, encoderObject3);

It fails!... here is the error:

17/05/10 12:17:43 ERROR CodeGenerator: failed to compile: org.codehaus.commons.compiler.CompileException: File 'generated.java', Line 593, Column 72: A method named "toString" is not declared in any enclosing class nor any supertype, nor through a static import

and over a thousand more error lines...
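For reference, this is how the schema Spark inferred from the CSV can be compared with the schema the bean encoder expects (the mismatch between the two turns out to be the real problem, see the solution below); a small sketch using only what is already defined above:

object1DS.printSchema();                                    // schema inferred from the CSV
System.out.println(encoderObject1.schema().treeString());   // schema the bean encoder expects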

What can I do? I have already tried:

  • Making my objects only with String, int (or Integer) and double (or Double) fields (nothing more)
  • Using different encoders like kryo or javaSerialization (see the sketch after this list)
  • Using JavaRDD (works! but very slowly) and using DataFrames with Rows (works, but I would need to change many objects)
  • All my Java objects are serializable
  • Using Spark 2.1.0 and 2.1.1; now I have 2.1.1 in my pom.xml
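For the encoder attempt above, this sketch is what I mean (kryo and javaSerialization encoders store the whole object in a single binary column instead of one column per field):

Encoder<MyOwnObject3> kryoEncoder = Encoders.kryo(MyOwnObject3.class);
//or: Encoders.javaSerialization(MyOwnObject3.class)
//used in place of encoderObject3 in the map above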

I want to use Datasets, to get the speed of DataFrames and the object syntax of JavaRDD...

Help?

Thanks


Solution

  • Finally I found a solution.

    I had a problem with the inferSchema option when my code was creating a Dataset. I have a String column for which inferSchema returns an Integer column, because all its values look "numeric", but I need to use them as Strings (like "0001", "0002"...). I needed to define a schema, but since I have many vars, I wrote this for all my classes:

    // Needs java.lang.reflect.Field, org.apache.spark.sql.catalyst.parser.CatalystSqlParser
    // and org.apache.spark.sql.types.* imports.
    // Builds one StructField per declared field, keeping the declared order;
    // parseDataType maps the Java type name ("String", "int", "double"...) to a Spark type.
    List<StructField> fieldsObject1 = new ArrayList<>();
    for (Field field : MyOwnObject1.class.getDeclaredFields()) {
        fieldsObject1.add(DataTypes.createStructField(
            field.getName(),
            CatalystSqlParser.parseDataType(field.getType().getSimpleName()),
            true)
        );
    }
    StructType schemaObject1 = DataTypes.createStructType(fieldsObject1);
    
    Dataset<MyOwnObject1> object1DS = spark.read()
        .option("header","true")
        .option("delimiter",";")
        .schema(schemaObject1)
        .csv(pathToFile1)
        .as(encoderObject1);
    

    Works fine.
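    As a quick sanity check (with a hypothetical value from one of the "numeric" String columns): with inferSchema a value like "0001" came back as the integer 1, which no longer matched the String field in the bean, while with the explicit schema it stays "0001":

    object1DS.show(5);                          // the "0001", "0002"... values stay as Strings now
    MyOwnObject1 first = object1DS.first();     // the bean encoder maps rows without codegen errors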

    The "best" solution would be this:

      Dataset<MyOwnObject1> object1DS = spark.read()
        .option("header","true")
        .option("delimiter",";")
        .schema(encoderObject1.schema())
        .csv(pathToFile1)
        .as(encoderObject1);
    

    but encoderObject1.schema() returns a schema with the vars in alphabetical order, not in their original order, so this option fails when I read the CSV. Maybe Encoders should return a schema with the vars in their original order and not in alphabetical order (one way to work around the ordering is sketched below).
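    A possible workaround for the ordering, assuming the CSV columns follow the class's declared-field order and every declared field has a matching bean property: reuse the types from the encoder's schema, but reorder them by reflection, which avoids parsing type names by hand:

    List<StructField> ordered = new ArrayList<>();
    StructType beanSchema = encoderObject1.schema();        // fields in alphabetical order
    for (Field field : MyOwnObject1.class.getDeclaredFields()) {
        ordered.add(beanSchema.apply(field.getName()));     // pick each field by name, keep its Spark type
    }
    StructType schemaObject1 = DataTypes.createStructType(ordered);

    Dataset<MyOwnObject1> object1DS = spark.read()
        .option("header","true")
        .option("delimiter",";")
        .schema(schemaObject1)
        .csv(pathToFile1)
        .as(encoderObject1);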