首页 > 解决方案 > 在 apache flink 中拆分和加入流

问题描述

我想我有一个相当不标准的用例。filter我想使用以下函数将我的源流拆分为多个流:

val dataStream:DataStream[MyEvent] = ...
val s1 = dataStream.filter(...).map(...)
val s2 = dataStream.filter(...).map(...)

我还有一个时间戳提取器(传入的事件将在 XML 中附加一个时间戳):

env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
...

dataStream.assignTimestampsAndWatermarks(new MyTimestampExtractor)
...

class MyTimestampExtractor extends AssignerWithPunctuatedWatermarks[Elem]
{
  override def checkAndGetNextWatermark(lastElement:Elem, extractedTimestamp:Long):Watermark = new Watermark(extractedTimestamp)
  override def extractTimestamp(element:Elem, previousElementTimestamp:Long):Long = XmlOperations.getDateTime(element, "@timestamp").getMillis
}

val s = dataStream.filter(...).map(...).filter(...).map(...)我选择了这种方法,而不是简单地做一个( ...)

现在,当通过上述示例发送事件时,事件 E1 可能会同时出现在 s1 和 s2 中。这意味着,在我的理解中,相同的事件 E1 作为第一个实例放入 s1 (E1a) 并作为第二个实例放入 s2 (E1b)。

所以我现在要做的就是将 E1a 和 E1b 重新组合成一个类似于 E1 的组合 E1,它同时是 s1 和 s2 的转换。

我试过:

val c1 = s1.join(s2)
  .where(_.key).equalTo(_.key)
  .window(TumblingEventTimeWindows.of(Time.seconds(10)))
  .apply((e1a, e2b) => { printf("Got e1a and e1b"); e1a })

然而,似乎事件永远不会到达应用函数,我不知道为什么。

我的例子有什么问题?我对像这样的流网络的方法/想法会奏效吗?

标签: scalastreamapache-flink

解决方案


有没有安排水印?当使用事件时间时,只有当水印到达时才会触发一个窗口,该水印将事件时间时钟提前超过窗口的末尾。您可以使用时间戳提取器/水印生成器来执行此操作;有关更多详细信息,请参阅文档中的示例。

如果其中一个流有时是空闲的,这也会导致问题,因为空闲流上缺少水印会阻碍它连接到的任何流的水印。

根据您要执行的操作,您可能会发现使用 CoProcessFunction 比使用时间窗口连接更容易。看看Flink 培训网站上关于状态丰富过期状态的练习,以获取示例。


推荐阅读