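apache-flink - How do I update broadcast state in a KeyedBroadcastProcessFunction in Flink?
Question
I am new to Flink and I am using Apache Flink for pattern matching. The list of patterns is held in broadcast state, and I iterate over the patterns in the processElement function to find a matching one. At the moment I read the patterns from a database, and that is a one-time activity. My code is below.
The MapStateDescriptor and the side output tag are as follows: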
public static final MapStateDescriptor<String, String> ruleDescriptor =
    new MapStateDescriptor<>("RuleSet", BasicTypeInfo.STRING_TYPE_INFO,
        BasicTypeInfo.STRING_TYPE_INFO);
public static final OutputTag<Tuple2<String, String>> unMatchedSideOutput =
    new OutputTag<Tuple2<String, String>>("unmatched-side-output") {
    };
The process function and the broadcast function are as follows:
@Override
public void processElement(Tuple2<String, String> inputValue, ReadOnlyContext ctx, Collector<Tuple2<String, String>> out) throws Exception {
    boolean matched = false;
    for (Map.Entry<String, String> ruleSet : ctx.getBroadcastState(broadcast.ruleDescriptor).immutableEntries()) {
        String ruleName = ruleSet.getKey();
        // If a rule in the rule set matches, emit to the main stream and stop iterating.
        // (this.rule is the placeholder condition from the original post.)
        if (this.rule) {
            out.collect(new Tuple2<>(inputValue.f0, inputValue.f1));
            matched = true;
            break;
        }
    }
    // Write to the side output only if no rule matched
    if (!matched) {
        ctx.output(Output.unMatchedSideOutput, new Tuple2<>("No Rule Detected", inputValue.f1));
    }
}
@Override
public void processBroadcastElement(Tuple2<String, String> ruleSetConditions, Context ctx, Collector<Tuple2<String, String>> out) throws Exception {
    // Store the rule under its name; an existing entry with the same name is overwritten
    ctx.getBroadcastState(broadcast.ruleDescriptor).put(ruleSetConditions.f0, ruleSetConditions.f1);
}
The main function is as follows:
public static void main(String[] args) throws Exception {
    // Create the streaming execution environment
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    // Read the incoming data from upstream
    DataStream<String> incomingSignal =
        env.readTextFile(....);
    // Read the patterns available in the configuration file
    DataStream<String> rawPatternStream =
        env.readTextFile();
    // Build (pattern name, pattern condition) pairs from the raw lines
    DataStream<Tuple2<String, String>> ruleStream =
        rawPatternStream.flatMap(new FlatMapFunction<String, Tuple2<String, String>>() {
            @Override
            public void flatMap(String ruleCondition, Collector<Tuple2<String, String>> out) throws Exception {
                String[] rules = ruleCondition.split(",");
                out.collect(new Tuple2<>(rules[0], rules[1]));
            }
        });
    // Broadcast the patterns to all parallel operator instances, where they are kept in operator memory
    BroadcastStream<Tuple2<String, String>> ruleBroadcast = ruleStream.broadcast(ruleDescriptor);
    // Key the stream by sourceName
    DataStream<Tuple2<String, String>> matchSignal =
        incomingSignal.map(new MapFunction<String, Tuple2<String, String>>() {
            @Override
            public Tuple2<String, String> map(String incomingSignal) throws Exception {
                String sourceName = incomingSignal.split(",")[0];
                return new Tuple2<>(sourceName, incomingSignal);
            }
        }).keyBy(0).connect(ruleBroadcast).process(new KeyedBroadCastProcessFunction());
    matchSignal.print("RuleDetected=>");
    // Submit the job; without this call nothing is executed
    env.execute();
}
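I have a few questions:
1) Currently I read the rules from a database. How can I update the broadcast state while the Flink job is running on the cluster? If I receive a new rule set from a Kafka topic, how do I update the broadcast state in the processBroadcastElement method of the KeyedBroadcastProcessFunction?
2) When the broadcast state is updated, do we need to restart the Flink job?
Please help me with the questions above.
Answer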
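The only way to set or update broadcast state is in the processBroadcastElement method of a BroadcastProcessFunction or KeyedBroadcastProcessFunction. All you need to do is adapt your application to stream the rules in from a streaming source, rather than reading them once from a file.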
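Broadcast state is a hash map. If your broadcast stream delivers a new key/value pair that uses the same key as an earlier broadcast event, the new value replaces the old one. Otherwise you get an entirely new entry.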
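The replace-or-add behavior described above can be illustrated without Flink. The following is a minimal sketch that uses a plain `java.util.Map` as a stand-in for the broadcast state; the class name, the `applyRule` helper, and the rule strings are hypothetical and only mirror what a `put` inside processBroadcastElement would do.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class BroadcastUpsertSketch {
    // Stand-in for the body of processBroadcastElement (an assumption for
    // illustration): put() overwrites an existing key or adds a new one.
    static void applyRule(Map<String, String> state, String name, String condition) {
        state.put(name, condition);
    }

    public static void main(String[] args) {
        // Plain map used in place of Flink's broadcast state
        Map<String, String> state = new LinkedHashMap<>();
        applyRule(state, "ruleA", "temp>50");     // new entry
        applyRule(state, "ruleB", "pressure<10"); // new entry
        applyRule(state, "ruleA", "temp>80");     // same key: value is replaced
        System.out.println(state); // prints {ruleA=temp>80, ruleB=pressure<10}
    }
}
```

Because each broadcast event simply overwrites or adds an entry, a running job can pick up new rules without being restarted.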
If you use readFile with FileProcessingMode.PROCESS_CONTINUOUSLY, then every time the file is modified its entire contents are re-ingested. You can use that mechanism to update your rule set.
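One consequence of whole-file re-ingestion is worth spelling out: if each line is only ever written into the state with `put`, as in the question's processBroadcastElement, a rule deleted from the file stays in the state. The sketch below simulates this in plain Java without Flink; the class name, the `reingest` helper, and the file contents are hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ReingestSketch {
    // Replays every line of a (hypothetical) rules file into the state,
    // the way a full re-read followed by put() per line would.
    static void reingest(Map<String, String> state, List<String> lines) {
        for (String line : lines) {
            String[] parts = line.split(",", 2);
            if (parts.length == 2) {
                state.put(parts[0].trim(), parts[1].trim());
            }
        }
    }

    public static void main(String[] args) {
        // TreeMap keeps keys sorted so the printed order is stable
        Map<String, String> state = new TreeMap<>();
        // First read of the file
        reingest(state, Arrays.asList("ruleA,temp>50", "ruleB,pressure<10"));
        // File edited: ruleA changed, ruleB deleted, ruleC added
        reingest(state, Arrays.asList("ruleA,temp>80", "ruleC,humidity>90"));
        // ruleB is still present because nothing ever removes it
        System.out.println(state);
        // prints {ruleA=temp>80, ruleB=pressure<10, ruleC=humidity>90}
    }
}
```

If deleted rules must disappear, the broadcast function would need some explicit removal convention of its own; that is a design decision outside what the answer describes.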