首页 > 解决方案 > 在 Hive 上使用 Parquet 增加写入并行度

问题描述

tl;dr - 我正在将大量数据写入 Hive 上的新 Parquet 格式表,但该作业使用的减速器比指定的要少得多,这使得写入所需的时间比我想要的要长得多。

我正在构建一个数据湖表,旨在使用 Spark 创建快速读取,但我正在使用配置单元写入数据,因此 a) 存储桶表可以由配置单元读取,b) 所以我可以将统计信息写入配置单元元存储。

我像这样从python创建表:

hivecur.execute("set hive.cbo.enable=true")
hivecur.execute("set hive.compute.query.using.stats=true")
hivecur.execute("set hive.stats.fetch.column.stats=true")
hivecur.execute("set hive.stats.fetch.partition.stats=true")

hivecur.execute("set hive.vectorized.execution.enabled=true")
hivecur.execute("set hive.vectorized.execution.reduce.enabled=true")

hivecur.execute('set mapred.reduce.tasks=100')

hivecur.execute(f"set dfs.block.size={1024*1024*128}")
hivecur.execute(f"set parquet.block.size={1024*1024*128}")

hivecur.execute(f"drop table if exists {TABLE_NAME}")

table_create_qry = f"""
create table {TABLE_NAME} (
    {schema.dice}
)
partitioned by (process_date_z int, dataset string)
clustered by (id) sorted by (source_id, type, id) into 200 buckets
stored as parquet
TBLPROPERTIES ("comment" = "{git_hash()}",
               "parquet.compress" = "snappy")

然后当我插入时:

qry = f"""
        insert overwrite table {TABLE_NAME} partition (process_date_z, dataset)
        select ...
            source_id,
            process_date_z,
            '{dataset}' as dataset
        from {source_table}
        where process_date_z = {d}
        and pmod(hash(id),100) in ({",".join([str(x) for x in id_filters])})"""

通过设置mapred.reduce.tasks=100,我希望我会强制每个分区包含 100 个文件。相反,虽然创建了 100 个任务,但其中 92 个任务完成得非常快,而 8 个 reduce 任务运行时间更长,编写了 10 个(但不是 100 个)大致相等大小的文件。

这样做的问题是减少是写入过程的一个重要瓶颈。我可以设置什么参数来提高性能?

标签: apache-sparkhivehdfsparquet

解决方案


我认为我的问题来自对哈希函数的愚蠢选择。

我怀疑用于按 ID 分桶的算法与我用来对 ID 进行子集化的哈希相同,因此它为所有可能的输入 ID 创建了一个桶,但 pmod WHERE 只允许它填充一些。

为了解决这个问题,我用砖屋的 Murmurhash3 UDF 切换了 pmod 内的哈希。

https://github.com/klout/brickhouse


推荐阅读