Search code examples
scalaapache-sparkapache-spark-sql

Union of two data frames changes column order in Spark


My data frame 1:

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
4295876332|^|41|^|40|^|1|^|I|!|
4295876332|^|41|^|110|^|2|^|I|!|
4295876332|^|41|^|111|^|2|^|I|!|
4295876332|^|138|^|139|^|1|^|I|!|
4295876332|^|138|^|193|^|2|^|I|!|
4295877204|^|38|^|37|^|1|^|I|!|
4295877204|^|38|^|103|^|2|^|I|!|
4295877204|^|38|^|104|^|2|^|I|!|
4295877204|^|131|^|132|^|1|^|I|!|
4295877204|^|131|^|178|^|2|^|I|!|
4295877234|^|7|^|100|^|1|^|I|!|
4295877234|^|7|^|137|^|2|^|I|!|
4295877234|^|7|^|138|^|2|^|I|!|
4295877234|^|158|^|188|^|1|^|I|!|
4295877234|^|158|^|210|^|2|^|I|!|
4295877320|^|41|^|40|^|1|^|I|!|
4295877320|^|41|^|107|^|2|^|I|!|
4295877320|^|41|^|108|^|2|^|I|!|
4295877320|^|135|^|136|^|1|^|I|!|
4295877320|^|135|^|190|^|2|^|I|!|
4295877413|^|41|^|40|^|1|^|I|!|
4295877413|^|41|^|108|^|2|^|I|!|
4295877413|^|41|^|109|^|2|^|I|!|
4295877413|^|138|^|139|^|1|^|I|!|
4295877413|^|138|^|190|^|2|^|I|!|
4295877734|^|41|^|40|^|1|^|I|!|
4295877734|^|41|^|121|^|2|^|I|!|
4295877734|^|41|^|122|^|2|^|I|!|
4295877734|^|136|^|137|^|1|^|I|!|
4295877734|^|136|^|188|^|2|^|I|!|
4295878126|^|41|^|40|^|1|^|I|!|
4295878126|^|41|^|106|^|2|^|I|!|
4295878126|^|41|^|107|^|2|^|I|!|
4295878126|^|134|^|135|^|1|^|I|!|
4295878126|^|134|^|181|^|2|^|I|!|
4295880491|^|6|^|172|^|2|^|I|!|
4295880491|^|6|^|173|^|2|^|I|!|
4295880491|^|171|^|174|^|2|^|I|!|
4295876139|^|41|^|40|^|1|^|I|!|
4295876139|^|41|^|122|^|2|^|I|!|
4295876139|^|41|^|123|^|2|^|I|!|
4295876139|^|134|^|135|^|1|^|I|!|
4295876139|^|134|^|188|^|2|^|I|!|
4295876509|^|41|^|40|^|1|^|I|!|
4295876509|^|41|^|118|^|2|^|I|!|
4295876509|^|41|^|119|^|2|^|I|!|
4295876509|^|134|^|135|^|1|^|I|!|
4295876509|^|134|^|185|^|2|^|I|!|
4295876547|^|3|^|100|^|1|^|I|!|
4295876547|^|3|^|130|^|2|^|I|!|
4295876547|^|3|^|131|^|2|^|I|!|
4295876547|^|153|^|185|^|1|^|I|!|
4295876547|^|153|^|202|^|2|^|I|!|
4295876646|^|5|^|104|^|1|^|I|!|
4295876646|^|5|^|150|^|2|^|I|!|
4295876646|^|5|^|151|^|2|^|I|!|
4295876646|^|162|^|195|^|1|^|I|!|
4295876646|^|162|^|217|^|2|^|I|!|
4295876738|^|41|^|40|^|1|^|I|!|
4295876738|^|41|^|106|^|2|^|I|!|
4295876738|^|41|^|107|^|2|^|I|!|
4295876738|^|134|^|135|^|1|^|I|!|
4295876738|^|134|^|187|^|2|^|I|!|
4295877225|^|41|^|40|^|1|^|I|!|
4295877225|^|41|^|122|^|2|^|I|!|
4295877225|^|41|^|123|^|2|^|I|!|
4295877225|^|134|^|135|^|1|^|I|!|
4295877225|^|134|^|188|^|2|^|I|!|
4295877766|^|41|^|40|^|1|^|I|!|
4295877766|^|41|^|106|^|2|^|I|!|
4295877766|^|41|^|107|^|2|^|I|!|
4295877766|^|134|^|135|^|1|^|I|!|
4295877766|^|134|^|186|^|2|^|I|!|
4295877812|^|41|^|40|^|1|^|I|!|
4295877812|^|41|^|112|^|2|^|I|!|
4295877812|^|41|^|113|^|2|^|I|!|
4295877812|^|134|^|135|^|1|^|I|!|
4295877812|^|134|^|186|^|2|^|I|!|
4295877871|^|41|^|40|^|1|^|I|!|
4295877871|^|41|^|124|^|2|^|I|!|
4295877871|^|41|^|125|^|2|^|I|!|
4295877871|^|137|^|138|^|1|^|I|!|
4295877871|^|137|^|190|^|2|^|I|!|
4295877923|^|41|^|40|^|1|^|I|!|
4295877923|^|41|^|122|^|2|^|I|!|
4295877923|^|41|^|123|^|2|^|I|!|
4295877923|^|134|^|135|^|1|^|I|!|
4295877923|^|134|^|188|^|2|^|I|!|
4295877985|^|41|^|40|^|1|^|I|!|
4295877985|^|41|^|113|^|2|^|I|!|
4295877985|^|41|^|114|^|2|^|I|!|
4295877985|^|134|^|135|^|1|^|I|!|
4295877985|^|134|^|188|^|2|^|I|!|
4295878608|^|41|^|40|^|1|^|I|!|
4295878608|^|41|^|105|^|2|^|I|!|
4295878608|^|41|^|106|^|2|^|I|!|
4295878608|^|130|^|131|^|1|^|I|!|
4295878608|^|130|^|182|^|2|^|I|!|
4295878863|^|41|^|40|^|1|^|I|!|
4295878863|^|41|^|121|^|2|^|I|!|
4295878863|^|41|^|122|^|2|^|I|!|
4295878863|^|134|^|135|^|1|^|I|!|
4295878863|^|134|^|187|^|2|^|I|!|
4295880574|^|166|^|167|^|2|^|I|!|
4295880574|^|166|^|168|^|2|^|I|!|
4295880574|^|273|^|274|^|2|^|I|!|
4295876308|^|41|^|40|^|1|^|I|!|
4295876308|^|41|^|103|^|2|^|I|!|
4295876308|^|41|^|104|^|2|^|I|!|
4295876308|^|130|^|131|^|1|^|I|!|
4295876308|^|130|^|177|^|2|^|I|!|

