Tags: azure | pyspark | pipeline | azure-synapse | azure-synapse-analytics

Split a .csv file column in two in Azure Synapse Analytics using PySpark


I have a .csv file (in Azure Data Lake Storage) that looks approximately like this: [screenshot of the source file]

I want to create a notebook (PySpark (Python)) that can be run from one of my Synapse Analytics pipelines (Integrate -> Pipeline).

The code in the notebook should split the 2nd column in two and convert all rows to the GB unit, so that it looks like this: [screenshot of the desired result]

Could you please help with the PySpark code? I am a beginner in Azure Synapse Analytics and not sure how to do it.

!! IMPORTANT: It all has to be done in the same file (no new files may be created)

Thanks in advance


Solution

  • I read the csv file from my storage into a dataframe. Here is my file:

    [screenshot of the input file]
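
    For completeness, here is a minimal sketch of that read step; the abfss path placeholders are assumptions that mirror the mount code later in this answer:

    from pyspark.sql import SparkSession

    # Inside a Synapse notebook `spark` already exists; getOrCreate() just
    # returns it and is only included so the snippet is self-contained.
    spark = SparkSession.builder.getOrCreate()

    # Hypothetical path -- substitute your container, storage account and file name.
    df = spark.read.csv(
        "abfss://<container>@<storageacc>.dfs.core.windows.net/<filename>.csv",
        header=True,
        inferSchema=True
    )
    df.show()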

    I created a new column called 'Unit' by splitting the Consumed column:

    from pyspark.sql.functions import split, when

    # Split "670 KB"-style values on the space; token 1 is the unit.
    split_cols = split(df['Consumed'], ' ')
    df = df.withColumn('Unit', split_cols.getItem(1))
    df = df.withColumn('Unit', when(df['Unit'] == 'KB', 'GB').otherwise(df['Unit']))
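
    Note that the unit label is flipped to 'GB' up front, rather than after the numeric conversion; that works because the next step converts every KB row's value into GB anyway.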
    

    I converted the KB values of the Consumed column into GB using the code below:

    from pyspark.sql.functions import regexp_extract, round, when

    # Keep GB values as-is (rounded to 2 places); divide KB values by
    # 1,000,000 to express them in GB.
    df = df.withColumn("Consumed",
                       when(df["Consumed"].contains("GB"),
                            round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double"), 2)
                            )
                       .when(df["Consumed"].contains("KB"),
                             round(regexp_extract(df["Consumed"], r"(\d+\.?\d*)", 1).cast("double")/1000000, 5)
                             )
                       .otherwise(0)
                      )
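
    The regex (\d+\.?\d*) extracts the numeric part of strings like '670 KB', and the division uses decimal units (1 GB = 1,000,000 KB), so 670 KB becomes 0.00067 GB.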
    

    When I try the above, the 670 KB value comes out as 6.7E-4, i.e. it is displayed in scientific notation.

    [screenshot: Consumed column showing 6.7E-4]

    So I formatted it using the command below:

    df = df.withColumn("Consumed", when(col("Consumed") == 6.7E-4, format_number(col("Consumed"),5)).otherwise(col("Consumed")))
    

    Then I selected the columns in the following order:

    df = df.select('ID', 'Consumed', 'Unit', 'Total')
    

    Output:

    [screenshot: transformed dataframe output]

    !! IMPORTANT: It all has to be done in the same file (no new files may be created)

    To satisfy this requirement, mount the path using the procedure below.

    Create a linked service for the storage path, then create the mount with this code:

    mssparkutils.fs.mount(
        "abfss://<container>@<storageacc>.dfs.core.windows.net",
        "/<mount container>",
        {"linkedService": "AzureDataLakeStorage1"}
    )


    # The mount is exposed under /synfs/<job id>/, so build the file path
    # from the current job ID.
    jobid = mssparkutils.env.getJobId()
    path = '/synfs/' + jobid + '/<mount container>/<filename>.csv'
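
    Optionally (my addition, not one of the original steps), you can list the mounted folder first to confirm the file is visible:

    # Sanity check: list the mounted folder through the built-in Spark utilities.
    mssparkutils.fs.ls('/synfs/' + jobid + '/<mount container>')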
    


    Finally, I overwrote the file with the updated dataframe using the code below:

    df.toPandas().to_csv(path, index=False)
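
    Writing through pandas is deliberate here: toPandas().to_csv() produces a single file under the original name, whereas df.write.csv(path) would create a directory of part files and break the same-file requirement. (It collects the whole dataframe to the driver, so it suits small files only.)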
    


    Updated file:

    [screenshot: updated csv file]