java - How do I route custom objects in an embedded Flink Statefun module?
Problem description
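I have an embedded module in Apache Flink Statefun 3.0 (a customized Greeter example) that uses JSON to serialize events. When I try to route() a message deserialized from the ingress, I get an exception saying that my custom type cannot be cast to a Protobuf message (which, true, it isn't). But does it have to be? I went through the 3.x documentation and found no restrictions on the types that can be routed.
Any hints or pointers?
Thanks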
// The custom type (Bean-style and all)
public final class Message {
    @JsonProperty private String name;
    @JsonProperty private String id;
    @JsonProperty private int visits;

    public Message() {}

    public String getName() { return name; }
    public void setName(String s) { name = s; }
    public String getId() { return id; }
    public void setId(String s) { id = s; }
    public int getVisits() { return visits; }
    public void setVisits(int i) { visits = i; }
}
// The function
public class GreeterFn implements StatefulFunction {
    public static final FunctionType TYPE = new FunctionType("example", "greeter");

    @Override
    public void invoke(Context ctx, Object msg) {
        // I never get here
    }
}
// The module
public class EmbeddedModule implements StatefulFunctionModule {
    static final IngressIdentifier<Message> INGRESS = new IngressIdentifier<>(Message.class, "example", "names");

    private static final class MsgDeser implements KafkaIngressDeserializer<Message> {
        private final ObjectMapper mapper = new ObjectMapper();

        @Override
        public Message deserialize(ConsumerRecord<byte[], byte[]> input) {
            try { return mapper.readValue(new String(input.value(), StandardCharsets.UTF_8), Message.class); }
            catch (java.io.IOException e) { e.printStackTrace(); }
            return null;
        }
    }

    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        binder.bindIngress(KafkaIngressBuilder.forIdentifier(INGRESS)
            .withKafkaAddress("kafka:9092")
            .withTopic("names")
            .withDeserializer(MsgDeser.class)
            .withConsumerGroupId("my-group-id")
            .build());
        binder.bindIngressRouter(INGRESS, new Router<Message>() {
            @Override
            public void route(Message m, Downstream<Message> ds) {
                ds.forward(GreeterFn.TYPE, m.getName(), m); // <-- I get here OK but then the exception
            }
        });
        binder.bindFunctionProvider(GreeterFn.TYPE, x -> new GreeterFn());
    }
}
// And the logs (trimmed)
statefun-worker_1 | 2021-07-12 11:29:33,366 WARN org.apache.flink.runtime.taskmanager.Task [] - Source: example-names-ingress -> router (names) (1/1)#0 (2b43e45ce4bcc61340ff131d147f3afe) switched from RUNNING to FAILED.
statefun-worker_1 | java.lang.RuntimeException: class com.my.flink.Message cannot be cast to class com.google.protobuf.Message (com.my.flink.Message is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2aab3c1e; com.google.protobuf.Message is in unnamed module of loader 'app')
statefun-worker_1 | at org.apache.flink.streaming.runtime.io.RecordWriterOutput.pushToRecordWriter(RecordWriterOutput.java:103) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
statefun-worker_1 | at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:87) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
statefun-worker_1 | at org.apache.flink.streaming.runtime.io.RecordWriterOutput.collect(RecordWriterOutput.java:43) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
statefun-worker_1 | at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:50) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
statefun-worker_1 | at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:28) ~[flink-dist_2.12-1.12.1.jar:1.12.1]
statefun-worker_1 | at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator$DownstreamCollector.forward(IngressRouterOperator.java:127) ~[statefun-flink-core.jar:3.0.0]
statefun-worker_1 | at org.apache.flink.statefun.sdk.io.Router$Downstream.forward(Router.java:67) ~[statefun-flink-distribution.jar:3.0.0]
statefun-worker_1 | at com.my.flink.EmbeddedModule$1.route(EmbeddedModule.java:47) ~[myflink-1.jar:?]
statefun-worker_1 | at com.my.flink.EmbeddedModule$1.route(EmbeddedModule.java:43) ~[myflink-1.jar:?]
statefun-worker_1 | at org.apache.flink.statefun.flink.core.translation.IngressRouterOperator.processElement(IngressRouterOperator.java:81) ~[statefun-flink-core.jar:3.0.0]
...
statefun-worker_1 | Caused by: java.lang.ClassCastException: class com.my.flink.Message cannot be cast to class com.google.protobuf.Message (com.my.flink.Message is in unnamed module of loader org.apache.flink.util.ChildFirstClassLoader @2aab3c1e; com.google.protobuf.Message is in unnamed module of loader 'app')
statefun-worker_1 | at org.apache.flink.statefun.flink.core.message.MessagePayloadSerializerPb.serialize(MessagePayloadSerializerPb.java:50) ~[statefun-flink-core.jar:3.0.0]
...
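Solution
By default, messages sent and received by embedded functions are assumed to be Protobuf messages. You can switch to Kryo (or even a custom serializer) by setting the following key in flink-conf.yaml: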
statefun.message.serializer: WITH_KRYO_PAYLOADS
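This is not really recommended, because it makes your application hard to evolve over time.
You can still stick with Protobuf and defer the String -> Message deserialization by using the built-in Protobuf type StringValue.
I adapted the code you pasted to use StringValue: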
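The stack trace shows where the failure happens: MessagePayloadSerializerPb.serialize casts the routed payload to com.google.protobuf.Message. The stdlib-only toy model below mimics that cast. It is not Statefun's actual code; ProtoLikeMessage, ProtoOnlyPayloadSerializer, PojoMessage and StringValueLike are hypothetical stand-ins. A payload implementing the Protobuf-like interface serializes fine, while a plain POJO like the question's Message fails with a ClassCastException, which is why either the payload type or the serializer has to change.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for com.google.protobuf.Message (toy model only).
interface ProtoLikeMessage {
    byte[] toByteArray();
}

// Hypothetical serializer mimicking the Protobuf-only default path:
// every payload is cast to the Protobuf-like type before serialization.
final class ProtoOnlyPayloadSerializer {
    byte[] serialize(Object payload) {
        ProtoLikeMessage message = (ProtoLikeMessage) payload; // ClassCastException for POJOs
        return message.toByteArray();
    }
}

// Hypothetical POJO similar to the question's Message; not Protobuf-like.
final class PojoMessage {
    final String name;
    PojoMessage(String name) { this.name = name; }
}

// Hypothetical wrapper playing the role of StringValue.
final class StringValueLike implements ProtoLikeMessage {
    private final String value;
    StringValueLike(String value) { this.value = value; }
    String getValue() { return value; }
    @Override
    public byte[] toByteArray() { return value.getBytes(StandardCharsets.UTF_8); }
}

public class PayloadCastDemo {
    public static void main(String[] args) {
        ProtoOnlyPayloadSerializer serializer = new ProtoOnlyPayloadSerializer();
        // Succeeds: the payload implements the Protobuf-like interface.
        byte[] bytes = serializer.serialize(new StringValueLike("{\"name\":\"Alice\"}"));
        System.out.println("serialized " + bytes.length + " bytes");
        // Fails: a plain POJO cannot be cast to the Protobuf-like interface.
        try {
            serializer.serialize(new PojoMessage("Alice"));
        } catch (ClassCastException e) {
            System.out.println("ClassCastException: " + e.getMessage());
        }
    }
}
```

A general-purpose serializer such as Kryo avoids the cast entirely, which is what the WITH_KRYO_PAYLOADS setting trades for harder schema evolution.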
import com.google.protobuf.StringValue;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.flink.statefun.sdk.io.IngressIdentifier;
import org.apache.flink.statefun.sdk.io.Router;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressBuilder;
import org.apache.flink.statefun.sdk.kafka.KafkaIngressDeserializer;
import org.apache.flink.statefun.sdk.spi.StatefulFunctionModule;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class EmbeddedModule implements StatefulFunctionModule {
    static final IngressIdentifier<StringValue> INGRESS = new IngressIdentifier<>(StringValue.class, "example", "names");

    // Wraps the raw JSON in a StringValue; no JSON parsing happens here.
    private static final class MsgDeser implements KafkaIngressDeserializer<StringValue> {
        @Override
        public StringValue deserialize(ConsumerRecord<byte[], byte[]> input) {
            String json = new String(input.value(), StandardCharsets.UTF_8);
            return StringValue.of(json);
        }
    }

    @Override
    public void configure(Map<String, String> globalConfiguration, Binder binder) {
        binder.bindIngress(KafkaIngressBuilder.forIdentifier(INGRESS)
            .withKafkaAddress("kafka:9092")
            .withTopic("names")
            .withDeserializer(MsgDeser.class)
            .withConsumerGroupId("my-group-id")
            .build());
        binder.bindIngressRouter(INGRESS, new Router<StringValue>() {
            @Override
            public void route(StringValue m, Downstream<StringValue> ds) {
                String json = m.getValue(); // read the JSON from the received message instance
                String name = ... ; // extract the name from this JSON
                ds.forward(GreeterFn.TYPE, name, m);
            }
        });
        binder.bindFunctionProvider(GreeterFn.TYPE, x -> new GreeterFn());
    }
}
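The `String name = ... ;` line above is left open. To keep the deferral idea, the router only needs the routing key, not a fully parsed object. The stdlib-only sketch below assumes flat JSON with a plain string "name" field; the regex is a deliberately naive stand-in for a real JSON parser and does not handle escaped quotes. In the module itself you would more likely use Jackson, for example `ObjectMapper.readTree(json).get("name").asText()`. DeferredRoutingSketch, extractName and route are hypothetical names, and the BiConsumer plays the role of Downstream.forward.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class DeferredRoutingSketch {
    // Naive matcher for "name": "<value>" where the value contains no quotes or backslashes.
    private static final Pattern NAME_FIELD =
        Pattern.compile("\"name\"\\s*:\\s*\"([^\"\\\\]*)\"");

    // Extracts only the routing key; the rest of the JSON is left unparsed.
    static Optional<String> extractName(String json) {
        Matcher m = NAME_FIELD.matcher(json);
        return m.find() ? Optional.of(m.group(1)) : Optional.empty();
    }

    // Router analogue: reads the key and forwards the raw JSON unchanged.
    // 'forward' stands in for Downstream.forward(functionType, id, payload).
    static void route(String rawJson, BiConsumer<String, String> forward) {
        String id = extractName(rawJson)
            .orElseThrow(() -> new IllegalArgumentException("missing name: " + rawJson));
        forward.accept(id, rawJson);
    }

    public static void main(String[] args) {
        List<String> delivered = new ArrayList<>();
        route("{\"name\": \"Alice\", \"id\": \"1\", \"visits\": 3}",
              (id, payload) -> delivered.add(id + " <- " + payload));
        delivered.forEach(System.out::println);
    }
}
```

In the real router, the forward call would be `ds.forward(GreeterFn.TYPE, name, m)` with the StringValue unchanged, and GreeterFn would parse the full JSON. That second parse is the double deserialization the next section avoids.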
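Define your message as a Protobuf message
To avoid deserializing twice (once in the router and again in the function), you can define the following Protobuf message: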
syntax = "proto3";

message MyMessage {
    string name = 1;
    string id = 2;
    int32 visits = 3; // proto3 has no plain "int" type
}
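Then convert the JSON string into a MyMessage instance:
// JsonFormat is com.google.protobuf.util.JsonFormat from the protobuf-java-util artifact;
// merge() throws InvalidProtocolBufferException if the JSON is malformed.
MyMessage.Builder builder = MyMessage.newBuilder();
JsonFormat.parser().merge(jsonString, builder);
MyMessage myMessage = builder.build();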