Tags: scala, apache-spark, apache-spark-sql, azure-databricks

How to skip the first and last line of a .dat file and turn it into a dataframe using Scala in Databricks


H|*|D|*|PA|*|BJ|*|S|*|2019.05.27 08:54:24|##|
H|*|AP_ATTR_ID|*|AP_ID|*|OPER_ID|*|ATTR_ID|*|ATTR_GROUP|*|LST_UPD_USR|*|LST_UPD_TSTMP|##|
779045|*|Sar|*|SUPERVISOR HIERARCHY|*|Supervisor|*|2|*|128|*|2019.05.14 16:48:16|##|
779048|*|KK|*|SUPERVISOR HIERARCHY|*|Supervisor|*|2|*|116|*|2019.05.14 16:59:02|##|
779054|*|Nisha - A|*|EXACT|*|CustomColumnRow120|*|2|*|1165|*|2019.05.15 12:11:48|##|
T|*||*|2019.05.27 08:54:28|##|

The file name is PA.dat.

I need to skip the first line and the last line of the file. The second line holds the column names. I need to build a dataframe with those column names, skipping those two lines, using Scala.

N.B. - the leading 'H' on the second line also needs to be skipped, as it is not part of the column names.

Please help me on this.


Solution

  • Something like this. I don't know whether sql.functions can split an array column into separate columns, so I did that part with an RDD.

    import java.util.regex.Pattern
    import org.apache.spark.sql.RowFactory
    import org.apache.spark.sql.types.{StringType, StructField, StructType}
    import org.apache.spark.sql.functions._
    
    // Read every line into a single-column DataFrame and number the lines.
    // Note: monotonically_increasing_id() only produces consecutive ids
    // (0, 1, 2, ...) when the file is read as a single partition, which
    // holds for a small file like this one.
    val data = spark.read
      .text("data/PA.dat")
      .toDF("val")
      .withColumn("id", monotonically_increasing_id())
    
    val count = data.count()
    
    // The second line (id = 1) holds the column names; strip the leading
    // "H|*|" cell and the trailing "|##|" marker.
    val header = data.where('id === 1).collect().map(s => s.getString(0)).apply(0)
    val columns = header
      .replace("H|*|", "")
      .replace("|##|", "")
      .replace("|*|", ",")
      .split(",")
    
    // Keep only the data lines (skip the two header lines and the trailer),
    // drop each line's trailing "|##|" marker, and turn the "|*|" delimiters
    // into commas.
    val columnDelimiter = Pattern.quote("|*|")
    val trailerMarker = Pattern.quote("|##|")
    val correctData = data.where('id > 1 && 'id < count - 1)
      .select(regexp_replace(regexp_replace('val, trailerMarker, ""), columnDelimiter, ",").as("val"))
    
    // Split each line on the commas and wrap the fields in a Row.
    val splitIntoCols = correctData.rdd.map(s => {
      val arr = s.getString(0).split(",")
      RowFactory.create(arr: _*)
    })
    
    // All columns are typed as nullable strings.
    val struct = StructType(columns.map(s => StructField(s, StringType, true)))
    val finalDF = spark.createDataFrame(splitIntoCols, struct)
    
    finalDF.show()
    
    
    +----------+---------+--------------------+------------------+----------+-----------+-------------------+
    |AP_ATTR_ID|    AP_ID|             OPER_ID|           ATTR_ID|ATTR_GROUP|LST_UPD_USR|      LST_UPD_TSTMP|
    +----------+---------+--------------------+------------------+----------+-----------+-------------------+
    |    779045|      Sar|SUPERVISOR HIERARCHY|        Supervisor|         2|        128|2019.05.14 16:48:16|
    |    779048|       KK|SUPERVISOR HIERARCHY|        Supervisor|         2|        116|2019.05.14 16:59:02|
    |    779054|Nisha - A|               EXACT|CustomColumnRow120|         2|       1165|2019.05.15 12:11:48|
    +----------+---------+--------------------+------------------+----------+-----------+-------------------+
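  • One caveat with the comma trick: replacing `|*|` with `,` and then splitting on `,` would break if any field ever contained a literal comma. Splitting directly on the `Pattern.quote`d delimiter sidesteps that. A quick plain-Scala sketch of the parsing steps (no Spark needed), using the header and one data row from the question:

    ```scala
    import java.util.regex.Pattern

    // Regex-safe form of the field delimiter.
    val delimiter = Pattern.quote("|*|")

    // Header line: drop the trailing |##| marker and the leading "H" cell.
    val header = "H|*|AP_ATTR_ID|*|AP_ID|*|OPER_ID|*|ATTR_ID|*|ATTR_GROUP|*|LST_UPD_USR|*|LST_UPD_TSTMP|##|"
    val columns = header.stripSuffix("|##|").split(delimiter).drop(1)

    // A data row splits into exactly as many fields as there are columns.
    val row = "779045|*|Sar|*|SUPERVISOR HIERARCHY|*|Supervisor|*|2|*|128|*|2019.05.14 16:48:16|##|"
    val fields = row.stripSuffix("|##|").split(delimiter)

    println(columns.mkString(","))
    // AP_ATTR_ID,AP_ID,OPER_ID,ATTR_ID,ATTR_GROUP,LST_UPD_USR,LST_UPD_TSTMP
    println(fields.length == columns.length)  // true
    ```

    The same split can then be used in the RDD step instead of the comma replacement.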