apache-spark - 在 EMR 上的 PySpark 中运行自定义 Java 类
问题描述
我正在尝试利用 Cerner Bunsen 包在 AWS EMR 上的 PySpark 中进行 FHIR 处理,特别是 Bundles 类及其方法。我正在使用 Apache Livy API 创建 spark 会话,
def create_spark_session(master_dns, kind, jars):
# 8998 is the port on which the Livy server runs
host = 'http://' + master_dns + ':8998'
data = {'kind': kind, 'jars': jars}
headers = {'Content-Type': 'application/json'}
response = requests.post(host + '/sessions', data=json.dumps(data), headers=headers)
logging.info(response.json())
return response.headers
kind = pyspark3 和 jars 是存放 jar 的 S3 位置 (bunsen-shaded-1.4.7.jar)
数据转换正在尝试导入 jar 并通过以下方式调用方法:
# Setting the Spark Session and Pulling the Existing SparkContext
sc = SparkContext.getOrCreate()
# Cerner Bunsen
from py4j.java_gateway import java_import, JavaGateway
java_import(sc._gateway.jvm,"com.cerner.bunsen.Bundles")
func = sc._gateway.jvm.Bundles()
我收到的错误是
“py4j.protocol.Py4JError:调用 None.com.cerner.bunsen.Bundles 时出错。跟踪:\npy4j.Py4JException:构造函数 com.cerner.bunsen.Bundles([]) 不存在”
这是我第一次尝试使用 java_import 所以任何帮助将不胜感激。
编辑:我稍微改变了转换脚本,现在看到了一个不同的错误。我可以看到 jar 被添加到日志中,所以我确定它在那里并且 jars: jars 功能正在按预期工作。新的转变是:
# Setting the Spark Session and Pulling the Existing SparkContext
sc = SparkContext.getOrCreate()
# Manage logging
#sc.setLogLevel("INFO")
# Cerner Bunsen
from py4j.java_gateway import java_import, JavaGateway
java_import(sc._gateway.jvm,"com.cerner.bunsen")
func_main = sc._gateway.jvm.Bundles
func_deep = sc._gateway.jvm.Bundles.BundleContainer
fhir_data_frame = func_deep.loadFromDirectory(spark,"s3://<bucket>/source_database/Patient",1)
fhir_data_frame_fromJson = func_deep.fromJson(fhir_data_frame)
fhir_data_frame_clean = func_main.extract_entry(spark,fhir_data_frame_fromJson,'patient')
fhir_data_frame_clean.show(20, False)
新的错误是:
“JavaPackage”对象不可调用
搜索这个错误有点徒劳,但同样,如果有人有想法,我会很乐意接受。
解决方案
如果你想在 Pyspark 中使用 Scala/Java 函数,你还必须在类路径中添加 jar 包。您可以通过 2 种不同的方式进行操作:
选项1:
在 Spark 中使用标志提交--jars
spark-submit example.py --jars /path/to/bunsen-shaded-1.4.7.jar
选项2:将其添加spark-defaults.conf
到属性中的文件中:
在中添加以下代码: path/to/spark/conf/spark-defaults.conf
# Comma-separated list of jars include on the driver and executor classpaths.
spark.jars /path/to/bunsen-shaded-1.4.7.jar
推荐阅读
- pandas - 将日期标题后跟 AM 和 PM 时间单元格转换为整个时间戳列
- python - 转换为时间戳并返回更改原始时间 - Python Pandas
- arrays - 如何在不遍历整个事物的情况下迭代稀疏数组
- java - 当同一个类中有一个空的构造函数时,是否会绕过带有 args 的构造函数?
- python - 如何使用具有多行平均值的 fillna()?
- ios - 在 Xcode 12.4 中更新领域对象
- python - 谷歌 colab 连接到 IB
- java - 无法在 Kubernetes 上的 Eureka 注册表上注册微服务
- javascript - 路由器功能 - 'beforeEnter' 从 vuex 获取默认值
- android - Android MongoDB Realm:跳过了 28 帧!应用程序可能在其主线程上做了太多工作