csv, apache-spark, pyspark, delta-lake, azure-synapse-analytics

Unable to saveAsTable a PySpark DataFrame: org.apache.spark.sql.delta.schema.InvariantViolationException: Exceeds char/varchar type length


I am merging 5 DataFrames created from CSVs into a new DataFrame. In the second step I create an empty table with a custom schema, and now I want to load the records from the DataFrame into that table.

Here are the step-by-step details.

  1. Create DataFrames from all 5 CSV files:
cr_df = spark.read.format("csv").option("header", "true").load("abfss://abcxxxxxxxxxxxx.dfs.core.windows.net/Position1.csv")
ir_df = spark.read.format("csv").option("header", "true").load("abfss://abc2xxxxxxxxxxxx.dfs.core.windows.net/Position2.csv")
fx_df = spark.read.format("csv").option("header", "true").load("abfss://abc3xxxxxxxxxxxx.dfs.core.windows.net/Position3.csv")
eq_df = spark.read.format("csv").option("header", "true").load("abfss://abc4xxxxxxxxxxxx.dfs.core.windows.net/Position4.csv")
co_df = spark.read.format("csv").option("header", "true").load("abfss://[email protected]/Position5.csv")
  2. Merge the above DataFrames:
merged_df = cr_df.unionByName(ir_df, allowMissingColumns=True) \
    .unionByName(fx_df, allowMissingColumns=True) \
    .unionByName(eq_df, allowMissingColumns=True) \
    .unionByName(co_df, allowMissingColumns=True)
  3. Create an empty table with a custom schema:
CREATE TABLE staging.ddr_position_test
       (
        ReportDate            DATE ,
        JurisdictionId        INTEGER ,
        TransactionId         VARCHAR(256) ,
        ReportAssetClass      VARCHAR(30) ,
        ReportTradeSequence   DECIMAL(4) ,
        LoadId                INTEGER,
        DataSourceId          VARCHAR(4),
        Cleared               VARCHAR(50)
) USING DELTA 
PARTITIONED BY (ReportDate, ReportAssetClass) 
LOCATION 'abfss://[email protected]//silver//delta/ddr_position_test/'

Step 4: I was getting a schema mismatch error, so I cast the merged DataFrame's columns to the required data types. Please note I can't use the overwriteSchema or mergeSchema options, because I have a requirement for specific column data types and a few columns also need to be renamed (a renaming sketch follows the cast example below).

from pyspark.sql.functions import col
from pyspark.sql.types import DateType, DecimalType, IntegerType, StringType, TimestampType

# Cast each existing column to the type expected by the target table.
df = decimal_to_string_Cols_df.withColumn("ReportDate", col("ReportDate").cast(DateType())) \
        .withColumn("JurisdictionId", col("JurisdictionId").cast(IntegerType())) \
        .withColumn("ReportAssetClass", col("ReportAssetClass").cast(StringType())) \
        .withColumn("ReportTradeSequence", col("ReportTradeSequence").cast(DecimalType(4))) \
        .withColumn("LoadId", col("LoadId").cast(IntegerType())) \
        .withColumn("CreatedTimestamp", col("CreatedTimestamp").cast(TimestampType()))

As the final step (step 5), I write and save the DataFrame like this:

df.write.mode('append').format('delta') \
  .option("path", "abfss://[email protected]/delta/ddr_position_test/") \
  .saveAsTable("staging.ddr_position_test")

Now I am getting this error (screenshot of the stack trace):

org.apache.spark.sql.delta.schema.InvariantViolationException: Exceeds char/varchar type length

After doing some research, I believe this error might be due to nullable vs. not-null columns (just a wild guess). Any help or suggestions would be appreciated.


Solution

  • The problem is in the definition of some of the table columns with a fixed length: it looks like a value in one of those columns exceeds the specified limit.

    So you either need to increase the size of the specific column(s), or, instead of varchar(N), use the string type, which has no hard limit on text size. A sketch of both steps follows below.
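
A minimal sketch of the diagnosis and the fix, assuming the failing column is TransactionId (an assumption; check your own data) and reusing the DDL from the question:

from pyspark.sql.functions import col, length

# Diagnose: list the values that exceed the declared varchar(256) limit.
# TransactionId is an assumed culprit - repeat for the other varchar columns.
df.filter(length(col("TransactionId")) > 256) \
  .select("TransactionId") \
  .show(truncate=False)

# Fix: recreate the table with STRING (or a larger VARCHAR(N)) so Delta
# no longer enforces the length invariant that is being violated.
spark.sql("""
CREATE OR REPLACE TABLE staging.ddr_position_test
       (
        ReportDate            DATE,
        JurisdictionId        INTEGER,
        TransactionId         STRING,
        ReportAssetClass      STRING,
        ReportTradeSequence   DECIMAL(4),
        LoadId                INTEGER,
        DataSourceId          STRING,
        Cleared               STRING
) USING DELTA
PARTITIONED BY (ReportDate, ReportAssetClass)
LOCATION 'abfss://[email protected]//silver//delta/ddr_position_test/'
""")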