PySpark 3.x: repartition does not work as expected for large JSON data

Problem description

We have a Hadoop cluster with two nodes, roughly 40 cores and 80 GB of RAM in total. We simply need to ingest a large multi-line JSON file into an Elasticsearch (ES) cluster. The JSON is 120 GB uncompressed and shrinks to only 2 GB after bz2 compression. We have set up the following code to index the data into ES:

....
from os.path import abspath
from pyspark.sql import SparkSession

def start_job():

    warehouse_location = abspath('spark-warehouse')
    # Create a spark session
    spark = SparkSession \
        .builder \
        .appName("Python Spark SQL Hive integration example") \
        .config("spark.sql.warehouse.dir", warehouse_location) \
        .enableHiveSupport() \
        .getOrCreate()

    # Configurations
    spark.conf.set("spark.sql.caseSensitive", "true")

    df = spark.read.option("multiline", "true").json(data_path)
    df = df.repartition(20)
    # Transformations
    df = df.drop("_id")

    df.write.format(
        'org.elasticsearch.spark.sql'
    ).option(
        'es.nodes', ES_Nodes
    ).option(
        'es.port', ES_PORT
    ).option(
        'es.resource', ES_RESOURCE,
    ).save()

if __name__ == '__main__':

    # ES Setting
    ES_Nodes = "hadoop-master"
    ES_PORT = 9200
    ES_RESOURCE = "myIndex/type"

    # Data absolute path
    data_path = "/dss_data/mydata.bz2"

    start_job()
    print("Job has been finished")

The problem is that only one executor is running, because there is only one task in total. I expected 20 tasks, since I repartitioned the data into 20 partitions. The Spark UI screenshot is shown below. Where is the problem? I am submitting the job to the cluster with the following command:

spark-submit --class org.apache.spark.examples.SparkPi --jars elasticsearch-spark-30_2.12-7.14.1.jar --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 4g --num-executors 20 --executor-cores 2 myscript.py
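
To narrow down where the single task comes from, the partition count can be checked right after the read and again after the repartition. A minimal diagnostic sketch, reusing the spark session and data_path defined in the script above:

# Diagnostic sketch (not part of the original job): compare partition counts
# before and after repartition(). With multiline enabled, a single JSON file
# is parsed as a whole, so the read stage typically ends up with one partition.
df = spark.read.option("multiline", "true").json(data_path)
print("partitions after read:", df.rdd.getNumPartitions())

df = df.repartition(20)
print("partitions after repartition:", df.rdd.getNumPartitions())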

We are using Hadoop and Spark version 3.x. In addition, the Hadoop logs show the following trace:

 df.write.format(
  File "/usr/local/leads/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
  File "/usr/local/leads/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/local/leads/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).json(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).json(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().
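
The exception message itself suggests the workaround: cache or save the parsed DataFrame before running queries that would otherwise reference only the internal corrupt-record column. A minimal sketch of that suggestion, again reusing the spark session and data_path from the script above:

# Sketch of the workaround suggested by the exception (assumes the spark
# session and data_path from the script above).
df = spark.read.option("multiline", "true").json(data_path)
df = df.cache()  # materialize the parsed result before querying it

# If parsing failed, the schema may contain little more than _corrupt_record;
# counting the corrupt rows shows whether the JSON was parsed at all.
if "_corrupt_record" in df.columns:
    print("corrupt rows:", df.filter(df["_corrupt_record"].isNotNull()).count())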

[Spark UI screenshot showing the single running task]

Tags: apache-spark, hadoop, pyspark

Solution

