首页 > 解决方案 > ConnectedStreams 中 2 个流之一的饥饿

问题描述

背景

我们有 2 个流,我们称它们为AB。它们分别产生元素ab

StreamA以较慢的速度(每分钟一个)生成元素。

StreamB每 2 周接收一个元素。它使用一个flatMap函数接收该元素并b在循环中生成约 200 万个元素:

(爪哇)

for (BElement value : valuesList) {
    out.collect(updatedTileMapVersion);
}

这里valueList包含约 200 万个b元素

我们使用, 通过某个键连接这些流(AB) ,并在连接的流上执行另一个:connectflatMap

streamA.connect(streamB).keyBy(AClass::someKey, BClass::someKey).flatMap(processConnectedStreams)

每个b元素都有一个不同的键,这意味着大约有 200 万个键来自B流。

问题

我们看到的是饥饿。即使有a准备处理的元素,它们也不会在processConnectedStreams.

我们试图解决这个问题

B我们尝试通过每 10 个元素执行一次,在 1 秒内将流限制为 10Thread.sleep()个元素:

long totalSent = 0;
for (BElement value : valuesList) {
    totalSent++;
    out.collect(updatedTileMapVersion);
    if (totalSent % 10 == 0) {
        Thread.sleep(1000)
    }
}

模拟processConnectedStreams为与另一个需要 1 秒,Thread.sleep()我们已经尝试过: * 将所有管道的并行度设置为 10 - 不起作用 * 将所有管道的并行度设置为 15 - 确实有效

问题

我们不想使用所有这些资源,因为流B很少被激活,并且对于A具有高并行度的流元素来说是一种矫枉过正。是否可以在不将并行度设置为超过b我们每秒发送的元素数量的情况下解决它?

标签: apache-flinkflink-streaming

解决方案


如果您共享完整的工作流拓扑,这将很有用。例如,您没有提到对数据进行任何键控或随机分区。如果确实如此,那么 Flink 将在一个任务中处理多个操作,这可能(取决于拓扑)导致您看到的问题。

如果是这种情况,那么在执行之前强制分区processConnectedStreams可能会有所帮助,因为该操作将从网络缓冲区中读取。


推荐阅读