apache-flink - 通过从 kafka 读取详细信息创建动态 flink 窗口
问题描述
假设 Kafka 消息包含 flink 窗口大小配置。
我想从 Kafka 读取消息并在 flink 中创建一个全局窗口。
问题陈述:
我们可以使用 BroadcastStream 来处理上述情况吗?
或者
还有其他支持上述情况的方法吗?
解决方案
Flink 的窗口 API 不支持动态改变窗口大小。
您需要做的是使用进程函数实现自己的窗口化。在本例中为 KeyedBroadcastProcessFunction,其中广播窗口配置。
您可以查看Flink 培训,了解如何使用 KeyedProcessFunction 实现时间窗口(复制如下):
public class PseudoWindow extends KeyedProcessFunction<String, KeyedDataPoint<Double>, KeyedDataPoint<Integer>> {
// Keyed, managed state, with an entry for each window.
// There is a separate MapState object for each sensor.
private MapState<Long, Integer> countInWindow;
boolean eventTimeProcessing;
int durationMsec;
/**
* Create the KeyedProcessFunction.
* @param eventTime whether or not to use event time processing
* @param durationMsec window length
*/
public PseudoWindow(boolean eventTime, int durationMsec) {
this.eventTimeProcessing = eventTime;
this.durationMsec = durationMsec;
}
@Override
public void open(Configuration config) {
MapStateDescriptor<Long, Integer> countDesc =
new MapStateDescriptor<>("countInWindow", Long.class, Integer.class);
countInWindow = getRuntimeContext().getMapState(countDesc);
}
@Override
public void processElement(
KeyedDataPoint<Double> dataPoint,
Context ctx,
Collector<KeyedDataPoint<Integer>> out) throws Exception {
long endOfWindow = setTimer(dataPoint, ctx.timerService());
Integer count = countInWindow.get(endOfWindow);
if (count == null) {
count = 0;
}
count += 1;
countInWindow.put(endOfWindow, count);
}
public long setTimer(KeyedDataPoint<Double> dataPoint, TimerService timerService) {
long time;
if (eventTimeProcessing) {
time = dataPoint.getTimeStampMs();
} else {
time = System.currentTimeMillis();
}
long endOfWindow = (time - (time % durationMsec) + durationMsec - 1);
if (eventTimeProcessing) {
timerService.registerEventTimeTimer(endOfWindow);
} else {
timerService.registerProcessingTimeTimer(endOfWindow);
}
return endOfWindow;
}
@Override
public void onTimer(long timestamp, OnTimerContext context, Collector<KeyedDataPoint<Integer>> out) throws Exception {
// Get the timestamp for this timer and use it to look up the count for that window
long ts = context.timestamp();
KeyedDataPoint<Integer> result = new KeyedDataPoint<>(context.getCurrentKey(), ts, countInWindow.get(ts));
out.collect(result);
countInWindow.remove(timestamp);
}
}
推荐阅读
- tensorflow - 如何组合具有不同时间步长的两个时间序列数据集?
- python - 用另一个数组中的值替换 numpy 数组中的所有 -1
- node.js - 您在哪里以及如何导入节点模块?
- ios - Swift - UIButton 以编程方式设置约束
- ssl - 使用 *.herokuapp.com 域的缺点?
- java - 是否有可能有 3 个相互更改的 EditText 可以在所有 3 个之间互换的 EditText?
- ubuntu - 如何在 Ubuntu 上为 2 个不同的视频驱动程序设置 2 个启动配置?
- javascript - 西班牙语使用者的日期格式 - 返回 NaN 或无效日期
- mysql - 为什么我不能将数据导入 MySQL 中的模式?
- java - Spring MVC 应用程序,如何使用依赖 jar 文件中的 jsp 视图配置 servlet