Tags: apache-spark, struct, parquet, dbt, spark-thriftserver

incremental model in dbt with struct fields


I'm evaluating dbt for a possible use case, and everything seems fine except for one situation: when the source table has struct fields.

I'm using the Spark Thrift Server connector, and the underlying data is stored as Parquet in S3. The dbt version is 0.20.

This is a piece of the source table's CREATE statement; as you can see, there are struct fields in there:

CREATE TABLE `<someSchema>`.`<sourceTable>` (
  `properties` STRUCT<`site`: STRING>,
  `channel` STRING,
  `timestamp` STRING,
  `anotherDate` STRING,
  `aDate` STRING)
  USING parquet
  PARTITIONED BY (aDate)
  LOCATION 's3a://<someBucket>'

My model just performs a select over that table with certain where clauses. The first time it runs, it works just fine: it creates a table exactly like the original with some minor changes, just as expected, even with the struct fields.

Here is a piece of the sink table's CREATE statement:

CREATE TABLE `<someSchema>`.`dbtsink` (
      `properties` STRUCT<`site`: STRING>,
      `channel` STRING,
      `timestamp` STRING,
      `anotherDate` STRING,
      `aDate` STRING)
USING parquet
PARTITIONED BY (anotherDate)

My problem comes when I run dbt again with other values in the where clauses; it should create another partition in the sink table. Query compilation is just fine.
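
For reference, the second run is launched with different var values, something like this (the values are taken from the filter shown in the plan below; the selection flag is illustrative):

dbt run --models dbtsink --vars '{"aDateLowerLimit": "2021060100", "aDateUpperLimit": "2021070609", "anotherDate": "2021070609"}'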

It raises this error:

Runtime Error in model dbtsink (models/anotherDate/dbtsink.sql)
  Database Error
    Error running query: org.apache.spark.sql.AnalysisException: cannot resolve '`site`' given input columns: [dbtsink__dbt_tmp.channel, dbtsink__dbt_tmp.anotherDate, dbtsink__dbt_tmp.aDate, dbtsink__dbt_tmp.properties, dbtsink__dbt_tmp.timestamp]; line 4 pos 25;
    'InsertIntoStatement 'UnresolvedRelation [someSchema, dbtsink], false, false
    +- 'Project [properties#6526, 'site, channel#6527, timestamp#6528, aDate#6541, anotherDate#6540]
       +- SubqueryAlias dbtsink__dbt_tmp
          +- Project [properties#6526, channel#6527, timestamp#6528, anotherDate#6540, aDate#6541]
             +- Filter (((aDate#6541 > 2021060100) AND (aDate#6541 <= 2021070609)) AND (anotherDate#6540 = 2021070609))
                +- SubqueryAlias spark_catalog.someSchema.sourceTable
                   +- Relation[context#6524,traits#6525,properties#6526,channel#6527,timestamp#6528,projectId#6529,integrations#6530,messageId#6531,originalTimestamp#6532,receivedAt#6533,sentAt#6534,userId#6535,anonymousId#6536,type#6537,providerId#6538,version#6539,anotherDate#6540,aDate#6541] parquet

It seems that it's trying to read or write the inner field of the struct as a root field. I tested with other struct fields and the same thing happens. I just want the struct kept as such, as in the very first execution. As I said, it only happens in the second execution.

This is my model's query; it is very simple:

select 
properties,
channel,
timestamp,
anotherDate,
aDate
from {{ source('someSchema', 'sourceTable') }}
where aDate > '{{ var("aDateLowerLimit") }}' and aDate <= '{{ var("aDateUpperLimit") }}'
and anotherDate = '{{ var("anotherDate") }}'

If I change the select to convert properties from struct to JSON with to_json(properties), it works just as expected, generating a new partition.
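
For clarity, the working variant of the select looks roughly like this (only the properties column changes; aliasing it back to the same name is my assumption):

select 
to_json(properties) as properties,
channel,
timestamp,
anotherDate,
aDate
from {{ source('someSchema', 'sourceTable') }}
where aDate > '{{ var("aDateLowerLimit") }}' and aDate <= '{{ var("aDateUpperLimit") }}'
and anotherDate = '{{ var("anotherDate") }}'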

Is something wrong with structs in dbt? Am I doing something wrong?

I am using incremental materialization, and I tested it with both the append and insert_overwrite strategies; that didn't seem to be the problem.
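
For completeness, the model is configured along these lines (a sketch; the exact partition_by and file_format values are my assumptions based on the DDL above):

{{ config(
    materialized='incremental',
    incremental_strategy='insert_overwrite',
    partition_by=['anotherDate'],
    file_format='parquet'
) }}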


Solution

  • The problem is related to how dbt tries to parse the columns of a Spark table using a regex. See the parse_columns_from_information function in the dbt-spark adapter.

    You cannot reliably parse a table schema with a regex. That function uses the results of this Spark SQL statement: show table extended in someSchema like '*'. With that statement you get something like this for your table's schema:

    Schema: root
     |-- properties: struct (nullable = true)
     |    |-- site: string (nullable = true)
     |-- channel: string (nullable = true)
     |-- timestamp: string (nullable = true)
     |-- anotherDate: string (nullable = true)
     |-- aDate: string (nullable = true)
    

    Applying that regex to the string above mangles your columns, as you have experienced.
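
    You can reproduce the raw text dbt parses by running the statement yourself (schema and table names as in this example); the Schema: section comes back as a single string, which the regex then has to pick columns out of:

    show table extended in someSchema like 'dbtsink'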

    You can work around this problem by using parse_describe_extended instead. That function uses the results of this Spark SQL statement: describe extended someSchema.dbtsink. To make dbt use parse_describe_extended, you need to disable the dbt relation cache (something that could be harmful). You can disable it with the --bypass-cache dbt argument.
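
    By contrast, describe extended returns one result row per top-level column, so the struct survives as a single column. Roughly (output abridged; format from memory of Spark's describe output):

    describe extended someSchema.dbtsink

    col_name        data_type               comment
    properties      struct<site:string>
    channel         string
    timestamp       string
    anotherDate     string
    aDate           string

    To disable the cache on a run, the flag goes before the subcommand, e.g. dbt --bypass-cache run (flag placement as in dbt 0.x; check your version's docs).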