My data frame 2:

DataPartition|^|PartitionYear|^|TimeStamp|^|OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
SelfSourcedPublic|^|2016|^|1515129638858|^|4295902451|^|109|^|110|^|1|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638859|^|4295902451|^|111|^|112|^|1|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638860|^|4295902451|^|109|^|113|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638861|^|4295902451|^|109|^|114|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638862|^|4295902451|^|111|^|115|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638863|^|4295902451|^|109|^|119|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638864|^|4295902451|^|109|^|120|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129638865|^|4295902451|^|111|^|121|^|4|^|O|!|
SelfSourcedPublic|^|2017|^|1515129638866|^|4295902451|^|122|^|126|^|2|^|O|!|
SelfSourcedPublic|^|2017|^|1515129638867|^|4295902451|^|122|^|127|^|2|^|O|!|
SelfSourcedPublic|^|2017|^|1515129639565|^|4295859031|^|126|^|127|^|1|^|I|!|
SelfSourcedPublic|^|2017|^|1515129639566|^|4295859031|^|128|^|129|^|1|^|I|!|
SelfSourcedPublic|^|2017|^|1515129639688|^|4295859031|^|null|^|126|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639689|^|4295859031|^|null|^|127|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639690|^|4295859031|^|null|^|128|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639691|^|4295859031|^|null|^|129|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639713|^|4295906830|^|null|^|420|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639714|^|4295906830|^|null|^|421|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639715|^|4295906830|^|null|^|422|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639741|^|4295906830|^|null|^|420|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639742|^|4295906830|^|null|^|421|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129639743|^|4295906830|^|null|^|422|^|null|^|D|!|
SelfSourcedPrivate|^|2014|^|1515129639770|^|4298009288|^|171|^|206|^|2|^|O|!|
SelfSourcedPrivate|^|2014|^|1515129639771|^|4298009288|^|143|^|203|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639809|^|4298009288|^|167|^|168|^|4|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129639810|^|4298009288|^|163|^|195|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639811|^|4298009288|^|163|^|196|^|1|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639812|^|4298009288|^|167|^|197|^|3|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639813|^|4298009288|^|167|^|198|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639814|^|4298009288|^|30|^|29|^|4|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129639815|^|4298009288|^|22|^|73|^|2|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639816|^|4298009288|^|22|^|75|^|1|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639817|^|4298009288|^|30|^|76|^|3|^|O|!|
SelfSourcedPrivate|^|2005|^|1515129639818|^|4298009288|^|30|^|78|^|2|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640008|^|4298009288|^|163|^|164|^|4|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640009|^|4298009288|^|161|^|191|^|3|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640010|^|4298009288|^|161|^|192|^|2|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640011|^|4298009288|^|161|^|193|^|1|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640012|^|4298009288|^|163|^|194|^|3|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640013|^|4298009288|^|22|^|24|^|4|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640014|^|4298009288|^|19|^|66|^|3|^|O|!|
SelfSourcedPrivate|^|2007|^|1515129640015|^|4298009288|^|19|^|68|^|2|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640016|^|4298009288|^|19|^|70|^|1|^|O|!|
SelfSourcedPrivate|^|2006|^|1515129640017|^|4298009288|^|22|^|71|^|3|^|O|!|
SelfSourcedPrivate|^|2010|^|1515129640132|^|4298009288|^|155|^|183|^|2|^|O|!|
SelfSourcedPrivate|^|2010|^|1515129640133|^|4298009288|^|10|^|53|^|2|^|O|!|
SelfSourcedPublic|^|2017|^|1515129640204|^|4295904170|^|null|^|379|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640205|^|4295904170|^|null|^|380|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640206|^|4295904170|^|null|^|384|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640313|^|4295904170|^|null|^|379|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640314|^|4295904170|^|null|^|380|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640315|^|4295904170|^|null|^|384|^|null|^|D|!|
SelfSourcedPublic|^|2017|^|1515129640528|^|4295904170|^|381|^|379|^|3|^|O|!|
SelfSourcedPublic|^|2017|^|1515129640529|^|4295904170|^|381|^|380|^|3|^|O|!|
SelfSourcedPublic|^|2017|^|1515129640530|^|4295904170|^|381|^|383|^|4|^|I|!|
SelfSourcedPublic|^|2017|^|1515129640531|^|4295904170|^|385|^|384|^|4|^|I|!|
SelfSourcedPublic|^|2017|^|1515129641126|^|4295904170|^|372|^|379|^|3|^|O|!|
SelfSourcedPublic|^|2017|^|1515129641127|^|4295904170|^|372|^|380|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641505|^|4295858941|^|24|^|25|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641506|^|4295858941|^|24|^|25|^|5|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641507|^|4295858941|^|30|^|31|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641508|^|4295858941|^|30|^|31|^|3|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641509|^|4295858941|^|30|^|32|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641510|^|4295858941|^|30|^|32|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641511|^|4295858941|^|24|^|33|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641512|^|4295858941|^|24|^|33|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641513|^|4295858941|^|24|^|34|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641514|^|4295858941|^|24|^|34|^|20|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641515|^|4295858941|^|1|^|2|^|4|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641516|^|4295858941|^|1|^|3|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641517|^|4295858941|^|5|^|6|^|4|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641518|^|4295858941|^|5|^|7|^|4|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641519|^|4295858941|^|12|^|10|^|2|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641520|^|4295858941|^|12|^|11|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641521|^|4295858941|^|1|^|13|^|1|^|O|!|
SelfSourcedPublic|^|2003|^|1515129641522|^|4295858941|^|12|^|14|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641523|^|4295858941|^|5|^|15|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641524|^|4295858941|^|5|^|16|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641525|^|4295858941|^|1|^|17|^|3|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641526|^|4295858941|^|1|^|18|^|3|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641527|^|4295858941|^|5|^|19|^|1|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641528|^|4295858941|^|5|^|20|^|2|^|O|!|
SelfSourcedPublic|^|2001|^|1515129641529|^|4295858941|^|5|^|21|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641530|^|4295858941|^|1|^|22|^|2|^|O|!|
SelfSourcedPublic|^|2002|^|1515129641531|^|4295858941|^|1|^|23|^|2|^|O|!|
SelfSourcedPublic|^|2016|^|1515129641532|^|4295858941|^|35|^|36|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1515129641603|^|4295858941|^|null|^|35|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1515129641604|^|4295858941|^|null|^|36|^|null|^|D|!|
SelfSourcedPublic|^|2016|^|1515129641605|^|4295858941|^|null|^|37|^|null|^|D|!|
SelfSourcedPrivate|^|2016|^|1515129641752|^|4298009288|^|232|^|242|^|4|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641753|^|4298009288|^|248|^|249|^|1|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641754|^|4298009288|^|248|^|249|^|1|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641755|^|4298009288|^|230|^|240|^|4|^|O|!|
SelfSourcedPrivate|^|2016|^|1515129641756|^|4298009288|^|243|^|247|^|1|^|O|!|
SelfSourcedPrivate|^|2017|^|1515129641757|^|4298009288|^|248|^|252|^|2|^|O|!|
SelfSourcedPrivate|^|2017|^|1515129641758|^|4298009288|^|248|^|255|^|3|^|O|!|
ThirdPartyPrivate|^|2016|^|1515129641866|^|4296803503|^|1|^|2|^|1|^|I|!|
SelfSourcedPublic|^|2016|^|1515129642192|^|4295907168|^|367|^|377|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129642193|^|4295907168|^|365|^|375|^|4|^|O|!|
SelfSourcedPublic|^|2016|^|1515129642194|^|4295907168|^|365|^|376|^|4|^|O|!|
Japan|^|2016|^|1515129642733|^|4295876606|^|272|^|278|^|3|^|O|!|
Japan|^|2016|^|1515129642734|^|4295876606|^|272|^|278|^|3|^|O|!|
Japan|^|2016|^|1515129642735|^|4295876606|^|270|^|276|^|2|^|O|!|
Japan|^|2016|^|1515129642736|^|4295876606|^|270|^|277|^|3|^|O|!|
Japan|^|2016|^|1515129642737|^|4295876606|^|270|^|279|^|3|^|O|!|
SelfSourcedPublic|^|2016|^|1515129657602|^|4296803503|^|1|^|2|^|1|^|O|!|

