首页 > 解决方案 > 从 HDFS 读取 XML 文件以在 Pyspark 中使用 lxml.etree 进行解析

问题描述

我已经使用 lxml.etree 在 Python 中编写了一个解析器,现在我正在尝试在 Hadoop 集群上运行所述解析器。当我在本地运行该函数时,它按预期工作,但是当我尝试将其应用于集群上的文件时收到以下错误(我在 Pyspark shell 中执行以下命令,python3)

xml_pathname = "hdfs://file_path/date_directory/example_one.xml"
xml_tree = etree.parse(xml_pathname)

OSError: Error reading file '/file_path/date_directory/example_one.xml': failed to load external entity 
"/file_path/date_directory/example_one.xml"

hdfs dfs -ls /file_path/date_directory/example_one.xml当我在终端中运行时,我可以看到该文件。

我希望得到帮助的两个领域 -

  1. 如何使用 Pyspark 从集群将 XML 文件加载到 lxml.etree.parse() 方法中?
  2. 我怎样才能最好地扩展它以在 Spark 上有效运行?我想使用我的 Python 解析器解析集群上的数百万个 XML 文件——下面的修改会起作用吗,还是有更好的方法来大规模并行化和运行解析器?通常,我应该如何在我的 spark 配置中设置参数以获得最佳结果(大量执行程序、多个驱动程序等)?
#Same as above but with wildcards to parse millions of XML files

xml_pathname = "hdfs://file_path/*/*.xml"
xml_tree = etree.parse(xml_pathname)

已经为此工作了一段时间,非常感谢任何和所有的帮助。欣赏你们

标签: xmlapache-sparkhadooppysparklxml

解决方案


  1. mapValues() 函数被证明是有用的。Sark 配置的 XML 解析器,例如 Pubmed 解析器,也提供了有用的样板代码,如下所示:
path_rdd = sc.parallelize(path_sample, numSlices=10000) # use only example path
    parse_results_rdd = path_rdd.map(lambda x: Row(file_name=os.path.basename(x), **pp.parse_pubmed_xml(x)))
    pubmed_oa_df = parse_results_rdd.toDF()
    pubmed_oa_df_sel = pubmed_oa_df[['full_title', 'abstract', 'doi',
                                     'file_name', 'pmc', 'pmid',
                                     'publication_year', 'publisher_id',
                                     'journal', 'subjects']]
    pubmed_oa_df_sel.write.parquet(os.path.join(save_dir, 'pubmed_oa_%s.parquet' % date_update_str),
                                   mode='overwrite')

https://github.com/titipata/pubmed_pa​​rser/blob/master/scripts/pubmed_oa_spark.py

  1. 使用 fs.globStatus 可以在一个子目录中检索多个 XML 文件。

推荐阅读