apache-spark hive spark-structured-streaming orc hive-partitions

Spark Structured Streaming writeStream to Hive ORC Partitioned External Table


I am trying to use the Spark Structured Streaming writeStream API to write to an external partitioned Hive table.

CREATE EXTERNAL TABLE `XX`(
`a` string,
`b` string,
`c` string,
`happened` timestamp,
`processed` timestamp,
`d` string,
`e` string,
`f` string )
 PARTITIONED BY (
`year` int, `month` int, `day` int)      
 CLUSTERED BY (d)
INTO 6 BUCKETS
STORED AS ORC 
TBLPROPERTIES (
'orc.compress'='ZLIB',
'orc.compression.strategy'='SPEED',
'orc.create.index'='true',
'orc.encoding.strategy'='SPEED');

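And in the Spark code:

import org.apache.spark.sql.Row
import org.apache.spark.sql.streaming.DataStreamWriter

// Configure (but do not yet start) an append-mode ORC sink partitioned by date.
val hiveOrcWriter: DataStreamWriter[Row] = event_stream
  .writeStream
  .outputMode("append")
  .format("orc")
  .partitionBy("year", "month", "day")
  //.option("compression", "zlib")
  .option("path", _table_loc)
  .option("checkpointLocation", _table_checkpoint)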

With a non-partitioned table, I see that records are inserted into Hive. With a partitioned table, however, the Spark job neither fails nor raises exceptions, yet no records appear in the Hive table.

I would appreciate comments from anyone who has dealt with similar problems.

Edit:

I just discovered that the .orc files are indeed written to HDFS, with the correct partition directory structure, e.g. /_table_loc/_table_name/year/month/day/part-0000-0123123.c000.snappy.orc

However,

select * from `XX` limit 1; (or where year=2018)

returns no rows.

The InputFormat and OutputFormat for the table 'XX' are org.apache.hadoop.hive.ql.io.orc.OrcInputFormat and org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat, respectively.


Solution

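  • This feature isn't provided out of the box in Structured Streaming. In batch processing you would use dataset.write.saveAsTable(table_name), but that method isn't available on a streaming writer.

    The streaming sink only writes files to HDFS; it does not register the new partition directories with the Hive metastore, which is why the query returns no rows even though the .orc files exist. After the data has been processed and saved in HDFS, you can update the partitions manually (or with a script that does this on a schedule):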

    If you use Hive

    MSCK REPAIR TABLE table_name
    

    If you use Impala

    ALTER TABLE table_name RECOVER PARTITIONS
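
The "script that does this on a schedule" could be sketched in Scala roughly as follows. This is only a minimal sketch under stated assumptions: the object name PartitionRepair, its methods, and the runSql hook are hypothetical names introduced for illustration and are not part of Spark or Hive. The hook is whatever executes a statement against Hive in your setup.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}

// Hypothetical helper (not a Spark or Hive API): periodically asks Hive to
// pick up partition directories that the streaming job has written to HDFS.
object PartitionRepair {

  // Builds the Hive statement shown in the answer above.
  def repairStatement(table: String): String = s"MSCK REPAIR TABLE $table"

  // Runs the repair immediately and then every `everySeconds` seconds.
  // `runSql` is an assumed hook that executes one SQL statement.
  // The returned scheduler uses a non-daemon thread, so call shutdown()
  // on it when the streaming query stops.
  def scheduleRepair(table: String, everySeconds: Long)(runSql: String => Unit): ScheduledExecutorService = {
    val scheduler = Executors.newSingleThreadScheduledExecutor()
    val task = new Runnable {
      override def run(): Unit =
        // scheduleAtFixedRate cancels all future runs if a run throws,
        // so failures are logged here instead of propagated.
        try runSql(repairStatement(table))
        catch {
          case e: Exception =>
            System.err.println(s"Partition repair failed: ${e.getMessage}")
        }
    }
    scheduler.scheduleAtFixedRate(task, 0L, everySeconds, TimeUnit.SECONDS)
    scheduler
  }
}
```

Assuming a Hive-enabled SparkSession named spark, the hook could be something like sql => { spark.sql(sql); () }, for example PartitionRepair.scheduleRepair("XX", 300L)(sql => { spark.sql(sql); () }). For Impala, the same scheduling approach would apply with the ALTER TABLE ... RECOVER PARTITIONS statement sent over whatever connection you already use. The interval is a trade-off: new partitions become visible in Hive only after the next repair run.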