I am trying to use the Spark Structured Streaming writeStream
API to write to an external, partitioned Hive table.
CREATE EXTERNAL TABLE `XX`(
`a` string,
`b` string,
`c` string,
`happened` timestamp,
`processed` timestamp,
`d` string,
`e` string,
`f` string )
PARTITIONED BY (
`year` int, `month` int, `day` int)
CLUSTERED BY (d)
INTO 6 BUCKETS
STORED AS ORC
TBLPROPERTIES (
'orc.compress'='ZLIB',
'orc.compression.strategy'='SPEED',
'orc.create.index'='true',
'orc.encoding.strategy'='SPEED');
and in Spark code,
val hiveOrcWriter: DataStreamWriter[Row] = event_stream
.writeStream
.outputMode("append")
.format("orc")
.partitionBy("year","month","day")
//.option("compression", "zlib")
.option("path", _table_loc)
.option("checkpointLocation", _table_checkpoint)
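(For completeness: a DataStreamWriter does nothing until .start() is called, so the writer above is started elsewhere. A minimal sketch, assuming the hiveOrcWriter value from the snippet above:

```scala
// Start the streaming query; nothing is written until .start() is called.
val query = hiveOrcWriter.start()

// Block until the query terminates (on error or manual stop).
query.awaitTermination()
```
)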
I see that with a non-partitioned table, records are inserted into Hive. However, with the partitioned table, the Spark job does not fail or raise exceptions, yet no records are inserted into the Hive table.
Appreciate comments from anyone who has dealt with similar problems.
Edit:
Just discovered that the .orc files are indeed written to HDFS, with the correct partition directory structure: e.g. /_table_loc/_table_name/year/month/day/part-0000-0123123.c000.snappy.orc
However
select * from XX limit 1; (or with where year=2018)
returns no rows.
The InputFormat and OutputFormat for the table 'XX' are org.apache.hadoop.hive.ql.io.orc.OrcInputFormat and org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat respectively.
This feature isn't provided out of the box in Structured Streaming. In batch processing you would use dataset.write.saveAsTable(table_name), but that method isn't available on a streaming writer. Spark writes the partition directories and files to HDFS, but Hive only returns rows for partitions that are registered in the metastore, which is why your queries come back empty.
After processing and saving the data to HDFS, you can update the partitions manually (or with a script that does this on a schedule):
If you use Hive
MSCK REPAIR TABLE table_name
If you use Impala
ALTER TABLE table_name RECOVER PARTITIONS
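Alternatively, if the table is only ever written from this Spark application, the repair step can be issued from the same job after each micro-batch using foreachBatch (available in Spark 2.4+). This is a sketch, not a drop-in replacement: it assumes `spark` is the active SparkSession, and reuses the event_stream, _table_loc, and _table_checkpoint names from the question.

```scala
// Sketch: write each micro-batch with the batch writer, then refresh
// Hive's partition metadata so new directories become queryable.
val query = event_stream.writeStream
  .outputMode("append")
  .foreachBatch { (batch: org.apache.spark.sql.DataFrame, batchId: Long) =>
    batch.write
      .partitionBy("year", "month", "day")
      .mode("append")
      .orc(_table_loc)                   // write this batch under the table location
    spark.sql("MSCK REPAIR TABLE XX")    // register any new partition directories
  }
  .option("checkpointLocation", _table_checkpoint)
  .start()
```

Running MSCK REPAIR on every batch can be expensive on tables with many partitions, so the scheduled-script approach above may still be preferable for high-frequency streams.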