apache-flink - Flink:在 DataStream 和“规则集”之间实现“连接”
问题描述
以下用例的最佳实践建议是什么?我们需要将流与一组“规则”进行匹配,这些规则本质上是一个 Flink DataSet 概念。对此“规则集”的更新是可能的,但并不频繁。每个流事件必须检查“规则集”中的所有记录,并且每次匹配都会在接收器数据流中产生一个或多个事件。规则集中的记录数是在 6 位数范围内。
目前,我们只是将规则加载到本地规则列表中,并在传入的 DataStream 上使用 flatMap。在 flatMap 中,我们只是遍历一个列表,将每个事件与每个规则进行比较。
为了加快迭代速度,我们还可以将列表分成几个批次,本质上是创建一个列表列表,并创建一个单独的线程来迭代每个子列表(在 Java 或 Scala 中使用 Futures)。
问题:
- 有没有更好的方法来进行这种加入?
- 如果不是,在 Flink 已经在做的事情之上,通过在每个 flatMap 操作中创建新线程来添加额外的并行性是否安全?
编辑:这是要求的示例代码:
package wikiedits
import org.apache.flink.streaming.connectors.wikiedits.{WikipediaEditEvent, WikipediaEditsSource}
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.extensions._
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.Future
object WikipediaEditEventProcessor {
def main(args: Array[String])= {
val see = StreamExecutionEnvironment.getExecutionEnvironment
val edits = see.addSource(new WikipediaEditsSource())
val ruleSets = Map[Int, List[String]](
(1, List("a", "b", "c", "d", "e", "f", "g", "h", "i", "j")),
(2, List("k", "l", "m", "n", "o", "p", "q", "r", "s", "t")),
(3, List("u", "v", "w", "x", "y", "z", "0", "1", "2", "3"))
)
val result = edits.flatMap { edit =>
ruleSets.map { ruleSet =>
applyRuleSet(edit, ruleSet._2, ruleSet._1)
}
}
see.execute
}
def applyRuleSet(event: WikipediaEditEvent, ruleSet: List[String], ruleSetId: Int): Future[List[String]] = {
val title = event.getTitle
Future(
ruleSet.map {
case rule if title.contains(rule) =>
val result = s"Ruleset $ruleSetId: $rule -> exists in: $title"
println(result) // this would be creating an output event instead
result
case rule =>
val result = s"Ruleset $ruleSetId: $rule -> NO MATCH in: $title"
println(result)
result
}
)
}
}
解决方案
每个流事件都必须检查“规则集”中的所有记录,并且每个匹配都会将一个或多个事件生成到接收器数据流中。规则集中的记录数在 6 位范围内
假设你有 K 个规则。如果输入速率快于处理单个事件的 K 条规则所花费的时间,则您的方法很好。否则,您需要一些可以并行处理这些 K 规则的方法。
将它们视为 K 收费站。将它们一个接一个地放置,而不是将它们放在一个大房间内。这将简化流引擎的事情。
换句话说,使用简单的 for 循环遍历所有规则,并为每个规则设置一个单独的 flatMap。这样,它们中的每一个都是相互独立的,因此可以并行处理。最后你会有 K 个 flatMaps 来执行。无论您为执行提供什么配置,引擎都会使用可能的最大并行度。这种方法将最大可能的并行度限制为 K。但是,这对于大量规则来说已经足够了。
通过在每个 flatMap 操作中创建新线程来增加并行性
完全不推荐。将并行性留给 flink。您定义了您希望在 flatMap 中执行的工作单元。
推荐阅读
- php - Laravel 查询生成器对表 password_resets 的空白响应
- java - Firebase facebook 和 gmail 帐户未合并
- docker - Kafka 集成测试,无法设置启用 SASL 的机制
- java - 通过分隔符解析时的值偏移
- c - 写下这个给定代码的最坏情况运行时间 O(n)?
- linux - 如何在 Debian 10 Buster XFCE 中启用第三个屏幕
- html - Spring Security 自定义登录页面不允许我进入
- python - 时间序列分析的熊猫日期时间问题
- r - 在r中创建一个计算列表中元素数量的函数
- python - 为什么使用停用词或 nltk 语料库后有些英语单词会被删除?