My full working code:

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
        import sqlContext.implicits._
    
        import org.apache.spark.{ SparkConf, SparkContext }
        import java.sql.{Date, Timestamp}
        import org.apache.spark.sql.Row
        import org.apache.spark.sql.types._

    import org.apache.spark.sql.functions.udf

import org.apache.spark.sql.functions.input_file_name
import org.apache.spark.sql.functions.regexp_extract

val get_cus_val = spark.udf.register("get_cus_val", (filePath: String) => filePath.split("\\.")(3))
val get_cus_YearPartition = spark.udf.register("get_cus_YearPartition", (filePath: String) => filePath.split("\\.")(4))

val rdd = sc.textFile("s3://trfsmallfffile/Interim2Annual/MAIN")
val header = rdd.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema = StructType(header.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema)

val schemaHeader = StructType(header.map(cols => StructField(cols.replace(".", "."), StringType)).toSeq)
val dataHeader = sqlContext.createDataFrame(rdd.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schemaHeader)

val df1resultFinal=data.withColumn("DataPartition", get_cus_val(input_file_name))
val df1resultFinalWithYear=df1resultFinal.withColumn("PartitionYear", get_cus_YearPartition(input_file_name))

//Loading Incremental 

val rdd1 = sc.textFile("s3://trfsmallfffile/Interim2Annual/INCR")
val header1 = rdd1.filter(_.contains("OrganizationId")).map(line => line.split("\\|\\^\\|")).first()
val schema1 = StructType(header1.map(cols => StructField(cols.replace(".", "_"), StringType)).toSeq)
val data1 = sqlContext.createDataFrame(rdd1.filter(!_.contains("OrganizationId")).map(line => Row.fromSeq(line.split("\\|\\^\\|").toSeq)), schema1)


 //------------------------------- filtering only the latest from incremental ------------------------------
 
    import org.apache.spark.sql.expressions._
    val windowSpec = Window.partitionBy("OrganizationId","AnnualPeriodId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey1 = data1.withColumn("rank", rank().over(windowSpec)).filter($"rank" === 1).drop("rank")

    val windowSpec2 = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)
    val latestForEachKey = latestForEachKey1.withColumn("tobefiltered", first("FFAction|!|").over(windowSpec2))
      .filter($"tobefiltered" === "I|!|" || $"tobefiltered" === "O|!|" || ($"tobefiltered" === "D|!|" && $"FFAction|!|" === "D|!|"))
      .drop("tobefiltered", "TimeStamp")

