apache-spark - Spark 中的 Drools - 性能
问题描述
我在 Scala/Spark 中有一个批处理作业,它根据一些输入动态创建 Drools 规则,然后评估规则。我也有与RDD[T]
要插入规则引擎的事实相对应的输入。
到目前为止,我正在一一插入事实,然后触发有关该事实的所有规则。我正在使用rdd.aggregate
.
seqOp 运算符定义如下:
/**
* @param broadcastRules the broadcasted KieBase object containing all rules
* @param aggregator used to accumulate values when rule matches
* @param item the fact to run Drools with
* @tparam T the type of the given item
* @return the updated aggregator
*/
def seqOp[T: ClassTag](broadcastRules: Broadcast[KieBase])(
aggregator: MyAggregator,
item: T) : MyAggregator = {
val session = broadcastRules.value.newStatelessKieSession
session.setGlobal("aggregator", aggregator)
session.execute(CommandFactory.newInsert(item))
aggregator
}
以下是生成规则的示例:
dialect "mvel"
global batch.model.MyAggregator aggregator
rule "1"
when condition
then do something on the aggregator
end
对于相同的 RDD,批处理需要 20 分钟来评估 3K 规则,但需要 10 小时来评估 10K 规则!
我想知道按事实插入事实是否是最好的方法。一次插入RDD的所有项目然后触发所有规则更好吗?这对我来说似乎不是最优的,因为所有事实都会同时在工作记忆中。
你看到上面的代码有什么问题吗?
解决方案
最后我发现了这个问题,它更多地与规则匹配时在聚合器上执行的操作有关,而不是与规则的评估有关。
推荐阅读
- database - 布尔表达式不能为空?
- asp.net - 通过 Microsoft Graph 访问共享邮箱 - “访问被拒绝”
- java - C#如何关闭所有子进程
- javascript - firebase.firestore() .set() 没有第一次触发
- javascript - 如何在节点 js 中记录特定的错误消息?
- javascript - Vue.js:访问另一个组件中的数组数据
- c# - 如何返回 IEnumerable
/IL 列表 为 null 或使用 null-coalescing 运算符的值 - graphics - 如何为 2d 图形设置 Dev-C++
- firebase - Firebase 身份验证中的速率限制
- c# - Specflow - 创建预定义数据,以便在并行执行的测试执行中的所有场景之间共享