java - 在 Java Flink 作业中使用 Python 用户定义函数
问题描述
无论如何要在Java Flink作业中使用python用户定义的函数,或者无论如何都要传达例如由flink与java完成的转换结果与python用户定义的函数来应用一些机器学习的东西:
我知道从 pyFlink 你可以做这样的事情:
table_env.register_java_function("hash_code", "my.java.function.HashCode")
但是我需要做类似的事情,但是从 java 添加 python 函数,或者如何将 java 转换的结果直接传递给 Python UDF Flink 作业?
我希望这些问题不要太疯狂,但我需要知道是否存在以某种方式将 Flink DataStream API 与以 Java 作为主要语言的 Python Table API 进行通信?这意味着我需要从 Java 执行:Source -> Transformations -> Sink,但是其中一些转换可以触发 Python 函数,或者 Python 函数将等待一些 Java 转换完成以对 Stream 结果执行某些操作。
我希望有人明白我在这里想要做什么。
亲切的问候!
解决方案
此集成示例:您的 pom.xml 中需要此依赖项,假设 Flink 1.11 是当前版本。
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner-blink_2.11</artifactId>
<version>1.11.2</version>
<scope>provided</scope>
</dependency>
创建环境:
private StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
private StreamTableEnvironment tableEnv = getTableAPIEnv(env);
/*this SingleOutputStreamOperator will contains the result of the consumption from the defined source*/
private SingleOutputStreamOperator<Event> stream;
public static StreamTableEnvironment getTableAPIEnv(StreamExecutionEnvironment env) {
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
tableEnv.getConfig().getConfiguration().setString("python.files", path/function.py);
tableEnv.getConfig().getConfiguration().setString("python.client.executable", path/python);
tableEnv.getConfig().getConfiguration().setString("python.executable", path/python);
tableEnv.getConfig().getConfiguration().setString("taskmanager.memory.task.off-heap.size", "79mb");
/*pass here the function.py and the name of the function into the python script*/
tableEnv.executeSql("CREATE TEMPORARY SYSTEM FUNCTION FunctionName AS 'function.FunctionName' LANGUAGE PYTHON");
return tableEnv;
}
从您想要执行的转换开始,例如:
SingleOutputStreamOperator<EventProfile> profiles = createUserProfile(stream.keyBy(k -> k.id));
/*The result of that ProcessFunction `createUserProfile()` will be sent into the Python function to update some values of the profile and return them back into a defined function in Flink with Java: map function for example*/
profiles = turnIntoTable(profiles).map((MapFunction<Row, EventProfile>) x -> {
/*you custom code here to do the mapping*/
});
profiles.addSink(new yourCustomSinkFunction());
/*this function will process the Event and create the EventProfile class for this example but you can also use another operators (map, flatMap, etc)*/
private SingleOutputStreamOperator<EventProfile> createUserProfile(KeyedStream<Event, String> stream) {
return stream.process(new UserProfileProcessFunction());
}
/*This function will receive a SingleOutputStreamOperator and sent each record to the Python function trough the TableAPI and returns a Row of String(you can change the Row type) that will be mapped back into EventProfile class*/
@FunctionHint(output = @DataTypeHint("ROW<a STRING>"))
private DataStream<Row> turnIntoTable(SingleOutputStreamOperator<EventProfile> rowInput) {
Table events = tableEnv.fromDataStream(rowInput,
$("id"), $("noOfHits"), $("timestamp"))
.select("FunctionName(id, noOfHits, timestamp)");
return tableEnv.toAppendStream(events, Row.class);
}
最后
env.execute("Job Name");
脚本中调用FunctionName
的python 函数示例:function.py
@udf(
input_types=[
DataTypes.STRING(), DataTypes.INT(), DataTypes.TIMESTAMP(precision=3)
],
result_type=DataTypes.STRING()
)
def FunctionName(id, noOfHits, timestamp):
# function code here
return f"{id}|{noOfHits}|{timestamp}"
推荐阅读
- php - 如何计算 PHPMyadmin DB 表中具有特定值(lke '1')的列数?
- python-3.x - python瓶重定向导致错误405:方法不允许
- r - 重新编码枢轴宽表
- reactjs - 如何在 React 中将函数从父级传递给子级
- amazon-web-services - AWS RDS 表访问
- javascript - d3.js:低值堆积条形图
- python - 使用 python 从 csv 打开链接,出现 URL 无效字符问题
- r - 如何使用 x 轴上的所有日期绘制图表?
- spring-mvc - WebTestClient 无法将正文发布到 MVC 控制器
- reactjs - 反应原生网络,__DEV__ 未定义