apache-spark - Pyspark 版本 3.x,对于大型 JSON 数据,重新分区无法正常工作
问题描述
我们有一个包含两个节点的 hadoop 集群,大约有 40 个内核和 80 GB RAM。我们必须简单地将大型多行 JSON 消化到 Elastic Search (ES) 集群中。json 的大小为 120 GB,经过 bz2 压缩后,仅减少到 2 GB。我们为数据索引设置了以下代码是 ES
....
def start_job():
warehouse_location = abspath('spark-warehouse')
# Create a spark session
spark = SparkSession \
.builder \
.appName("Python Spark SQL Hive integration example") \
.config("spark.sql.warehouse.dir", warehouse_location) \
.enableHiveSupport() \
.getOrCreate()
# Configurations
spark.conf.set("spark.sql.caseSensitive", "true")
df = spark.read.option("multiline", "true").json(data_path)
df = df.repartition(20)
#Tranformations
df = df.drop("_id")
df.write.format(
'org.elasticsearch.spark.sql'
).option(
'es.nodes', ES_Nodes
).option(
'es.port', ES_PORT
).option(
'es.resource', ES_RESOURCE,
).save()
if __name__ == '__main__':
# ES Setting
ES_Nodes = "hadoop-master"
ES_PORT = 9200
ES_RESOURCE = "myIndex/type"
# Data absolute path
data_path = "/dss_data/mydata.bz2"
start_job()
print("Job has been finished")
问题是只有一个执行器在运行,因为总任务是一个。我期待,应该有 20 个任务,因为我已将数据重新分区为 20。Spark UI 图像如下所示。问题出在哪里。我正在运行以下命令在集群上运行作业
spark-submit --class org.apache.spark.examples.SparkPi --jars elasticsearch-spark-30_2.12-7.14.1.jar --master yarn --deploy-mode cluster --driver-memory 10g --executor-memory 4g --num-executors 20 --executor-cores 2 myscript.py
我们正在使用 Hadoop 和 Spark 版本 3.x。此外,我们还在 Hadoop 日志中获得了以下跟踪信息
df.write.format(
File "/usr/local/leads/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 1107, in save
File "/usr/local/leads/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/local/leads/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 117, in deco
pyspark.sql.utils.AnalysisException: Since Spark 2.3, the queries from raw JSON/CSV files are disallowed when the
referenced columns only include the internal corrupt record column
(named _corrupt_record by default). For example:
spark.read.schema(schema).json(file).filter($"_corrupt_record".isNotNull).count()
and spark.read.schema(schema).json(file).select("_corrupt_record").show().
Instead, you can cache or save the parsed results and then send the same query.
For example, val df = spark.read.schema(schema).json(file).cache() and then
df.filter($"_corrupt_record".isNotNull).count().
解决方案
推荐阅读
- c# - Xamarin.Forms json 文件流读取器 NullException 错误
- reactjs - 以通用方式更新组件
- excel - 我可以在 Countif 公式的标准部分使用变量吗?
- assembly - 在汇编中获取平方数的结果
- android - 子ListView在flutter中完成滚动后如何连续滚动bottomModelSheet?
- go - 无法使用等待组遍历文件夹
- matlab - 指数随机数
- powershell - Powershell过滤属性值而不扩展属性名称
- reactjs - 将 Github 存储库连接到 Heroku 时如何解决此错误
- c - 在 C 中的函数声明中使用宏