Tags: google-cloud-data-fusion, cdap

Apply Rank or partitioned row_num function in Data Fusion


I want to implement a rank or partitioned row_number function on my data in Data Fusion, but I can't find any plugin to do so.

Is there any way to do this?

I want to implement the following:

[Image: sample data with columns AccountNumber, Address, Record_date]

Suppose I have the data above. Now I want to group it by AccountNumber and send the most recent record for each account into one sink and the rest to another. So from the above data,

Sink1 is expected to have,

[Image: expected Sink1 data - the most recent record per AccountNumber]

Sink2,

[Image: expected Sink2 data - the remaining records]

I was planning to achieve this segregation by applying a rank or row_number function partitioned by AccountNumber and sorted by Record_date descending, then sending the records with rank = 1 (or row_num = 1) to one sink and the rest to the other.
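
Conceptually, the logic I'm after looks like this (a sketch with Spark's DataFrame API, assuming the data is already loaded in a DataFrame named df):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions.{col, row_number}

    // Number the rows within each account, most recent Record_date first
    val w = Window.partitionBy("AccountNumber").orderBy(col("Record_date").desc)
    val ranked = df.withColumn("row_num", row_number().over(w))

    val latest = ranked.filter(col("row_num") === 1) // should go to Sink1
    val older  = ranked.filter(col("row_num") > 1)   // should go to Sink2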


Solution

  • A good approach to solve your problem is to use the Spark plugin. To add it to your Data Fusion instance, go to HUB -> Plugins -> search for Spark -> deploy the plugin. You can then find it in the Analytics tab.

    To give you an example of how you could use it, I created the pipeline below:

    [Image: pipeline graph - GCS -> Spark_rank -> Spark2/Spark3 -> GCS2/GCS3]

    This pipeline basically:

    1. Reads a file from GCS.
    2. Executes a rank function on your data.
    3. Filters the data with rank = 1 and rank > 1 into different branches.
    4. Saves your data in different locations.

    Now let's take a deeper look at each component:

    1 - GCS: this is a simple GCS source. The file used for this example has the data shown below.

    [Image: contents of the sample input file]

    2 - Spark_rank: this is a Spark plugin with the code below. The code basically creates a temporary view over your data and then applies a query to rank the rows. After that, the data goes back to the pipeline. Below you can also see the input and output data for this step. Please notice that the output is duplicated because it is delivered to two branches.

        def transform(df: DataFrame, context: SparkExecutionPluginContext): DataFrame = {
          // Register the incoming data as a temporary view so it can be queried with SQL
          df.createTempView("source")
          // Rank each account's records, most recent Record_date first
          df.sparkSession.sql("SELECT AccountNumber, Address, Record_date, RANK() OVER (PARTITION BY AccountNumber ORDER BY Record_date DESC) AS rank FROM source")
        }
    

    [Image: input and output records of the Spark_rank step, with the added rank column]
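
    One caveat: RANK() assigns the same rank to rows that tie on Record_date, so rank = 1 can match more than one record per account. If exactly one record per account must reach the first sink, ROW_NUMBER() is the usual substitute; for example, the query above could be rewritten as:

        // ROW_NUMBER() breaks ties arbitrarily, guaranteeing a single
        // rank = 1 record per AccountNumber even when Record_date repeats
        df.sparkSession.sql("SELECT AccountNumber, Address, Record_date, ROW_NUMBER() OVER (PARTITION BY AccountNumber ORDER BY Record_date DESC) AS rank FROM source")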

    3 - Spark2 and Spark3: like the step above, these steps use the Spark plugin to transform the data. Spark2 keeps only the data with rank = 1, using the code below:

        def transform(df: DataFrame, context: SparkExecutionPluginContext): DataFrame = {
          df.createTempView("source_0")
          // Keep only the most recent record (rank = 1) for each account
          df.sparkSession.sql("SELECT AccountNumber, Address, Record_date FROM source_0 WHERE rank = 1")
        }
    

    Spark3 gets the data with rank > 1 using the code below:

        def transform(df: DataFrame, context: SparkExecutionPluginContext): DataFrame = {
          df.createTempView("source_1")
          // Keep everything except the most recent record for each account
          df.sparkSession.sql("SELECT AccountNumber, Address, Record_date FROM source_1 WHERE rank > 1")
        }
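
    If you prefer to skip the temporary views, both filters can also be written with the DataFrame API (a sketch; dropping the helper rank column keeps it out of the sinks):

        def transform(df: DataFrame, context: SparkExecutionPluginContext): DataFrame = {
          import org.apache.spark.sql.functions.col
          // Spark2 branch: keep the most recent record per account and
          // drop the helper column; use col("rank") > 1 for the Spark3 branch
          df.filter(col("rank") === 1).drop("rank")
        }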
    

    4 - GCS2 and GCS3: finally, in this step your data is saved to GCS again.