scala - 在 apache flink 中拆分和加入流
问题描述
我想我有一个相当不标准的用例。filter
我想使用以下函数将我的源流拆分为多个流:
val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)
我还有一个时间戳提取器(传入的事件将在 XML 中附加一个时间戳):
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...
dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...
class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, "@timestamp").getMillis
}
val s = dataStream.filter(...).map(...).filter(...).map(...)
我选择了这种方法,而不是简单地做一个流( ...)
现在,当通过上述示例发送事件时,事件 E1 可能会同时出现在 s1 和 s2 中。这意味着,在我的理解中,相同的事件 E1 作为第一个实例放入 s1 (E1a) 并作为第二个实例放入 s2 (E1b)。
所以我现在要做的就是将 E1a 和 E1b 重新组合成一个类似于 E1 的组合 E1,它同时是 s1 和 s2 的转换。
我试过:
val c1 = s1.join(s2)
.where(_.key).equalTo(_.key)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.apply((e1a, e2b) => { printf("Got e1a and e1b"); e1a })
然而,似乎事件永远不会到达应用函数,我不知道为什么。
我的例子有什么问题?我对像这样的流网络的方法/想法会奏效吗?
解决方案
有没有安排水印?当使用事件时间时,只有当水印到达时才会触发一个窗口,该水印将事件时间时钟提前超过窗口的末尾。您可以使用时间戳提取器/水印生成器来执行此操作;有关更多详细信息,请参阅文档中的示例。
如果其中一个流有时是空闲的,这也会导致问题,因为空闲流上缺少水印会阻碍它连接到的任何流的水印。
根据您要执行的操作,您可能会发现使用 CoProcessFunction 比使用时间窗口连接更容易。看看Flink 培训网站上关于状态丰富和过期状态的练习,以获取示例。
推荐阅读
- docker - docker 中的 java 应用程序如何在 ubuntu 18.4 的挂载点中定位文件
- c++ - C++,程序认为我没有 main 函数,当我这样做并给出“_main”时,引用自:implicit entry/start for main executable error
- ios - Xcode 12.0 中是否有任何更改将 ButtonType 从 roundedRect 更改为 system?
- ruby-on-rails - Rails slim 将 html 渲染为文本
- jquery - 如何在 jQuery 中使用嵌套的 foreach 循环?
- android - 将 TextView 锚定到 ImageView
- jquery - jQuery ajax 间歇性地失败
- python - 用于验证输入的 Python while 循环
- python - 如何禁用 python asyncio fd 存储?(不要对同一个 FD 感到恐慌)
- java - 命令无效调试@stack队列