//-----------------separating the incremental df for insert, deletion and overwrite----------------

    //---------------insert rows are selected -------------------------------
    //insert a row if I is detected and if O is found then first delete and then insert
    
    val insertdf = latestForEachKey.filter($"FFAction|!|" === "I|!|" || $"FFAction|!|" === "O|!|").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*)

    //------------------deleted rows with primary key  "OrganizationId", "InterimPeriodId"------------------
    // delete rows from parent if both D or O is found in incremental
    val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

    //join by two primary keys for deletion and delete from the parent dataframe
    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")

val dfToSave=dfMainOutput.union(insertdf).withColumn("FFAction|!|", when($"FFAction|!|" === "O|!|" || $"FFAction|!|" === "I|!|", lit("I|!|")))

val dfMainOutputFinal = dfToSave.na.fill("").select($"DataPartition", $"PartitionYear",concat_ws("|^|", dfMainOutput.schema.fieldNames.filter(_ != "DataPartition").filter(_ != "PartitionYear").map(c => col(c)): _*).as("concatenated"))

val headerColumn = dataHeader.columns.toSeq

val header = headerColumn.mkString("", "|^|", "|!|").dropRight(3)

val dfMainOutputFinalWithoutNull = dfMainOutputFinal.withColumn("concatenated", regexp_replace(col("concatenated"), "null", "")).withColumnRenamed("concatenated", header)

