Tags: scala, spark-csv

How to programmatically create a custom org.apache.spark.sql.types.StructType schema object from a JSON file


I have to create a custom org.apache.spark.sql.types.StructType schema object with the info from a JSON file. The JSON file can be anything, so I have parameterized its path in a properties file.

This is what the properties file looks like:

# Path to the output file's schema (by default, the schema of the target Parquet is inferred). If present, the schema must be in JSON format, applicable to a DataFrame (see StructType.fromJson)
schema.parquet=/Users/XXXX/Desktop/generated_schema.json
writing.mode=overwrite
separator=;
header=false
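
For context, these properties are read with a plain java.util.Properties loader. The original loading code is not shown here, but a minimal sketch could look like this (the file path is taken from the log output below; the variable names are assumptions that match the code further down):

import java.io.FileInputStream
import java.util.Properties

// Hypothetical loader for the properties file shown above.
val props = new Properties()
props.load(new FileInputStream("/Users/aisidoro/Desktop/mra-csv-converter/parametrizacion.properties"))

val mra_schema_parquet = props.getProperty("schema.parquet") // null if the key is absent
val saveMode           = props.getProperty("writing.mode")
val separator          = props.getProperty("separator")
val header             = props.getProperty("header")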

The file generated_schema.json looks like:

{"type" : "struct","fields" : [ {"name" : "codigo","type" : "string","nullable" : true}, {"name":"otro", "type":"string", "nullable":true}, {"name":"vacio", "type":"string", "nullable":true},{"name":"final","type":"string","nullable":true} ]}

So, this is how I thought I could solve it:

import org.apache.hadoop.fs.{FSDataInputStream, Path}
import org.apache.spark.sql.types.{DataType, StructType}

val path: Path = new Path(mra_schema_parquet)
val fileSystem = path.getFileSystem(sc.hadoopConfiguration)
val inputStream: FSDataInputStream = fileSystem.open(path)
// Read the file lazily, line by line; the whole schema sits on the first line
val schema_json = Stream.cons(inputStream.readLine(), Stream.continually(inputStream.readLine))

System.out.println("schema_json looks like " + schema_json.head)

val mySchemaStructType: DataType = DataType.fromJson(schema_json.head)

/*
After this line, mySchemaStructType has four StructField objects inside it, the same ones that appear in schema_json
*/
logger.info(mySchemaStructType)

val myStructType = new StructType()
myStructType.add("mySchemaStructType",mySchemaStructType)

/*

After this line, myStructType has zero StructFields! The bug must be here: myStructType should contain the four StructFields that represent the loaded schema JSON! How can I construct the necessary StructType object?

*/

myDF = loadCSV(sqlContext, path_input_csv, separator, myStructType, header)
System.out.println("myDF.schema.json looks like " + myDF.schema.json)
inputStream.close()
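
loadCSV above is a helper of mine whose body is not shown here; with spark-csv it presumably boils down to something like this sketch (the signature is inferred from the call site, the option names are spark-csv's):

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.types.StructType

// Assumed shape of the loadCSV helper, based on the spark-csv read API.
def loadCSV(sqlContext: SQLContext, pathCSV: String, separator: String,
            schema: StructType, header: String): DataFrame = {
  sqlContext.read
    .format("com.databricks.spark.csv")
    .option("header", header)         // "true" or "false"
    .option("delimiter", separator)
    .option("inferSchema", "false")   // the schema is supplied explicitly
    .schema(schema)                   // an empty StructType here yields an empty DataFrame schema
    .load(pathCSV)
}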

myDF.write
  .format("com.databricks.spark.csv")
  .option("header", header)
  .option("delimiter", separator)
  .option("nullValue", "")
  .option("treatEmptyValuesAsNulls", "true")
  .mode(saveMode)
  .parquet(pathParquet)

When the code runs the last line, .parquet(pathParquet), this exception is thrown:

    parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message root {
    }

The output of this code looks like this:

16/11/11 13:57:04 INFO AnotherCSVtoParquet$: The job started using this propertie file: /Users/aisidoro/Desktop/mra-csv-converter/parametrizacion.properties
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: path_input_csv is /Users/aisidoro/Desktop/mra-csv-converter/cds_glcs.csv
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: path_output_parquet  is /Users/aisidoro/Desktop/output900000
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: mra_schema_parquet is /Users/aisidoro/Desktop/mra-csv-converter/generated_schema.json
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: writting_mode is overwrite
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: separator is ;
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: header is false
16/11/11 13:57:05 INFO AnotherCSVtoParquet$: ATTENTION! aplying mra_schema_parquet  /Users/aisidoro/Desktop/mra-csv-converter/generated_schema.json
schema_json looks like {"type" : "struct","fields" : [ {"name" : "codigo","type" : "string","nullable" : true}, {"name":"otro", "type":"string", "nullable":true}, {"name":"vacio", "type":"string", "nullable":true},{"name":"final","type":"string","nullable":true} ]}
16/11/11 13:57:12 INFO AnotherCSVtoParquet$: StructType(StructField(codigo,StringType,true), StructField(otro,StringType,true), StructField(vacio,StringType,true), StructField(final,StringType,true))
16/11/11 13:57:13 INFO AnotherCSVtoParquet$: loadCSV. header is false, inferSchema is false pathCSV is /Users/aisidoro/Desktop/mra-csv-converter/cds_glcs.csv separator is ;
myDF.schema.json looks like {"type":"struct","fields":[]}

The schema_json object and myDF.schema.json should have the same content, shouldn't they? But that is not what happens, and I think this is what triggers the error.

Finally, the job crashes with this exception:

    parquet.schema.InvalidSchemaException: Cannot write a schema with an empty group: message root {
    }
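
This exception is simply Parquet refusing to write a DataFrame whose schema has no fields, which matches the empty {"type":"struct","fields":[]} printed above; a hypothetical minimal reproduction:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Hypothetical reproduction: a zero-column DataFrame cannot be written
// as Parquet; parquet-mr rejects the empty "message root {}" group.
val emptyDF = sqlContext.createDataFrame(sc.emptyRDD[Row], new StructType())
emptyDF.write.parquet("/tmp/will_fail") // parquet.schema.InvalidSchemaException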

The fact is that if I do not provide any JSON schema file, the job runs fine; but with this schema...

Can anybody help me? I just want to create some Parquet files starting from a CSV file and a JSON schema file.

Thank you.

The dependencies are:

    <spark.version>1.5.0-cdh5.5.2</spark.version>
    <databricks.version>1.5.0</databricks.version>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>${spark.version}</version>
        <scope>compile</scope>
    </dependency>
    <dependency>
        <groupId>com.databricks</groupId>
        <artifactId>spark-csv_2.10</artifactId>
        <version>${databricks.version}</version>
    </dependency>

UPDATE

I can see that there is an open issue:

https://github.com/databricks/spark-csv/issues/61


Solution

  • Finally I found the problem.

    The problem was in these lines:

    val myStructType = new StructType()
    myStructType.add("mySchemaStructType",mySchemaStructType)
    

    I had to use this line instead:

    val mySchemaStructType = DataType.fromJson(schema_json.head).asInstanceOf[StructType]
    

    I had to cast the DataType to a StructType in order to get things working, as the sketch below shows.
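
    For anyone who hits the same thing, here is a short sketch of why the original two lines produced an empty schema (only mySchemaStructType and schema_json come from the code above; the other names are mine). StructType is immutable, so add returns a new instance instead of mutating the receiver, and even the returned value would have contained one nested field rather than the four top-level ones:

    import org.apache.spark.sql.types.{DataType, StructType}

    val mySchemaStructType = DataType.fromJson(schema_json.head).asInstanceOf[StructType]

    // StructType is immutable: add() returns a NEW StructType,
    // so calling it without capturing the result changes nothing.
    val empty = new StructType()
    empty.add("mySchemaStructType", mySchemaStructType) // result discarded
    println(empty.fields.length)              // 0 -- still empty

    // Even when the result is captured, add() wraps the whole
    // schema as ONE nested field instead of four top-level ones.
    val nested = new StructType().add("mySchemaStructType", mySchemaStructType)
    println(nested.fields.length)             // 1

    // The cast gives the four top-level fields directly.
    println(mySchemaStructType.fields.length) // 4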