apache-spark, hive, hdfs, parquet

Increasing write parallelism with Parquet on Hive


tl;dr - I'm writing a lot of data into a new Parquet-format table on Hive, but the job uses far fewer reducers than specified, making the writes take much longer than I'd like.

I'm building a data lake table intended for fast reads with Spark, but I'm writing the data with Hive so that a) the bucketed tables can be read by Hive and b) I can write statistics to the Hive metastore.

I create the table from Python like so:

hivecur.execute("set hive.cbo.enable=true")
hivecur.execute("set hive.compute.query.using.stats=true")
hivecur.execute("set hive.stats.fetch.column.stats=true")
hivecur.execute("set hive.stats.fetch.partition.stats=true")

hivecur.execute("set hive.vectorized.execution.enabled=true")
hivecur.execute("set hive.vectorized.execution.reduce.enabled=true")

hivecur.execute('set mapred.reduce.tasks=100')

hivecur.execute(f"set dfs.block.size={1024*1024*128}")
hivecur.execute(f"set parquet.block.size={1024*1024*128}")

hivecur.execute(f"drop table if exists {TABLE_NAME}")

table_create_qry = f"""
create table {TABLE_NAME} (
    {schema.dice}
)
partitioned by (process_date_z int, dataset string)
clustered by (id) sorted by (source_id, type, id) into 200 buckets
stored as parquet
TBLPROPERTIES ("comment" = "{git_hash()}",
               "parquet.compress" = "snappy")
"""

And then when I insert:

qry = f"""
        insert overwrite table {TABLE_NAME} partition (process_date_z, dataset)
        select ...
            source_id,
            process_date_z,
            '{dataset}' as dataset
        from {source_table}
        where process_date_z = {d}
        and pmod(hash(id),100) in ({",".join([str(x) for x in id_filters])})"""

By setting mapred.reduce.tasks=100, I was hoping to force each partition to contain roughly 100 files. Instead, although 100 reduce tasks are created, 92 finish very quickly and the remaining eight run much longer, writing a few tens of roughly equal-sized files (nowhere near 100).

The problem is that the reduce stage is a significant bottleneck in the write process. What parameter can I set to improve performance?


Solution

  • My problem came from a dumb choice of hash function, I think.

    I suspect that the hash Hive uses to bucket by id is the same hash() I used to subset the IDs. Hive set up a bucket for every possible hash value, but the pmod filter in the WHERE clause only let rows through for a few of them, so only a few reducers had any data to write.

    To solve this, I replaced the hash() inside pmod with brickhouse's MurmurHash3 UDF.

    https://github.com/klout/brickhouse
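
The suspected effect can be reproduced outside Hive. The sketch below is only an illustration under stated assumptions: the identity function stands in for Hive's hash() on integer ids, MD5 stands in for an unrelated hash such as MurmurHash3, and the id_filters values are made up. It counts how many of the 200 buckets receive any rows when the filter and the bucketing share a hash, versus when they use different hashes.

```python
import hashlib

NUM_BUCKETS = 200       # "into 200 buckets" from the table definition
FILTER_MODULUS = 100    # the modulus in pmod(hash(id), 100)
id_filters = {3, 17, 42, 99}  # hypothetical subset; the real values are not shown


def bucket_hash(x: int) -> int:
    """Assumed stand-in for the hash used for bucketing (identity on ints)."""
    return x


def independent_hash(x: int) -> int:
    """Assumed stand-in for an unrelated hash (MD5 here, not MurmurHash3)."""
    digest = hashlib.md5(str(x).encode()).digest()
    return int.from_bytes(digest[:4], "big")


def populated_buckets(filter_hash, ids):
    """Apply the WHERE-style filter, then return the set of buckets that get rows."""
    kept = [i for i in ids if filter_hash(i) % FILTER_MODULUS in id_filters]
    return {bucket_hash(i) % NUM_BUCKETS for i in kept}


ids = range(200_000)

# Filter and bucketing share one hash: only a handful of buckets are hit.
same = populated_buckets(bucket_hash, ids)

# Filter uses a different hash: rows spread over all buckets.
different = populated_buckets(independent_hash, ids)

print(f"same hash:      {len(same)} of {NUM_BUCKETS} buckets populated")
print(f"different hash: {len(different)} of {NUM_BUCKETS} buckets populated")
```

When both steps share a hash, a row whose hash is r modulo 100 can only land in bucket r or r + 100 modulo 200, so at most 2 × len(id_filters) buckets ever receive data. That would be consistent with only a few reducers doing all the work, although the exact hash Hive applies depends on the column type and Hive version.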