首页 > 解决方案 > Spark 中的 Drools - 性能

问题描述

我在 Scala/Spark 中有一个批处理作业,它根据一些输入动态创建 Drools 规则,然后评估规则。我也有与RDD[T]要插入规则引擎的事实相对应的输入。

到目前为止,我正在一一插入事实,然后触发有关该事实的所有规则。我正在使用rdd.aggregate.

seqOp 运算符定义如下:

/**
 * @param broadcastRules the broadcasted KieBase object containing all rules
 * @param aggregator used to accumulate values when rule matches
 * @param item the fact to run Drools with
 * @tparam T the type of the given item
 * @return the updated aggregator
 */
def seqOp[T: ClassTag](broadcastRules: Broadcast[KieBase])(
  aggregator: MyAggregator,
  item: T) : MyAggregator = {
  val session = broadcastRules.value.newStatelessKieSession
  session.setGlobal("aggregator", aggregator)
  session.execute(CommandFactory.newInsert(item))
  aggregator
}

以下是生成规则的示例:

dialect "mvel"
global batch.model.MyAggregator aggregator
rule "1"
 when condition
 then do something on the aggregator
end 

对于相同的 RDD,批处理需要 20 分钟来评估 3K 规则,但需要 10 小时来评估 10K 规则!

我想知道按事实插入事实是否是最好的方法。一次插入RDD的所有项目然后触发所有规则更好吗?这对我来说似乎不是最优的,因为所有事实都会同时在工作记忆中。

你看到上面的代码有什么问题吗?

标签: apache-sparkdrools

解决方案


最后我发现了这个问题,它更多地与规则匹配时在聚合器上执行的操作有关,而不是与规则的评估有关。


推荐阅读