elasticsearch - 对 top_hits 聚合求和
问题描述
简而言之:如果我有一个每个桶的 top_hits 的聚合,我如何对结果结构中的特定值求和?
细节:
我有许多记录,每个商店包含一定数量的记录。我想获得每家商店所有最新记录的总和。
为了获得每个商店的最新记录,我创建了以下聚合:
"latest_quantity_per_store": {
"aggs": {
"latest_quantity": {
"top_hits": {
"sort": [
{
"datetime": "desc"
},
{
"quantity": "asc"
}
],
"_source": {
"includes": [
"quantity"
]
},
"size": 1
}
}
},
"terms": {
"field": "store",
"size": 10000
}
}
假设我有两个商店,每个商店有两个数量,用于两个不同的时间戳。这是该聚合的结果:
"latest_quantity_per_store": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "01",
"doc_count": 2,
"latest_quantity": {
"hits": {
"total": 2,
"max_score": null,
"hits": [
{
"_index": "inventory-local",
"_type": "doc",
"_id": "O6wFD2UBG8e7nvSU8dYg",
"_score": null,
"_source": {
"quantity": 6
},
"sort": [
1532476800000,
6
]
}
]
}
}
},
{
"key": "02",
"doc_count": 2,
"latest_quantity": {
"hits": {
"total": 2,
"max_score": null,
"hits": [
{
"_index": "inventory-local",
"_type": "doc",
"_id": "pLUFD2UBHBuSGcoH0ZT4",
"_score": null,
"_source": {
"quantity": 11
},
"sort": [
1532476800000,
11
]
}
]
}
}
}
]
}
我现在想在 ElasticSearch 中进行聚合,以获取这些存储桶的总和。在示例数据中,总和超过 6 和 11。我尝试了以下聚合:
"latest_quantity": {
"sum_bucket": {
"buckets_path": "latest_quantity_per_store>latest_quantity>hits>hits>_source>quantity"
}
}
但这会导致此错误:
{
"error": {
"root_cause": [
{
"type": "illegal_argument_exception",
"reason": "No aggregation [hits] found for path [latest_quantity_per_store>latest_quantity>hits>hits>_source>quantity]"
}
],
"type": "search_phase_execution_exception",
"reason": "all shards failed",
"phase": "query",
"grouped": true,
"failed_shards": [
{
"shard": 0,
"index": "inventory-local",
"node": "3z5CqmmAQ-yT2sUCb69DzA",
"reason": {
"type": "illegal_argument_exception",
"reason": "No aggregation [hits] found for path [latest_quantity_per_store>latest_quantity>hits>hits>_source>quantity]"
}
}
]
},
"status": 400
}
从 ElasticSearch 以某种方式返回数字 17 的正确聚合是什么?
我为我拥有的另一个聚合做了类似的事情,平均值而不是 top_hits 聚合。
"average_quantity": {
"sum_bucket": {
"buckets_path": "average_quantity_per_store>average_quantity"
}
},
"average_quantity_per_store": {
"aggs": {
"average_quantity": {
"avg": {
"field": "quantity"
}
}
},
"terms": {
"field": "store",
"size": 10000
}
}
这按预期工作,结果如下:
"average_quantity_per_store": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "01",
"doc_count": 2,
"average_quantity": {
"value": 6
}
},
{
"key": "02",
"doc_count": 2,
"average_quantity": {
"value": 11.5
}
}
]
},
"average_quantity": {
"value": 17.5
}
解决方案
有一种方法可以结合使用scripted_metric
聚合和sum_bucket
管道聚合来解决这个问题。脚本化的度量聚合有点复杂,但主要思想是允许您提供自己的分桶算法并从中输出单个度量值。
在您的情况下,您要做的是找出每个商店的最新数量,然后将这些商店数量相加。解决方案如下所示,我将在下面解释一些细节:
POST inventory-local/_search
{
"size": 0,
"aggs": {
"bystore": {
"terms": {
"field": "store.keyword",
"size": 10000
},
"aggs": {
"latest_quantity": {
"scripted_metric": {
"init_script": "params._agg.quantities = new TreeMap()",
"map_script": "params._agg.quantities.put(doc.datetime.date, [doc.datetime.date.millis, doc.quantity.value])",
"combine_script": "return params._agg.quantities.lastEntry().getValue()",
"reduce_script": "def maxkey = 0; def qty = 0; for (a in params._aggs) {def currentKey = a[0]; if (currentKey > maxkey) {maxkey = currentKey; qty = a[1]} } return qty;"
}
}
}
},
"sum_latest_quantities": {
"sum_bucket": {
"buckets_path": "bystore>latest_quantity.value"
}
}
}
}
请注意,为了使其正常工作,您需要script.painless.regex.enabled: true
在elasticsearch.yml
配置文件中进行设置。
为每个分片创建init_script
一个。用日期/数量的映射填充每个分TreeMap
片map_script
。TreeMap
我们放入地图的值包含时间戳和单个字符串中的数量。稍后我们将需要该时间戳reduce_script
。combine_script
简单地取最后一个值,因为TreeMap
这是给定分片的最新数量。大部分工作位于reduce_script
. 我们迭代每个分片的所有最新数量并返回最新的数量。
此时,我们有每家商店的最新数量。剩下要做的就是使用sum_bucket
管道聚合来汇总每个存储数量。你得到了 17 的结果。
响应如下所示:
"aggregations": {
"bystore": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "01",
"doc_count": 2,
"latest_quantity": {
"value": 6
}
},
{
"key": "02",
"doc_count": 2,
"latest_quantity": {
"value": 11
}
}
]
},
"sum_latest_quantities": {
"value": 17
}
}
推荐阅读
- c++ - 从串口使用 boost::asio::async_read_until 时截断的数据(如果超过 512 字节)
- excel - 当项目消失时运行 msgbox
- wcf - 无法为 ssl/tls 权限建立安全通道
- angular - 在 setValue 上,表单控件已更新,但输出对象未更新
- ruby - 使用递归添加数组元素
- django - 如何在 nginx uwsgi 上运行的 Django 应用程序中运行 cronjob
- c# - 如何用 Excel 工作表填充 Listview?(C#,Windows 窗体)
- java - Spring Data Neo4j APOC 未执行但没有错误
- javascript - 如何在 ReactJS 中获取 api 过滤器数据?
- node.js - Typescript:从包中导入类型,而不依赖于同一包中的其他类型