Problems reading a Postgres database over a JDBC connection in Spark

Problem description

I am currently running into some issues when reading data from a Postgres database over a JDBC connection in (Py)Spark. I have a table in Postgres that I want to read into Spark, process, and save the results as .parquet files in an AWS S3 bucket.

I created an example script that performs some basic logic (so as not to overcomplicate the question):

from pyspark.sql import SparkSession
from pyspark.sql.functions import length
import argparse
import uuid
import datetime

def parse_arguments():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        "--loc",
        type=str,
        default="./",
        help="Output location"
    )

    args = parser.parse_known_args()[0]
    return args

if __name__=="__main__":
    args = parse_arguments()

    spark = SparkSession.builder. \
        appName("test-script"). \
        config("spark.jars.packages", "com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.5,org.postgresql:postgresql:42.1.1"). \
        getOrCreate()

    CENTRAL_ID = "id"
    PG_HOST = spark.conf.get("spark.yarn.appMasterEnv.PG_HOST")
    PG_PORT = spark.conf.get("spark.yarn.appMasterEnv.PG_PORT")
    PG_USER = spark.conf.get("spark.yarn.appMasterEnv.PG_USER")
    PG_DB = spark.conf.get("spark.yarn.appMasterEnv.PG_DB")
    PG_PASS = spark.conf.get("spark.yarn.appMasterEnv.PG_PASS")
    PG_MAX_CONCURRENT = spark.conf.get("spark.yarn.appMasterEnv.PG_MAX_CONCURRENT")

    table = "test_schema.test_table"
    partitions = spark.sparkContext.defaultParallelism
    fetch_size = 2000

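    # Read the table over JDBC: MOD(id, partitions) pre-buckets the rows into the
    # helper column "p", so Spark can split the read into range queries on "p",
    # with at most PG_MAX_CONCURRENT of them hitting Postgres at the same time.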
    data = spark.read.format("jdbc"). \
        option("url", "jdbc:postgresql://{}:{}/{}".format(PG_HOST, PG_PORT, PG_DB)). \
        option("dbtable", "(SELECT *, MOD({}, {}) AS p FROM {}) AS t".format(CENTRAL_ID, partitions, table)). \
        option("user", PG_USER). \
        option("password", PG_PASS). \
        option("driver", "org.postgresql.Driver"). \
        option("partitionColumn", "p"). \
        option("lowerBound", 0). \
        option("upperBound", partitions). \
        option("numPartitions", PG_MAX_CONCURRENT). \
        option("fetchSize", fetch_size). \
        load()
    data = data.repartition(partitions)

    # Cache data
    data.cache()

    # Calculate on data
    out1 = data.withColumn("abstract_length", length("abstract"))
    out2 = data.withColumn("title_length", length("title"))

    # Create a timestamp
    time_stamp = datetime.datetime.utcnow().isoformat()
    save_id = "{}-{}".format(uuid.uuid4(), time_stamp)

    out1.select([CENTRAL_ID, "abstract_length"]).write.mode("overwrite").parquet("{}/{}/abstract".format(args.loc, save_id))
    out2.select([CENTRAL_ID, "title_length"]).write.mode("overwrite").parquet("{}/{}/title".format(args.loc, save_id))

    spark.stop()

    print("run successfull!")

The program tries to limit the number of concurrent queries against the Postgres database to 8 (see PG_MAX_CONCURRENT), so as not to overload the database. After loading, I repartition into a larger number of partitions (360) so that the data gets distributed across all the workers.
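As a side note on how these options interact: below is a minimal sketch (based on Spark's documented JDBC partitioning behaviour; the exact predicate wording is an assumption, not copied from Spark's internals) of the range queries that the read above should produce. Only numPartitions such queries exist, which is why at most 8 of them run against Postgres at any one time.

# Rough sketch: Spark splits [lowerBound, upperBound) on the partition
# column "p" into numPartitions ranges and issues one query per range.
# Values taken from the script above; predicate wording is approximate.
lower, upper, num_partitions = 0, 360, 8
stride = (upper - lower) // num_partitions  # 45

for i in range(num_partitions):
    lo = lower + i * stride
    hi = lo + stride
    if i == 0:
        where = "p < {} OR p IS NULL".format(hi)
    elif i == num_partitions - 1:
        where = "p >= {}".format(lo)
    else:
        where = "p >= {} AND p < {}".format(lo, hi)
    print("JDBC partition {}: SELECT ... WHERE {}".format(i, where))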

The EMR cluster is configured as follows:

The spark-submit arguments are as follows:

spark-submit \
--master yarn \
--conf 'spark.yarn.appMasterEnv.PG_HOST=<<host>>' \
--conf 'spark.yarn.appMasterEnv.PG_PORT=<<port>>' \
--conf 'spark.yarn.appMasterEnv.PG_DB=<<db>>' \
--conf 'spark.yarn.appMasterEnv.PG_USER=<<user>>' \
--conf 'spark.yarn.appMasterEnv.PG_PASS=<<password>>' \
--conf 'spark.yarn.appMasterEnv.PG_MAX_CONCURRENT=8' \
--conf 'spark.executor.cores=3' \
--conf 'spark.executor.instances=30' \
--conf 'spark.executor.memory=12g' \
--conf 'spark.driver.memory=12g' \
--conf 'spark.default.parallelism=360' \
--conf 'spark.kryoserializer.buffer.max=1000M' \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.dynamicAllocation.enabled=false' \
--packages 'com.johnsnowlabs.nlp:spark-nlp_2.11:2.6.5,org.postgresql:postgresql:42.1.1' \
program.py \
--loc s3a://<<bucket>>/

The first type of error I get:

Job aborted due to stage failure: Task 0 in stage 0.0 failed 4 times, most recent failure: Lost task 0.4 in stage 0.0 (TID 26, ip-172-31-35-159.eu-central-1.compute.internal, executor 9): ExecutorLostFailure (executor 9 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 140265 ms
Driver stacktrace:

I am not sure what this means. Could it be that fetching the data from the table simply takes too long? Or is there another cause?

The second type of error I get:

    ExecutorLostFailure (executor 7 exited caused by one of the running tasks) Reason: Container from a bad node: container_1609406414316_0002_01_000013 on host: ip-172-31-44-127.eu-central-1.compute.internal. Exit status: 137. Diagnostics: [2020-12-31 10:08:38.093]Container killed on request. Exit code is 137

This seems to indicate an OOM problem. But I do not understand why I would get an OOM error, because in my view I assign enough memory to the executors and the driver. Moreover, when I look at the cluster statistics, I can see that it has enough memory: [cluster statistics]

Could it be that, with 8 concurrent queries against the Postgres server, 1/8 of the data is sent to each executor, so each executor has to be prepared to receive 1/8 of the total size? Or does fetchSize limit the amount of data sent to an executor in order to avoid memory problems? Or is there perhaps another cause? The whole table I am trying to process is about 110 GB.
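For what it is worth, here is a rough back-of-envelope check of the "1/8 per executor" scenario described above (assuming an even split across the JDBC partitions and ignoring the difference between on-disk and in-memory row sizes):

# Back-of-envelope only; real memory use also depends on row encoding,
# the cache() call and the shuffle introduced by repartition(360).
table_size_gb = 110      # approximate size of the Postgres table
jdbc_partitions = 8      # numPartitions in the JDBC options
executor_memory_gb = 12  # spark.executor.memory

per_read_task_gb = table_size_gb / jdbc_partitions
print("~{:.1f} GB per JDBC read task vs. {} GB of executor memory".format(
    per_read_task_gb, executor_memory_gb))  # ~13.8 GB vs. 12 GB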

Can anyone help? Thanks in advance!

Tags: postgresql, apache-spark, jdbc, pyspark, out-of-memory

Solution


It looks like you are running into a timeout. Have you tried increasing the timeout configuration in your spark-submit arguments?

--conf 'spark.network.timeout=10000000' \

See also: Spark cluster full of heartbeat timeouts, executors exiting on their own
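For completeness, a sketch of what this could look like from inside the script instead of on the command line (the values are illustrative only; spark.executor.heartbeatInterval has to stay well below spark.network.timeout):

from pyspark.sql import SparkSession

# Illustrative values only; the same settings can also be passed as
# --conf flags to spark-submit, as shown above.
spark = SparkSession.builder \
    .appName("test-script") \
    .config("spark.network.timeout", "800s") \
    .config("spark.executor.heartbeatInterval", "60s") \
    .getOrCreate()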

