首页 > 解决方案 > 如何向 flink CEP 数据流添加新事件?

问题描述

我正在使用 flink 1.5.2 来解决 CEP 问题。

我的数据来自一个列表,其他一些进程会在系统运行时将新的事件对象添加到该列表中。它不是套接字或网络消息。我一直在阅读官方网站示例。这是我想我应该做的步骤。

  1. 使用 env.fromCollection(list) 创建一个数据流;
  2. 定义一个 Pattern 模式
  3. 使用 CEP.pattern(data_stream, pattern) 获取 PatternStream
  4. 使用 pattern_stream.select( ...implement select interface ...) 将复杂事件结果作为 DataStream

但是我的输入流应该是无界的。我在 DataStream<> 对象中没有找到任何 add() 方法。我该如何做到这一点?而且,我是否需要告诉 DataStream<> 何时清理过时的事件?

标签: javaapache-flinkflink-streamingcomplex-event-processingflink-cep

解决方案


当使用预先固定的有界输入集时,集合仅适合作为 Flink 的输入源,例如在编写测试或只是进行试验时。如果你想要一个无界的流,你需要选择一个不同的源,比如一个套接字或像 Kafka 这样的消息队列系统。

套接字很容易用于实验。在 Linux 和 MacOS 系统上,您可以使用

nc -lk 9999

创建一个 Flink 可以绑定到端口 9999 的套接字,并且您提供的任何内容作为nc(netcat) 的输入将一次一行地流式传输到您的 Flink 作业中。Netcat 也可用于 Windows,但未预安装。

但是,您不应该计划在生产中使用套接字,因为它们无法重绕(这对于在故障恢复期间使用 Flink 获得准确结果至关重要)。


推荐阅读