Tags: hive, hiveql, trino

How can I create a TIMESTAMP column in HIVE with a string based timestamp?


I am attempting to create a table in Hive so that it can be queried via Trino, but I am getting an error. My guess is that I need to transform or somehow modify the string, or do something with the formatting. Do I do that at the CREATE TABLE step? I have no idea.


use hive.MYSCHEMA;
USE
trino:MYSCHEMA> CREATE TABLE IF NOT EXISTS hive.MYSCHEMA.MYTABLE (
           ->                  column_1           VARCHAR,
           ->                  column_2           VARCHAR,
           ->                  column_3           VARCHAR,
           ->                  column_4           BIGINT,
           ->                  column_5           VARCHAR,
           ->                  column_6           VARCHAR,
           ->                  query_start_time   TIMESTAMP)
           ->                WITH (
           ->                  external_location = 's3a://MYS3BUCKET/dir1/dir2/',
           ->                  format = 'PARQUET');
CREATE TABLE
trino:MYSCHEMA> SELECT * FROM MYTABLE;

Query 20220926_131538_00008_dbc39, FAILED, 1 node
Splits: 1 total, 0 done (0.00%)
1.72 [0 rows, 0B] [0 rows/s, 0B/s]

Query 20220926_131538_00008_dbc39 failed: Failed to read Parquet file: s3a://MYS3BUCKET/dir1/dir2/20220918_194105-135895.snappy.parquet

The full stack trace is as follows:

io.trino.spi.TrinoException: Failed to read Parquet file: s3a://MYS3BUCKET/dir1/dir2/20220918_194105-135895.snappy.parquet
    at io.trino.plugin.hive.parquet.ParquetPageSource.handleException(ParquetPageSource.java:169)
    at io.trino.plugin.hive.parquet.ParquetPageSourceFactory.lambda$createPageSource$6(ParquetPageSourceFactory.java:271)
    at io.trino.parquet.reader.ParquetBlockFactory$ParquetBlockLoader.load(ParquetBlockFactory.java:75)
    at io.trino.spi.block.LazyBlock$LazyData.load(LazyBlock.java:406)
    at io.trino.spi.block.LazyBlock$LazyData.getFullyLoadedBlock(LazyBlock.java:385)
    at io.trino.spi.block.LazyBlock.getLoadedBlock(LazyBlock.java:292)
    at io.trino.spi.Page.getLoadedPage(Page.java:229)
    at io.trino.operator.TableScanOperator.getOutput(TableScanOperator.java:314)
    at io.trino.operator.Driver.processInternal(Driver.java:411)
    at io.trino.operator.Driver.lambda$process$10(Driver.java:314)
    at io.trino.operator.Driver.tryWithLock(Driver.java:706)
    at io.trino.operator.Driver.process(Driver.java:306)
    at io.trino.operator.Driver.processForDuration(Driver.java:277)
    at io.trino.execution.SqlTaskExecution$DriverSplitRunner.processFor(SqlTaskExecution.java:736)
    at io.trino.execution.executor.PrioritizedSplitRunner.process(PrioritizedSplitRunner.java:164)
    at io.trino.execution.executor.TaskExecutor$TaskRunner.run(TaskExecutor.java:515)
    at io.trino.$gen.Trino_397____20220926_094436_2.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: java.lang.UnsupportedOperationException: io.trino.spi.type.ShortTimestampType
    at io.trino.spi.type.AbstractType.writeSlice(AbstractType.java:115)
    at io.trino.parquet.reader.BinaryColumnReader.readValue(BinaryColumnReader.java:54)
    at io.trino.parquet.reader.PrimitiveColumnReader.lambda$readValues$2(PrimitiveColumnReader.java:248)
    at io.trino.parquet.reader.PrimitiveColumnReader.processValues(PrimitiveColumnReader.java:304)
    at io.trino.parquet.reader.PrimitiveColumnReader.readValues(PrimitiveColumnReader.java:246)
    at io.trino.parquet.reader.PrimitiveColumnReader.readPrimitive(PrimitiveColumnReader.java:235)
    at io.trino.parquet.reader.ParquetReader.readPrimitive(ParquetReader.java:441)
    at io.trino.parquet.reader.ParquetReader.readColumnChunk(ParquetReader.java:540)
    at io.trino.parquet.reader.ParquetReader.readBlock(ParquetReader.java:523)
    at io.trino.parquet.reader.ParquetReader.lambda$nextPage$3(ParquetReader.java:272)
    at io.trino.parquet.reader.ParquetBlockFactory$ParquetBlockLoader.load(ParquetBlockFactory.java:72)
    ... 17 more

Solution

  • The failure happens because the Parquet files store query_start_time as a string, while the table declares it as TIMESTAMP, so Trino's Parquet reader cannot decode the column. Hive does not have a feature to transform a string into a timestamp in the DDL, but we can achieve the desired result by splitting the task into two steps.

    So we first create two tables.

    First, we create the original table over the existing data, with query_start_time kept as VARCHAR:

    CREATE TABLE IF NOT EXISTS 
        hive.MYSCHEMA.MYTABLE (
            column_1           VARCHAR,
            column_2           VARCHAR,
            column_3           VARCHAR,
            column_4           BIGINT,
            column_5           VARCHAR,
            column_6           VARCHAR,
            query_start_time   VARCHAR)
        WITH (
            external_location = 's3a://MYS3BUCKET/dir1/dir2/',
            format = 'PARQUET');
    

    Next, we create the new table with the correct TIMESTAMP data type:

    CREATE TABLE IF NOT EXISTS 
        hive.MYSCHEMA.NEWTABLE (
            column_1           VARCHAR,
            column_2           VARCHAR,
            column_3           VARCHAR,
            column_4           BIGINT,
            column_5           VARCHAR,
            column_6           VARCHAR,
            query_start_time   TIMESTAMP)
        WITH (
            external_location = 's3a://MYS3BUCKET/newlocation/',
            format = 'PARQUET');
    

    Now we move the data from MYTABLE to NEWTABLE, converting the column along the way:

    INSERT OVERWRITE TABLE NEWTABLE
    SELECT column_1, column_2, column_3, column_4, column_5, column_6,
           unix_timestamp(query_start_time, "yyyy-MM-dd'T'HH:mm:ss.SSSSSSZ") AS query_start_time
    FROM MYTABLE;
    

    You will have to test for the correct format pattern for the unix_timestamp function against your actual data (see the Hive documentation on date format patterns). Note that unix_timestamp returns epoch seconds as a BIGINT, so depending on your Hive version you may need to wrap it in from_unixtime() or an explicit CAST(... AS TIMESTAMP) so that the value lands correctly in the TIMESTAMP column; a quick way to sanity-check both points is sketched below.
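
    For example, running something like the following in Hive on a handful of rows shows immediately whether the pattern matches (NULL results mean it does not). This is only a sketch; the exact pattern depends on how the strings were actually written:

    -- Sanity check on a few rows before running the full INSERT
    SELECT query_start_time                                                              AS raw_string,
           unix_timestamp(query_start_time, "yyyy-MM-dd'T'HH:mm:ss.SSSSSSZ")             AS epoch_seconds,
           from_unixtime(unix_timestamp(query_start_time, "yyyy-MM-dd'T'HH:mm:ss.SSSSSSZ")) AS parsed
    FROM MYTABLE
    LIMIT 10;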

    This will first convert the string column to timestamp and then store it in the new table. This means that all the old data will be read and stored in the new location. You can think of it as an ETL job in Hive.
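
    Once the INSERT has finished, a quick check from the Trino side (a minimal sketch, reusing the names from above) confirms that the new column is now a real timestamp:

    SELECT query_start_time, typeof(query_start_time)
    FROM hive.MYSCHEMA.NEWTABLE
    LIMIT 5;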

    Additional information: why this conversion needs ETL even though we have schema-on-read

    Schema-on-read is powerful for big data. It allows you to change the data type of a column at read time rather than when the data is written. For example, if the ID column is stored as INT in your file, you can still read it as STRING/VARCHAR by declaring the column as a string in your DDL; similarly, TIMESTAMP data can be read as DATETIME. This is useful for schema evolution or for reading from multiple sources with different data types. A sketch of such a DDL is shown below.
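
    As a sketch (hypothetical table name and S3 location), exposing a numeric column as a string is just a matter of the declared type; which coercions are actually supported depends on the file format and connector version:

    CREATE TABLE IF NOT EXISTS
        hive.MYSCHEMA.IDS_AS_STRINGS (
            id   VARCHAR)    -- stored as an integer in the files, read back as a string
        WITH (
            external_location = 's3a://MYS3BUCKET/ids/',
            format = 'PARQUET');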

    So why couldn't we use this power in the scenario above?

    The same applies to every scenario where you actually want to process the column, e.g. splitting one string column into two columns. The reason we have to perform ETL in this case is that in Parquet/Avro a timestamp is not a primitive type: it is stored as a long integer with an additional logical_type property of timestamp/datetime, so a string column in the file cannot simply be reinterpreted as a timestamp at read time. You can read the Parquet and Avro documentation on logical types for further clarification; a sketch of what the two cases look like in a Parquet schema follows.
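
    For illustration, dumping the schema of a Parquet file (for example with parquet-tools schema <file>) shows the difference. A string column looks roughly like this (a sketch of typical output, not taken from the files above):

    message hive_schema {
      optional binary query_start_time (STRING);
    }

    whereas a native Parquet timestamp column carries the int64 physical type plus the logical-type annotation:

    message hive_schema {
      optional int64 query_start_time (TIMESTAMP(MICROS,true));
    }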