Search code examples
azureazure-synapse

Upserting rows with Azure Synapse from Dynamics 365 using versionnumber


I am new to Synapse Data Flows and have not been able to achieve, what might be a simple Transaformation. In Dynamics I have a table with over 3.5 million rows, that i am bringing daily with a simple dataflow (source > sink) into a Dedicated SQL Database in Synapse. Right now I'm truncating the table and loading all data with every pipeline run.

I want to only insert new rows (key is GUID) and update existing rows, where the field versionnumber has changed.

I found this detailed guide for SSIS but have not been able to replicate it in Synapse. I'm not sure what transformations I need.

enter image description here

I have tried with Source > Alter Rows > Sink, providing a definition for upsert in alter rows with only the GUID or with a combination of GUID and versionnumber.


Solution

  • Steps to change the data in sink as per source using ADF dataflow

    1. Delete the rows which are in sink (SQL pool) but not in source
    2. Upsert (Update if available, insert if not available) the rows that are available in source to sink.

    I tried to repro this using sample dataset. Below is the approach

    • Initially Data in source and sink are taken as in below image. enter image description here

    • In source, Name is updated in id=1, new row is inserted with id=6 and row with id=3 is deleted.

    | id | Name      |
    |----|-----------|
    | 1  | Kaala     |
    | 2  | Arulmozhi |
    | 6  | Rajaraja  |
    

    Step:1 To delete the rows

    • In dataflow, Source1 is taken with above data.

    • Source2 is taken with the synapse SQL pool table.

    • Exists transformation is added in order to find the ids which are not in source1 but in source2.

      Left stream: source2 Right stream: source1 Exists type: Doesn't exist Exists condition: Source1@id = Source2@id

    enter image description here

    • Alter Row transformation is added next to exists transformation, and condition given is delete if true() enter image description here

    • Sink dataset is same as source2 dataset. (Synapse SQL pool). In sink settings, Allow delete is selected as update method. id is selected as Key column.

    enter image description here

    • The above steps will delete the records in sink which are deleted from source.

    Step:2 To upsert the data

    • In order to upsert, near source1 transformation, new branch is selected.enter image description here

    • Alter transformation is added and condition given is upsert if true(). enter image description here

    • Sink transformation is added and same sql pool is given as sink dataset. Update method is allow upsert and key column given as id.

    enter image description here

    Overall design of dataflow enter image description here

    When the pipeline is run with this dataflow, data is upserted and deleted as per source.

    enter image description here

    You can replace id column of this repro with version_number+GUID column combinations and do the same steps.