dfMainOutputFinalWithoutNull.repartition(1).write.partitionBy("DataPartition","PartitionYear")
  .format("csv")
  .option("nullValue", "")
  .option("delimiter", "\t")
  .option("quote", "\u0000")
  .option("header", "true")
  .option("codec", "gzip")
  .save("s3://trfsmallfffile/Interim2Annual/output")

   val FFRowCount =dfMainOutputFinalWithoutNull.groupBy("DataPartition","PartitionYear").count
  
  FFRowCount.coalesce(1).write.format("com.databricks.spark.xml")
  .option("rootTag", "FFFileType")
  .option("rowTag", "FFPhysicalFile")
  .save("s3://trfsmallfffile/Interim2Annual/Descr")

My output is two columns, order interchanged:

(`AnnualPeriodId|^|InterimPeriodId`)

My output:

OrganizationId|^|AnnualPeriodId|^|InterimPeriodId|^|InterimNumber|^|FFAction|!|
4295877812|^|40|^|41|^|1|^|I|!|
4295877234|^|188|^|158|^|1|^|I|!|
4295877320|^|136|^|135|^|1|^|I|!|
4295877225|^|135|^|134|^|1|^|I|!|
4295877766|^|40|^|41|^|1|^|I|!|
4295876332|^|110|^|41|^|2|^|I|!|
4295877812|^|113|^|41|^|2|^|I|!|
4295877320|^|190|^|135|^|2|^|I|!|
4295876308|^|40|^|41|^|1|^|I|!|
4295876646|^|195|^|162|^|1|^|I|!|
4295878608|^|106|^|41|^|2|^|I|!|
4295876738|^|107|^|41|^|2|^|I|!|
4295877812|^|186|^|134|^|2|^|I|!|
4295877734|^|121|^|41|^|2|^|I|!|
4295877413|^|108|^|41|^|2|^|I|!|
4295877766|^|107|^|41|^|2|^|I|!|
4295878608|^|131|^|130|^|1|^|I|!|
4295877985|^|40|^|41|^|1|^|I|!|
4295877923|^|122|^|41|^|2|^|I|!|
4295876308|^|177|^|130|^|2|^|I|!|
4295877413|^|109|^|41|^|2|^|I|!|
4295877225|^|40|^|41|^|1|^|I|!|
4295877413|^|139|^|138|^|1|^|I|!|
4295877766|^|106|^|41|^|2|^|I|!|
4295876308|^|104|^|41|^|2|^|I|!|
4295877204|^|132|^|131|^|1|^|I|!|
4295880574|^|167|^|166|^|2|^|I|!|
4295878126|^|106|^|41|^|2|^|I|!|
4295876509|^|119|^|41|^|2|^|I|!|
4295877734|^|188|^|136|^|2|^|I|!|
4295877923|^|188|^|134|^|2|^|I|!|
4295876139|^|135|^|134|^|1|^|I|!|
4295877413|^|190|^|138|^|2|^|I|!|
4295877225|^|122|^|41|^|2|^|I|!|
4295877812|^|135|^|134|^|1|^|I|!|
4295876646|^|151|^|5|^|2|^|I|!|
4295876139|^|188|^|134|^|2|^|I|!|
4295877225|^|188|^|134|^|2|^|I|!|
4295877234|^|210|^|158|^|2|^|I|!|
4295877923|^|123|^|41|^|2|^|I|!|
4295878863|^|135|^|134|^|1|^|I|!|
4295878863|^|121|^|41|^|2|^|I|!|
4295877234|^|100|^|7|^|1|^|I|!|
4295877812|^|112|^|41|^|2|^|I|!|
4295876332|^|193|^|138|^|2|^|I|!|
4295877225|^|123|^|41|^|2|^|I|!|
4295877320|^|107|^|41|^|2|^|I|!|
4295877734|^|137|^|136|^|1|^|I|!|
4295880574|^|274|^|273|^|2|^|I|!|
4295878608|^|105|^|41|^|2|^|I|!|
4295877320|^|40|^|41|^|1|^|I|!|
4295878608|^|40|^|41|^|1|^|I|!|
4295880491|^|173|^|6|^|2|^|I|!|
4295877985|^|114|^|41|^|2|^|I|!|
4295876646|^|217|^|162|^|2|^|I|!|
4295876738|^|187|^|134|^|2|^|I|!|
4295876509|^|40|^|41|^|1|^|I|!|
4295876139|^|123|^|41|^|2|^|I|!|
4295876509|^|118|^|41|^|2|^|I|!|
4295876646|^|104|^|5|^|1|^|I|!|
4295877234|^|137|^|7|^|2|^|I|!|
4295876547|^|185|^|153|^|1|^|I|!|
4295877734|^|122|^|41|^|2|^|I|!|
4295877766|^|186|^|134|^|2|^|I|!|
4295880574|^|168|^|166|^|2|^|I|!|
4295878126|^|107|^|41|^|2|^|I|!|
4295877234|^|138|^|7|^|2|^|I|!|
4295876738|^|135|^|134|^|1|^|I|!|
4295877766|^|135|^|134|^|1|^|I|!|
4295876646|^|150|^|5|^|2|^|I|!|
4295878126|^|135|^|134|^|1|^|I|!|
4295876139|^|122|^|41|^|2|^|I|!|
4295877204|^|103|^|38|^|2|^|I|!|
4295876332|^|111|^|41|^|2|^|I|!|
4295876332|^|139|^|138|^|1|^|I|!|
4295876308|^|103|^|41|^|2|^|I|!|
4295877734|^|40|^|41|^|1|^|I|!|
4295877871|^|190|^|137|^|2|^|I|!|
4295877923|^|135|^|134|^|1|^|I|!|
4295876547|^|130|^|3|^|2|^|I|!|
4295878863|^|122|^|41|^|2|^|I|!|
4295877204|^|104|^|38|^|2|^|I|!|
4295877985|^|135|^|134|^|1|^|I|!|
4295877871|^|138|^|137|^|1|^|I|!|
4295876332|^|40|^|41|^|1|^|I|!|
4295877871|^|124|^|41|^|2|^|I|!|
4295876139|^|40|^|41|^|1|^|I|!|
4295877204|^|178|^|131|^|2|^|I|!|
4295877413|^|40|^|41|^|1|^|I|!|
4295876509|^|185|^|134|^|2|^|I|!|
4295876308|^|131|^|130|^|1|^|I|!|
4295877871|^|125|^|41|^|2|^|I|!|
4295876738|^|106|^|41|^|2|^|I|!|
4295877923|^|40|^|41|^|1|^|I|!|
4295877985|^|188|^|134|^|2|^|I|!|
4295878126|^|40|^|41|^|1|^|I|!|
4295878863|^|40|^|41|^|1|^|I|!|
4295877204|^|37|^|38|^|1|^|I|!|
4295878608|^|182|^|130|^|2|^|I|!|
4295877320|^|108|^|41|^|2|^|I|!|
4295876547|^|100|^|3|^|1|^|I|!|
4295876547|^|131|^|3|^|2|^|I|!|
4295876547|^|202|^|153|^|2|^|I|!|
4295877871|^|40|^|41|^|1|^|I|!|
4295878863|^|187|^|134|^|2|^|I|!|
4295880491|^|172|^|6|^|2|^|I|!|
4295876738|^|40|^|41|^|1|^|I|!|
4295877985|^|113|^|41|^|2|^|I|!|
4295876509|^|135|^|134|^|1|^|I|!|
4295880491|^|174|^|171|^|2|^|I|!|
4295878126|^|181|^|134|^|2|^|I|!|

