How to convert a parquet file to an ORC file with dataset larger than RAM memory?

Problem description

I'm planning to run some tests with the ORC file format, but I haven't been able to create an ORC file from an existing file.

I have sequential/columnar data stored in a Parquet file (also available as HDF5 and CSV) that I can manipulate easily with Spark and pandas.

I'm working locally, on macOS with just 8 GB of RAM. The table has 80,000 columns and 7,000 rows. Each column represents a measurement source and each row holds the measurements taken at one point in time.
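
For reference, the table's shape can be checked without loading the whole file into memory. This is just a minimal sketch using pyarrow (not mentioned in the post, but the usual engine behind pandas' Parquet support); pq_path is the same file path used in the Spark code below:

import pyarrow.parquet as pq

# Read only the Parquet footer/metadata, not the data itself.
pq_file = pq.ParquetFile(pq_path)
print(pq_file.metadata.num_rows)      # expected ~7,000 rows
print(pq_file.metadata.num_columns)   # expected ~80,000 columns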

I'm using PyCharm and have already tried converting with the Spark DataFrame read/write API and with Spark SQL/Hive, against both the local filesystem and HDFS.

Code:

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("ConvertPqtoOrc") \
    .config('spark.sql.debug.maxToStringFields', 100000) \
    .config('spark.network.timeout', 10000000) \
    .config('spark.executor.heartbeatInterval', 10000000) \
    .config('spark.storage.blockManagerSlaveTimeoutMs', 10000000) \
    .config('spark.executor.memory', '6g') \
    .config('spark.executor.cores', '4') \
    .config('spark.driver.memory', '6g') \
    .config('spark.cores.max', '300') \
    .config('spark.sql.orc.enabled', 'true') \
    .config('spark.sql.hive.convertMetastoreOrc', 'true') \
    .config('spark.sql.orc.filterPushdown', 'true') \
    .enableHiveSupport() \
    .getOrCreate()

sc = spark.sparkContext
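
As a quick sanity check, the values these settings actually resolved to in the running session can be printed through the sc handle above; a minimal sketch:

# Print the effective values of the memory-related settings.
for key in ("spark.driver.memory", "spark.executor.memory", "spark.master"):
    print(key, "=", sc.getConf().get(key, "<not set>"))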

Spark read/write try:

df_spark = spark.read.parquet(pq_path)
df_spark.write.mode("overwrite").format("orc").save(ORC_path)

Spark SQL / Hive try:

spark.sql("DROP TABLE if exists tbl_pq")
spark.sql("DROP TABLE if exists tbl_orc")

spark.sql("CREATE TABLE IF NOT EXISTS tbl_pq USING PARQUET LOCATION '{}'".format(db_path + Pq_file))

spark.sql("CREATE TABLE IF NOT EXISTS tbl_orc ({0} double) STORED AS ORC LOCATION '{1}'".format(" double, ".join(map(str, myColumns)), db_path + Output_file))

spark.sql("INSERT OVERWRITE TABLE tbl_orc SELECT * FROM tbl_porque")

Both methods work well with a filtered dataset (up to 20,000 columns / 7,000 rows; see the column-subset sketch after the error output below). When I use the entire table, both methods eventually fail after a long time with the warning/error below:

WARN DAGScheduler: Broadcasting large task binary with size 7.3 MiB
ERROR Utils: Aborting task
java.lang.OutOfMemoryError: Java heap space
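
For context, the "filtered dataset" runs that succeed keep only a subset of the columns before writing. The exact filtering code isn't shown in the post, but a column-limited write of that kind would look roughly like this sketch (the positional slice to 20,000 columns is only an illustration):

# Keep only the first 20,000 columns (the size that still converts successfully).
subset_cols = df_spark.columns[:20000]
df_spark.select(subset_cols).write.mode("overwrite").format("orc").save(ORC_path)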

Tags: python, apache-spark, parquet, orc
