Search code examples
scalaapache-sparkapache-spark-sqlspark-csv

how to change header of a data frame with another data frame header?


I have a data set which looks like this

LineItem.organizationId|^|LineItem.lineItemId|^|StatementTypeCode|^|LineItemName|^|LocalLanguageLabel|^|FinancialConceptLocal|^|FinancialConceptGlobal|^|IsDimensional|^|InstrumentId|^|LineItemSequence|^|PhysicalMeasureId|^|FinancialConceptCodeGlobalSecondary|^|IsRangeAllowed|^|IsSegmentedByOrigin|^|SegmentGroupDescription|^|SegmentChildDescription|^|SegmentChildLocalLanguageLabel|^|LocalLanguageLabel.languageId|^|LineItemName.languageId|^|SegmentChildDescription.languageId|^|SegmentChildLocalLanguageLabel.languageId|^|SegmentGroupDescription.languageId|^|SegmentMultipleFundbDescription|^|SegmentMultipleFundbDescription.languageId|^|IsCredit|^|FinancialConceptLocalId|^|FinancialConceptGlobalId|^|FinancialConceptCodeGlobalSecondaryId|^|FFAction|!|
Japan|^|1507101869432|^|4295876606|^|1|^|BAL|^|Cash And Deposits|^|null|^|null|^|ACAE|^|false|^|null|^|null|^|null|^|null|^|false|^|null|^|null|^|null|^|null|^|505126|^|505074|^|null|^|null|^|null|^|null|^|null|^|null|^|null|^|3018759|^|null|^|I|!|

And this is how i load data with auto discover schema

val df1With_ = df.toDF(df.columns.map(_.replace(".", "_")): _*)
val column_to_keep = df1With_.columns.filter(v => (!v.contains("^") && !v.contains("!") && !v.contains("_c"))).toSeq
val df1result = df1With_.select(column_to_keep.head, column_to_keep.tail: _*)

Now i have another data frame on which i do join operation and finally i create a data frame which writes output to csv file .

Final data frame looks like this

val dfMainOutputFinal = dfMainOutput.select($"DataPartition", $"StatementTypeCode",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").map(c => col(c)): _*).as("concatenated"))

val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", ""))

dfMainOutputFinalWithoutNull.write.partitionBy("DataPartition","StatementTypeCode")
  .format("csv")
  .option("nullValue", "")
  .option("header","true")
  .option("codec", "gzip")
  .save("output")

Now in my output file i see my header as only concatenated which is expected .

Now my question is is there anyway to change header of my final output as header of df1result data frame


Solution

  • I believe the simplest way to solve this would be to rename the concatenated column. As the column names already exists in the column_to_keep variable, you can simply do:

    val header = column_to_keep.mkString("|^|")
    val dfMainOutputFinalWithoutNull = dfMainOutputFinal
      .withColumn("concatenated", regexp_replace(col("concatenated"), "null", ""))
      .withColumnRenamed("concatenated", header)
    

    This will result is an extremely long column name, hence, I wouldn't advice it if it was for something else than saving to a csv.