For example in DATA FRAME 1 below record is in this order

4295876139|^|134|^|135|^|1|^|I|!|

But in the output i get in this order

4295876139|^|135|^|134|^|1|^|I|!|

This is not if data has I flag .

This is because the this line in my code

val windowSpec2 = Window.partitionBy("OrganizationId","InterimPeriodId").orderBy($"TimeStamp".cast(LongType).desc)

and

val deletedf = latestForEachKey.filter($"FFAction|!|" === "D|!|" || $"FFAction|!|" === "O|!|").select($"OrganizationId", $"InterimPeriodId", lit("delete").as("Delete"))

and

val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")

In the case of Insert or I my column order is:

"OrganizationId","AnnualPeriodId","InterimPeriodId"

In the case of O or D my column order is:

"OrganizationId","InterimPeriodId"

Here is the output that i get where DATA FRAME 1 columns are exchanged .

I hope i am able to explain it clearly .


Solution

  • The interchange happened when you joined the main df1resultFinalWithYear with deletedf. You joined with Seq("OrganizationId", "InterimPeriodId") and thus InterimPeriodId came before AnnualPeriodId. But in the insertdf and headerColumn the order is still opposite. So the interchange happened in the following line

    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete")
    

    You can correct that by ordering the columns as

    val dfMainOutput = df1resultFinalWithYear.join(deletedf, Seq("OrganizationId", "InterimPeriodId"), "left").filter($"Delete".isNull).drop("Delete").select(df1resultFinalWithYear.schema.fieldNames.map(col):_*)
    

    And your problem should be solved.