首页 > 解决方案 > Flink docker compose - 自定义库

问题描述

我正在尝试使用 docker-compose 设置 Flink 会话集群。我想在 Flink 中加载一个自定义库,因为这个库包含我所有工作都使用的代码。我通过创建自定义 docker 映像来执行此操作,如下所示:

FROM flink:1.10.0
WORKDIR /opt/flink/lib

RUN mkdir /opt/flink/usrlib
RUN chown flink:flink /opt/flink/usrlib

ADD --chown=flink:flink ./myLibrary.jar /opt/flink/lib/myLibary.jar

作业/任务管理器成功启动。当我使用 Web UI 提交作业时,我的作业可以正常运行,但有一个例外:

在我的库中,我有一个 flink map 运算符(称为 DeserialisationMapper),它使用来自 Kafka 的 JSON 消息并根据消息中的标签创建自定义 Java 对象。例如,如果消息是

{"objectType": "Address", "street": "Street 1"}

我的 DeserialisationMapper 生成一个 Java POJO,类 Address 的实例,其名为“street”的字段设置为“Street 1”。我使用 Java 反射来做到这一点。POJO 的自定义 Java 类仅在作业中可用本身(不是图书馆)。当我在 Eclipse 中执行程序时(我的自定义库作为 Maven 依赖项提供),一切正常。DeserialisationMapper 能够定位作业项目中的自定义 Java 类。当我为作业导出一个“胖”jar 时,它是一个包含作业所有依赖项的 jar(例如,myLibrary.jar)并将其部署到 flink 集群,它也可以正常工作。但是,当我尝试将我的库放入 flink 集群(使用上面显示的自定义图像)并将其从作业 jar 中排除时,我得到一个 ClassNotFoundException 提示无法找到特定类(例如,地址),尽管路径该类看起来正确(例如,org.eclipse.myJob.datatypes.Address) - 我确认该类位于正确位置的作业罐中。注意:

为什么会这样?myLibrary.jar 不应该能够找到包含在我的工作 jar 中的类吗?我应该进行任何特定的配置还是根本不可能?

标签: javadocker-composecluster-computingapache-flinkclasspath

解决方案


自己找到了解决方案。根据这个来自会话集群中用户 jar 的类是动态加载的,因此不能被 Flink 的类路径中加载的库访问。一种解决方案是将用户 jar 放在 lib 文件夹中,这对我不起作用,因为我希望我的用户能够通过 UI 提交他们的作业。本小节介绍了另一种适用于我的解决方案。基本上,当需要来自用户 jar 的类时,您的 Flink 操作员应该使用getRuntimeContext().getUserCodeClassLoader(). 为了做到这一点,它们应该是丰富的函数(例如,RichFlatMapFunction)。然后,使用这个类加载器,你可以调用loadClass(className)指向用户类所在路径的方法。


推荐阅读