scala, apache-spark, apache-spark-sql, spark-streaming

Check column datatype and execute SQL only on Integer and Decimal in Spark SQL


I'm trying to check the datatype of each column in the input Parquet file; if the datatype is Integer or Decimal, I then want to run Spark SQL on that column.

//get Array of StructFields
val datatypes = parquetRDD_subset.schema.fields

//check the datatype of each column
for (val_datatype <- datatypes) if (val_datatype.dataType.typeName == "integer" || val_datatype.dataType.typeName.contains("decimal")) {
  //get the field names
  val x = parquetRDD_subset.schema.fieldNames

  val dfs = x.map(field => spark.sql(s"select 'DataProfilerStats' as Table_Name,(SELECT 100 * approx_count_distinct($field)/count(1) from parquetDFTable) as Percentage_Unique_Value from parquetDFTable"))
}

The issue is that although the datatype validation succeeds, inside the for loop, after getting the field names, the columns are not actually restricted to integers and decimals; the query is executed against columns of every type, even strings. How do we get only the fields that are Decimal or Integer? How do we address this?


Solution

  • This is how you can filter the columns of integer and double type:

    // imports needed for the type checks and col()
    import org.apache.spark.sql.functions.col
    import org.apache.spark.sql.types.{DoubleType, IntegerType}

    // filter the columns to those of integer and double type
    val columns = df.schema.fields.filter(x => x.dataType == IntegerType || x.dataType == DoubleType)

    // use these filtered fields with select
    df.select(columns.map(x => col(x.name)): _*)
    
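    Applying the same idea to the profiling query from the question might look like the sketch below. It is a minimal sketch, not a drop-in replacement: it assumes `spark` is an active SparkSession, `parquetRDD_subset` is the DataFrame read from the Parquet file, and that it has been registered as the temp view `parquetDFTable`. Since the question asks for Decimal rather than Double, `DecimalType` is matched with `isInstanceOf` because it carries precision and scale.

    import org.apache.spark.sql.types.{DecimalType, IntegerType}

    // keep only Integer and Decimal fields
    val numericFields = parquetRDD_subset.schema.fields.filter { f =>
      f.dataType == IntegerType || f.dataType.isInstanceOf[DecimalType]
    }

    // build the profiling query from the question only for the filtered columns
    val dfs = numericFields.map { f =>
      spark.sql(
        s"select 'DataProfilerStats' as Table_Name, " +
        s"(SELECT 100 * approx_count_distinct(${f.name})/count(1) from parquetDFTable) " +
        s"as Percentage_Unique_Value from parquetDFTable")
    }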

    I hope this helps!