java - Flink docker compose - 自定义库
问题描述
我正在尝试使用 docker-compose 设置 Flink 会话集群。我想在 Flink 中加载一个自定义库,因为这个库包含我所有工作都使用的代码。我通过创建自定义 docker 映像来执行此操作,如下所示:
FROM flink:1.10.0
WORKDIR /opt/flink/lib
RUN mkdir /opt/flink/usrlib
RUN chown flink:flink /opt/flink/usrlib
ADD --chown=flink:flink ./myLibrary.jar /opt/flink/lib/myLibary.jar
作业/任务管理器成功启动。当我使用 Web UI 提交作业时,我的作业可以正常运行,但有一个例外:
在我的库中,我有一个 flink map 运算符(称为 DeserialisationMapper),它使用来自 Kafka 的 JSON 消息并根据消息中的标签创建自定义 Java 对象。例如,如果消息是
{"objectType": "Address", "street": "Street 1"}
我的 DeserialisationMapper 生成一个 Java POJO,类 Address 的实例,其名为“street”的字段设置为“Street 1”。我使用 Java 反射来做到这一点。POJO 的自定义 Java 类仅在作业中可用本身(不是图书馆)。当我在 Eclipse 中执行程序时(我的自定义库作为 Maven 依赖项提供),一切正常。DeserialisationMapper 能够定位作业项目中的自定义 Java 类。当我为作业导出一个“胖”jar 时,它是一个包含作业所有依赖项的 jar(例如,myLibrary.jar)并将其部署到 flink 集群,它也可以正常工作。但是,当我尝试将我的库放入 flink 集群(使用上面显示的自定义图像)并将其从作业 jar 中排除时,我得到一个 ClassNotFoundException 提示无法找到特定类(例如,地址),尽管路径该类看起来正确(例如,org.eclipse.myJob.datatypes.Address) - 我确认该类位于正确位置的作业罐中。注意:
为什么会这样?myLibrary.jar 不应该能够找到包含在我的工作 jar 中的类吗?我应该进行任何特定的配置还是根本不可能?
解决方案
自己找到了解决方案。根据这个来自会话集群中用户 jar 的类是动态加载的,因此不能被 Flink 的类路径中加载的库访问。一种解决方案是将用户 jar 放在 lib 文件夹中,这对我不起作用,因为我希望我的用户能够通过 UI 提交他们的作业。本小节介绍了另一种适用于我的解决方案。基本上,当需要来自用户 jar 的类时,您的 Flink 操作员应该使用getRuntimeContext().getUserCodeClassLoader()
. 为了做到这一点,它们应该是丰富的函数(例如,RichFlatMapFunction)。然后,使用这个类加载器,你可以调用loadClass(className)
指向用户类所在路径的方法。
推荐阅读
- r - 如何删除 tm_map 中的“英文”单词?
- angular - 为什么 Angular Material 表 mat-paginator pagesize 不起作用?
- sqlite - SQLite 在 WHERE 子句中使用 OR 条件花费了太多时间
- azure - 运行查询以查找时差
- parsing - 如何编写 SPF 记录的解析器和验证器?
- javascript - 如何通过 web-bluetooth API 向设备发送 hex?
- mysql - 错误代码:1054。“字段列表”中的未知列“部门”0.000 秒
- php - 仅当数字已经千位分隔时才应用 number_format()
- python - 如何使用 model.predict 预测 tf.keras 中的单个图像
- javascript - 居中对齐操作表文本和图标