apache-spark apache-spark-sql spark-streaming apache-spark-dataset

How to join with another table only when year === prev_year in Spark


I am using spark-sql 2.4.x and the DataStax spark-cassandra-connector with Cassandra 3.x, along with Kafka.

I have a scenario where finance data arrives from a Kafka topic. This data (the base dataset) contains the fields companyId, year and prev_year.

If year === prev_year, I need to join with a different table, i.e. exchange_rates.

If year =!= prev_year, I need to return the base dataset as is.

How can I do this in Spark SQL?


Solution

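  • One approach is to split the input on the year/prev_year condition, join only one of the two parts with the exchange rates, and union the parts back together. Note that the transcript below joins the rows where year =!= prev_year and leaves the rows where year === prev_year unchanged, which is the reverse of the rule stated in the question. To follow the question exactly, join equaldf instead of notequaldf and union notequaldf with the result.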

    scala> Input_df.show
    +---------+----+---------+----+
    |companyId|year|prev_year|rate|
    +---------+----+---------+----+
    |        1|2016|     2017|  12|
    |        1|2017|     2017|21.4|
    |        2|2018|     2017|11.7|
    |        2|2018|     2018|44.6|
    |        3|2016|     2017|34.5|
    |        4|2017|     2017|  56|
    +---------+----+---------+----+
    
    
    scala> exch_rates.show
    +---------+----+
    |companyId|rate|
    +---------+----+
    |        1|12.3|
    |        2|12.5|
    |        3|22.3|
    |        4|34.6|
    |        5|45.2|
    +---------+----+
    
    
    scala> val equaldf = Input_df.filter(col("year") === col("prev_year"))
    
    scala> val notequaldf = Input_df.filter(col("year") =!= col("prev_year"))
    
    scala> val joindf  = notequaldf.alias("n").drop("rate").join(exch_rates.alias("e"), List("companyId"), "left")
    
    scala> val finalDF = equaldf.union(joindf)
    
    scala> finalDF.show()
    +---------+----+---------+----+
    |companyId|year|prev_year|rate|
    +---------+----+---------+----+
    |        1|2017|     2017|21.4|
    |        2|2018|     2018|44.6|
    |        4|2017|     2017|  56|
    |        1|2016|     2017|12.3|
    |        2|2018|     2017|12.5|
    |        3|2016|     2017|22.3|
    +---------+----+---------+----+
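The sketch below applies the rule as stated in the question: only rows with year === prev_year take their rate from the exchange-rate table. It is a minimal, untuned example under stated assumptions: Spark 2.4 with Scala, a `rate` column of type Double in both inputs, and at most one exchange-rate row per companyId (duplicates would multiply the joined rows). The names `RateJoinSketch`, `applyRates` and `applyRatesSinglePass` are hypothetical and not part of the original answer. The first function keeps the answer's split/join/union shape but uses `unionByName`, so columns are matched by name rather than position. The second performs one left join and picks the rate with `when`/`otherwise`.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, when}

// Hypothetical helper object; names are illustrative, not from the original answer.
object RateJoinSketch {

  // Split/join/union: rows with year === prev_year take their rate from exchRates,
  // all other rows are returned unchanged.
  def applyRates(input: DataFrame, exchRates: DataFrame): DataFrame = {
    val sameYear   = input.filter(col("year") === col("prev_year"))
    val otherYears = input.filter(col("year") =!= col("prev_year"))

    // Left join keeps same-year rows whose companyId has no exchange rate
    // (their rate becomes null in that case).
    val joined = sameYear
      .drop("rate")
      .join(exchRates.select("companyId", "rate"), Seq("companyId"), "left")

    // unionByName matches columns by name instead of by position.
    otherYears.unionByName(joined)
  }

  // Single pass: join every row once, then choose which rate to keep.
  def applyRatesSinglePass(input: DataFrame, exchRates: DataFrame): DataFrame = {
    val rates = exchRates.select(col("companyId"), col("rate").as("exch_rate"))
    input
      .join(rates, Seq("companyId"), "left")
      .withColumn("rate",
        when(col("year") === col("prev_year"), col("exch_rate")).otherwise(col("rate")))
      .drop("exch_rate")
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("rate-join-sketch")
      .getOrCreate()
    import spark.implicits._

    // Sample data mirroring the answer's tables, with rate stored as Double (assumption).
    val input = Seq(
      (1, 2016, 2017, 12.0),
      (1, 2017, 2017, 21.4),
      (2, 2018, 2017, 11.7),
      (2, 2018, 2018, 44.6),
      (3, 2016, 2017, 34.5),
      (4, 2017, 2017, 56.0)
    ).toDF("companyId", "year", "prev_year", "rate")

    val exchRates = Seq((1, 12.3), (2, 12.5), (3, 22.3), (4, 34.6), (5, 45.2))
      .toDF("companyId", "rate")

    // Sort explicitly: show() on a union gives no ordering guarantee.
    applyRates(input, exchRates).orderBy("companyId", "year", "prev_year").show()
    applyRatesSinglePass(input, exchRates).orderBy("companyId", "year", "prev_year").show()

    spark.stop()
  }
}
```

The single-pass version reads the input once. It also keeps rows where year or prev_year is null: the condition then evaluates to null, so `otherwise` applies. In the split version, such rows match neither filter and are silently dropped.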