首页 > 解决方案 > 如何在 flink 中打印聚合的 DataStream?

问题描述

我有一个自定义状态计算,它会随着我看到来自 Kafka 的新事件而Set<Long>不断更新。Datastream<Set<Long>>现在,每次更新我的状态时,我都想将更新后的状态打印到标准输出。想知道如何在 Flink 中做到这一点?对所有窗口和触发器操作有点困惑,我不断收到以下错误。

Caused by: java.lang.RuntimeException: Record has Long.MIN_VALUE timestamp (= no timestamp marker). Is the time characteristic set to 'ProcessingTime', or did you forget to call 'DataStream.assignTimestampsAndWatermarks(...)'?

我只想知道如何将我的聚合流打印Datastream<Set<Long>>到标准输出或将其写回另一个 kafka 主题?

下面是引发错误的代码片段。

StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(env, bsSettings);

DataStream<Set<Long>> stream = bsTableEnv.toAppendStream(kafkaSourceTable, Row.class)
   stream
      .aggregate(new MyCustomAggregation(100))
      .process(new ProcessFunction<Set<Long>, Object>() {
       @Override
         public void processElement(Set<Long> value, Context ctx, Collector<Object> out) throws Exception {
           System.out.println(value.toString());
         }
       });

标签: apache-flink

解决方案


使用 Flink 保持集合的状态可能非常昂贵,因为在某些情况下,集合会经常被序列化和反序列化。如果可能,最好使用 Flink 内置的 ListState 和 MapState 类型。

这是一个说明一些事情的例子:

public static void main(String[] args) throws Exception {

    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1L, 2L, 3L, 4L, 3L, 2L, 1L, 0L)
        .keyBy(x -> 1)
        .process(new KeyedProcessFunction<Integer, Long, List<Long>> () {
            private transient MapState<Long, Boolean> set;

            @Override
            public void open(Configuration parameters) throws Exception {
                set = getRuntimeContext().getMapState(new MapStateDescriptor<>("set", Long.class, Boolean.class));
            }

            @Override
            public void processElement(Long x, Context context, Collector<List<Long>> out) throws Exception {
                if (set.contains(x)) {
                    System.out.println("set contains " + x);
                } else {
                    set.put(x, true);
                    List<Long> list = new ArrayList<>();
                    Iterator<Long> iter = set.keys().iterator();
                    iter.forEachRemaining(list::add);
                    out.collect(list);
                }
            }
        })
        .print();

    env.execute();

}

请注意,我想使用键控状态,但事件中没有任何内容可用作键,所以我只是通过常量键控流。这通常不是一个好主意,因为它会阻止处理并行完成——但由于您是作为一个集合进行聚合,所以这不是您可以并行执行的操作,因此不会造成任何伤害。

我将 Longs 集表示为 MapState 对象的键。当我想输出集合时,我将它收集为一个列表。当我只想打印一些东西进行调试时,我只使用 System.out。

当我在 IDE 中运行此作业时,我看到的是:

[1]
[1, 2]
[1, 2, 3]
[1, 2, 3, 4]
set contains 3
set contains 2
set contains 1
[0, 1, 2, 3, 4]

如果您希望每秒查看 MapState 中的内容,您可以在 process 函数中使用 Timer。


推荐阅读