I am merging 5 DataFrames read from CSV files into a new DataFrame; in the second step I create an empty Delta table with a custom schema, and now I want to load the records from the DataFrame into it.
Here are the step-by-step details.
Step 1: read the five CSV files.
cr_df = spark.read.format("csv").option("header", "true").load("abfss://abcxxxxxxxxxxxx.dfs.core.windows.net/Position1.csv")
ir_df = spark.read.format("csv").option("header", "true").load("abfss://abc2xxxxxxxxxxxx.dfs.core.windows.net/Position2.csv")
fx_df = spark.read.format("csv").option("header", "true").load("abfss://abc3xxxxxxxxxxxx.dfs.core.windows.net/Position3.csv")
eq_df = spark.read.format("csv").option("header", "true").load("abfss://abc4xxxxxxxxxxxx.dfs.core.windows.net/Position4.csv")
co_df = spark.read.format("csv").option("header", "true").load("abfss://[email protected]/Position5.csv")
Step 2: merge them into one DataFrame.
merged_df = cr_df.unionByName(ir_df, allowMissingColumns=True) \
    .unionByName(fx_df, allowMissingColumns=True) \
    .unionByName(eq_df, allowMissingColumns=True) \
    .unionByName(co_df, allowMissingColumns=True)
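Note that these reads use only the header option (no inferSchema), so Spark leaves every column as a string, and allowMissingColumns=True null-fills columns absent from a file; it is worth confirming the merged schema before writing to the typed table. A quick diagnostic:

# With header-only CSV reads, every column comes in as StringType;
# columns missing from one source are filled with nulls.
merged_df.printSchema()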
Step 3: create an empty Delta table with the custom schema.
CREATE TABLE staging.ddr_position_test
(
    ReportDate DATE,
    JurisdictionId INTEGER,
    TransactionId VARCHAR(256),
    ReportAssetClass VARCHAR(30),
    ReportTradeSequence DECIMAL(4),
    LoadId INTEGER,
    DataSourceId VARCHAR(4),
    Cleared VARCHAR(50)
) USING DELTA
PARTITIONED BY (ReportDate, ReportAssetClass)
LOCATION 'abfss://[email protected]//silver//delta/ddr_position_test/'
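If you want to confirm the column types the table was actually created with, DESCRIBE TABLE shows them; a quick check from PySpark:

# Show the resolved column names and types of the target table.
spark.sql("DESCRIBE TABLE staging.ddr_position_test").show(truncate=False)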
Step 4: I was getting a schema mismatch error, hence I cast the columns of this merged DataFrame to the required datatypes. Please note I can't use the overwriteSchema or mergeSchema options, because I have a requirement for certain column datatypes, and a few columns also need to be renamed.
from pyspark.sql.functions import col
from pyspark.sql.types import DateType, IntegerType, StringType, DecimalType, TimestampType

df = decimal_to_string_Cols_df.withColumn("ReportDate", col("ReportDate").cast(DateType())) \
    .withColumn("JurisdictionId", col("JurisdictionId").cast(IntegerType())) \
    .withColumn("ReportAssetClass", col("ReportAssetClass").cast(StringType())) \
    .withColumn("ReportTradeSequence", col("ReportTradeSequence").cast(DecimalType(4))) \
    .withColumn("LoadId", col("LoadId").cast(IntegerType())) \
    .withColumn("CreatedTimestamp", col("CreatedTimestamp").cast(TimestampType()))
And as the final step (step 5) I am writing and saving it like this:
df.write.mode('append').format('delta') \
    .option("path", "abfss://[email protected]/delta/ddr_position_test/") \
    .saveAsTable("staging.ddr_position_test")
After doing some research, I believe this error might be due to NULL or NOT NULL columns (just a wild guess). Any help or suggestions would be appreciated.
The problem is in the definition of some of the table columns with a fixed length: it looks like a value for one of the columns exceeds the specified limit. So you either need to increase the size of the specific column(s), or, instead of using varchar(N), just use the string type, which doesn't have a hard limit on text size.
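To pin down which column overflows before choosing between those two fixes, you can measure the longest value per fixed-length column; a diagnostic sketch against the df from step 4, with the column/limit pairs taken from the CREATE TABLE above:

from pyspark.sql.functions import col, length, max as max_

# Longest observed value per varchar(N) column, to find the one that
# needs widening (or switching to string, which has no length limit).
varchar_cols = {"TransactionId": 256, "ReportAssetClass": 30,
                "DataSourceId": 4, "Cleared": 50}
for name, limit in varchar_cols.items():
    observed = df.select(max_(length(col(name)))).first()[0]
    print(f"{name}: max length {observed}, declared varchar({limit})")

Any column whose observed maximum exceeds its declared limit either needs a larger varchar(N) or a plain string type in the table definition.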