apache-spark, pyspark, azure-data-factory, databricks, azure-databricks

How to achieve Column mapping just like in ADF in Databricks


[Screenshot: column mapping with data types in an ADF copy activity]

As you can see, this is the general column mapping with data types in an ADF copy activity, where we can change the data types, column names, etc. from source to destination.

If I want the same setup in Databricks (for example, I need to map the Id column to CustomerID), how can I achieve this? I only know how to read tables from and write tables to SQL Server when the schemas match. Here I need a custom mapping so that all columns are populated correctly from the source schema to the destination schema.

I know I can rename the columns to match the target before writing, but that is a very manual approach. I need a more efficient and automated way, some kind of generic function that I can reuse for all other tables too.

Both the source and target tables are created in a catalog (for example a dev catalog), so these are warehouse SQL Server tables.


Solution

  • If you are looking for a GUI solution in Databricks, you will not find anything similar to ADF; you need to provide Spark code. The renaming in ADF is not done automatically either: someone has to specify what the target column should look like, otherwise the destination column keeps the name of the input column.

    What I do is have a TableMapping class that iterates over its ColumnMapping instances in the apply_column_renaming_and_casting function. Note that this method only picks up the defined columns, and the columns need to be safely castable. If you want a projection of all columns, just write some dummy code that recognizes the input columns and appends them to the columns attribute.
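Such dummy code could, for example, compute identity mappings for all unmapped input columns and append them to `columns`. A minimal sketch with names of my own choosing (not part of the answer's classes below):

```python
def add_passthrough_columns(mapped_sources, mapped_targets, input_columns):
    """Return identity (source, target) pairs for every input column that
    is not yet consumed as a source or produced as a target."""
    covered = set(mapped_sources) | set(mapped_targets)
    return [(c, c) for c in input_columns if c not in covered]

# Example: Email is not part of the mapping yet, so it is passed through 1:1
extra = add_passthrough_columns(
    mapped_sources=["Id", "Name"],
    mapped_targets=["CustomerId", "LastName"],
    input_columns=["Id", "Name", "Email"],
)
```

Each returned pair can then be wrapped into a ColumnMapping with identical source and target definitions.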

    Something similar to the following code should happen in the background of ADF.

    your_df = load_your_dataframe_by_whatever
    df_new = table_mapper.apply_column_renaming_and_casting(your_df)
    
    
    table_mapper = TableMapping(
        name="any name",
        columns=[
            ColumnMapping(
                target=Column(
                    name="CustomerId", dtype=IntegerType(), nullable=True
                ),
                source=Column(
                    name="Id", dtype=StringType(), nullable=True
                )
            ),
            ColumnMapping(
                target=Column(name="LastName", dtype=StringType(), nullable=True),
                source=Column(name="Name", dtype=StringType(), nullable=True),
            ),
        ]
    )
    
    
    from dataclasses import dataclass
    from typing import List

    from pyspark.sql import DataFrame
    from pyspark.sql import functions as f
    from pyspark.sql.types import DataType, IntegerType, StringType


    @dataclass
    class TableMapping:
        """
        Definition of one spark table [DataFrame] to another.
        Can be used to select and rename columns, as well as for casting dtypes.
        """
    
        name: str
        """the name of the target table"""
        columns: List["ColumnMapping"]
        """The columns of the table (target and source)"""
    
        def apply_column_renaming_and_casting(self, df: DataFrame) -> DataFrame:
            """
            A function to rename and cast a table to the target schema.
            :param df:
            :return:
            """
            for column in self.columns:
                if column.source is None:
                    # No source column given: fill the target column with NULLs
                    df = df.withColumn(
                        column.target.name,
                        f.lit(None).cast(column.target.dtype),
                    )
                    continue
                if column.source.name != column.target.name:
                    df = df.withColumnRenamed(column.source.name, column.target.name)
                if column.source.dtype != column.target.dtype:
                    df = df.withColumn(
                        column.target.name,
                        f.col(column.target.name).cast(column.target.dtype),
                    )

            return df
    
    
    @dataclass
    class ColumnMapping:
        """
        Definition of a single column mapping (source to target column).
        """
    
        target: "Column" = None
        """This is the target (output) column"""
        source: "Column" = None
        """This is the source (input) column. When omitted, the target column will be empty."""
    
    @dataclass
    class Column:
        """
        Column definition, consisting of name and dtype
        """
    
        name: str
        """Name of the column"""
        dtype: DataType
        """Dtype of the column"""
        nullable: bool
        """Nullable property of the column"""