apache-flink - 如何在 flink 中打印聚合的 DataStream?
问题描述
我有一个自定义状态计算,它会随着我看到来自 Kafka 的新事件而Set<Long>
不断更新。Datastream<Set<Long>>
现在,每次更新我的状态时,我都想将更新后的状态打印到标准输出。想知道如何在 Flink 中做到这一点?对所有窗口和触发器操作有点困惑,我不断收到以下错误。
Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?
我只想知道如何将我的聚合流打印Datastream<Set<Long>>
到标准输出或将其写回另一个 kafka 主题?
下面是引发错误的代码片段。
StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);
DataStream<Set<Long>> stream = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
stream
.aggregate(new MyCustomAggregation(100))
.process(new ProcessFunction<Set<Long>, Object>() {
@Override
public void processElement(Set<Long> value, Context ctx, Collector<Object> out) throws Exception {
System.out.println(value.toString());
}
});
解决方案
使用 Flink 保持集合的状态可能非常昂贵,因为在某些情况下,集合会经常被序列化和反序列化。如果可能,最好使用 Flink 内置的 ListState 和 MapState 类型。
这是一个说明一些事情的例子:
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
env.fromElements(1L, 2L, 3L, 4L, 3L, 2L, 1L, 0L)
.keyBy(x -> 1)
.process(new KeyedProcessFunction<Integer, Long, List<Long>> () {
private transient MapState<Long, Boolean> set;
@Override
public void open(Configuration parameters) throws Exception {
set = getRuntimeContext().getMapState(new MapStateDescriptor<>("set", Long.class, Boolean.class));
}
@Override
public void processElement(Long x, Context context, Collector<List<Long>> out) throws Exception {
if (set.contains(x)) {
System.out.println("set contains " + x);
} else {
set.put(x, true);
List<Long> list = new ArrayList<>();
Iterator<Long> iter = set.keys().iterator();
iter.forEachRemaining(list::add);
out.collect(list);
}
}
})
.print();
env.execute();
}
请注意,我想使用键控状态,但事件中没有任何内容可用作键,所以我只是通过常量键控流。这通常不是一个好主意,因为它会阻止处理并行完成——但由于您是作为一个集合进行聚合,所以这不是您可以并行执行的操作,因此不会造成任何伤害。
我将 Longs 集表示为 MapState 对象的键。当我想输出集合时,我将它收集为一个列表。当我只想打印一些东西进行调试时,我只使用 System.out。
当我在 IDE 中运行此作业时,我看到的是:
[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
set contains 3
set contains 2
set contains 1
[0, 1, 2, 3, 4]
如果您希望每秒查看 MapState 中的内容,您可以在 process 函数中使用 Timer。
推荐阅读
- python - 来自数据文件的二维直方图
- r - 为什么 dput 比原来的精度更高?
- delphi - 如何以编程方式禁用网络连接属性中的特定项目?
- java - JavaCV 如何检查 OpenCVFrameGrabber 是否可用
- lua - Lua - 重复直到重复表除“尝试索引零值”
- python - Python在构造函数中分配字典值
- python - 如何使用 breakpoint() 启动 PyCharm 的调试器
- angular - Angular 9 中移动设备上的签名捕获
- python - 无法让 Heroku 检测 Flask 应用程序中的 json 文件
- python - 用于读取 excel 文档并验证扫描文书工作的信息是否在列表中的 Python 代码,然后将不同文件中的项目分开