xml - 从 HDFS 读取 XML 文件以在 Pyspark 中使用 lxml.etree 进行解析
问题描述
我已经使用 lxml.etree 在 Python 中编写了一个解析器,现在我正在尝试在 Hadoop 集群上运行所述解析器。当我在本地运行该函数时,它按预期工作,但是当我尝试将其应用于集群上的文件时收到以下错误(我在 Pyspark shell 中执行以下命令,python3)
xml_pathname = "hdfs://file_path/date_directory/example_one.xml"
xml_tree = etree.parse(xml_pathname)
OSError: Error reading file '/file_path/date_directory/example_one.xml': failed to load external entity
"/file_path/date_directory/example_one.xml"
hdfs dfs -ls /file_path/date_directory/example_one.xml
当我在终端中运行时,我可以看到该文件。
我希望得到帮助的两个领域 -
- 如何使用 Pyspark 从集群将 XML 文件加载到 lxml.etree.parse() 方法中?
- 我怎样才能最好地扩展它以在 Spark 上有效运行?我想使用我的 Python 解析器解析集群上的数百万个 XML 文件——下面的修改会起作用吗,还是有更好的方法来大规模并行化和运行解析器?通常,我应该如何在我的 spark 配置中设置参数以获得最佳结果(大量执行程序、多个驱动程序等)?
#Same as above but with wildcards to parse millions of XML files
xml_pathname = "hdfs://file_path/*/*.xml"
xml_tree = etree.parse(xml_pathname)
已经为此工作了一段时间,非常感谢任何和所有的帮助。欣赏你们
解决方案
- mapValues() 函数被证明是有用的。Sark 配置的 XML 解析器,例如 Pubmed 解析器,也提供了有用的样板代码,如下所示:
path_rdd = sc.parallelize(path_sample, numSlices=10000) # use only example path
parse_results_rdd = path_rdd.map(lambda x: Row(file_name=os.path.basename(x), **pp.parse_pubmed_xml(x)))
pubmed_oa_df = parse_results_rdd.toDF()
pubmed_oa_df_sel = pubmed_oa_df[['full_title', 'abstract', 'doi',
'file_name', 'pmc', 'pmid',
'publication_year', 'publisher_id',
'journal', 'subjects']]
pubmed_oa_df_sel.write.parquet(os.path.join(save_dir, 'pubmed_oa_%s.parquet' % date_update_str),
mode='overwrite')
https://github.com/titipata/pubmed_parser/blob/master/scripts/pubmed_oa_spark.py
- 使用 fs.globStatus 可以在一个子目录中检索多个 XML 文件。
推荐阅读
- laravel - 使用 rest api 压缩图像
- python - 如何绘制带有字符的元组
- dart - 如何在堆栈中定位小部件,但不匹配父级的宽度/高度?
- amazon-web-services - 带有 AWS 服务模板的 Git Webhooks 不起作用
- c# - 无法删除有帖子的作者
- javascript - 使用javascript生成一个固定长度的随机数,即70
- javascript - 小标签不适用于输入标签
- javascript - 如何在php中上传后显示图像处理同一页面上的AJAX请求
- svg - 在 SED 中将 SVG 路径分离为其可见组件的正确方法是什么?
- ios - 如何在 PJSIP 2.8 for iOS 中启用 WebRTC AEC?