java - 如何向 flink CEP 数据流添加新事件?
问题描述
我正在使用 flink 1.5.2 来解决 CEP 问题。
我的数据来自一个列表,其他一些进程会在系统运行时将新的事件对象添加到该列表中。它不是套接字或网络消息。我一直在阅读官方网站示例。这是我想我应该做的步骤。
- 使用 env.fromCollection(list) 创建一个数据流;
- 定义一个 Pattern 模式
- 使用 CEP.pattern(data_stream, pattern) 获取 PatternStream
- 使用 pattern_stream.select( ...implement select interface ...) 将复杂事件结果作为 DataStream
但是我的输入流应该是无界的。我在 DataStream<> 对象中没有找到任何 add() 方法。我该如何做到这一点?而且,我是否需要告诉 DataStream<> 何时清理过时的事件?
解决方案
当使用预先固定的有界输入集时,集合仅适合作为 Flink 的输入源,例如在编写测试或只是进行试验时。如果你想要一个无界的流,你需要选择一个不同的源,比如一个套接字或像 Kafka 这样的消息队列系统。
套接字很容易用于实验。在 Linux 和 MacOS 系统上,您可以使用
nc -lk 9999
创建一个 Flink 可以绑定到端口 9999 的套接字,并且您提供的任何内容作为nc
(netcat) 的输入将一次一行地流式传输到您的 Flink 作业中。Netcat 也可用于 Windows,但未预安装。
但是,您不应该计划在生产中使用套接字,因为它们无法重绕(这对于在故障恢复期间使用 Flink 获得准确结果至关重要)。
推荐阅读
- r - 在 ggplot2 中显示自定义轴标签
- javascript - “string”参数必须是 string、Buffer 或 ArrayBuffer 类型之一。接收类型未定义:BASIC AUTH
- javascript - 分配 javascript 函数的返回值显示为 [object Object 而不是 HTML?
- java - UDP客户端问题
- c - Glob 中的“未排序”订单从何而来?
- php - 向 FCM API 发送请求时收到无效的 JSON 有效负载
- android - 带有通知和嵌套多个图的 Jetpack 导航图问题
- python - 如果正则表达式不匹配,则在 pandas str.extract() 之后保留原始字符串值
- laravel - 我无法访问附加到数据透视表的数组?
- python - 在 1000 个类别的 imagenet 数据集上重新训练 Vgg16 keras 模型,从而降低预训练权重的准确性