Get the sum of only the latest Elasticsearch documents, aggregated/grouped by a field

Problem description

I need to sum a field across my documents, grouped by another field, but counting only the latest entry per a third field.

For example, given these documents:

{
    "time": "2019-08-21T13:00:00",
    "session_id": "1",
    "byte_count": 200,
    "ip": "1.1.1.1"
}
{
    "time": "2019-08-21T12:00:00",
    "session_id": "1",
    "byte_count": 100,
    "ip": "1.1.1.1"
}
{
    "time": "2019-08-21T12:00:00",
    "session_id": "2",
    "byte_count": 123,
    "ip": "2.2.2.2"
}
{
    "time": "2019-08-21T14:00:00",
    "session_id": "3",
    "byte_count": 100,
    "ip": "1.1.1.1"
}

I need to group by session_id, keeping only the latest entry per session, and those results must then be grouped by ip and summed on byte_count.

I have seen that I can group with collapse, and that works. I have also seen that I can sum with aggregations (aggs), but when I use the two together it looks like the aggs operate on the actual documents rather than on the collapsed results. This search:

{
    "collapse": {
        "field": "session_id",
        "inner_hits": {
            "name": "most_recent",
            "size": 1,
            "sort": [
                {
                    "time": "desc"
                }
            ]
        }
    },
    "aggs": {
        "by_ip": {
            "terms": {
                "field": "ip"
            },
            "aggs": {
                "total_bytes": {
                    "sum": {
                        "field": "byte_count"
                    }
                }
            }
        }
    }
}

returns the correct three hits, but the aggregation output has the following buckets:

buckets": [
    {
        "key": "1.1.1.1",
        "doc_count": 3,
        "total_bytes": {
            "value": 400
        }
    },
    {
        "key": "2.2.2.2",
        "doc_count": 1,
        "total_bytes": {
            "value": 123
        }
    }
]

I have also seen that I can group with a top_hits aggregation, but when I try to run a sum aggregation on its results like this:

{
    "size": 0,
    "aggs": {
        "by_session": {
            "terms": {
                "field": "session_id"
            },
            "aggs": {
                "per_session": {
                    "top_hits": {
                        "sort": [
                            {
                                "time": "desc"
                            }
                        ],
                        "size": 1
                    },
                    "aggs": {
                        "per_ip": {
                            "terms": {
                                "field": "ip"
                            },
                            "aggs": {
                                "total_bytes": {
                                    "sum": {
                                        "field": "byte_count"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}

I get the error:

Aggregator [per_session] of type [top_hits] cannot accept sub-aggregations

How can I update the search parameters to get the expected result:

{
    "key": "1.1.1.1",
    "doc_count": 2,
    "total_bytes": {
        "value": 300
    }
},
{
    "key": "2.2.2.2",
    "doc_count": 1,
    "total_bytes": {
        "value": 123
    }
}

?

Tags: elasticsearch

Solution


Test data used:

PUT test
POST test/_doc
{
    "time": "2019-08-21T13:00:00",
    "session_id": 1,
    "byte_count": 200,
    "ip": "1.1.1.1"
}
POST test/_doc
{
    "time": "2019-08-21T13:00:00",
    "session_id": 1,
    "byte_count": 700,
    "ip": "1.1.1.1"
}
POST test/_doc
{
    "time": "2019-08-21T12:00:00",
    "session_id": 1,
    "byte_count": 100,
    "ip": "1.1.1.1"
}
POST test/_doc
{
    "time": "2019-08-21T12:00:00",
    "session_id": 2,
    "byte_count": 123,
    "ip": "2.2.2.2"
}
POST test/_doc
{
    "time": "2019-08-21T14:00:00",
    "session_id": 3,
    "byte_count": 100,
    "ip": "1.1.1.1"
}

I am not sure I understand this correctly. You say "latest entry": "Need to group by session_id, but only the latest entry, and those results must be grouped by ip and summed on byte_count."

Does that mean the latest "time"? From your input data, wouldn't you then expect only the following result for "ip": "1.1.1.1"?:

{
    "time": "2019-08-21T14:00:00",
    "session_id": "3",
    "byte_count": 100,
    "ip": "1.1.1.1"
}

Since it has a newer "time" than the other documents with ip: 1.1.1.1?

Anyway, here is a query that groups by ip and then by session_id. The session_id buckets are then ordered by the latest "time":

{
    "size": 0,
    "aggs": {
        "per_ip": {
            "terms": {
                "field": "ip"
            },
            "aggs": {
                "per_Session": {
                    "terms": {
                        "field": "session_id",
                        "order": { "my_max_date": "desc" }
                    },
                    "aggs": {
                        "total_bytes": {
                            "sum": {
                                "field": "byte_count"
                            }
                        },
                        "my_max_date": { "max": { "field": "time" } }
                    }
                }
            }
        }
    }
}

If you only want the top bucket, just add "size": 1 after the order, as sketched below. You then need to extract the documents from the aggregation.
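
A minimal sketch of that variant (unchanged from the query above except for "size": 1 in the per_Session terms aggregation):

{
    "size": 0,
    "aggs": {
        "per_ip": {
            "terms": {
                "field": "ip"
            },
            "aggs": {
                "per_Session": {
                    "terms": {
                        "field": "session_id",
                        "order": { "my_max_date": "desc" },
                        "size": 1
                    },
                    "aggs": {
                        "total_bytes": {
                            "sum": {
                                "field": "byte_count"
                            }
                        },
                        "my_max_date": { "max": { "field": "time" } }
                    }
                }
            }
        }
    }
}

With "size": 1 only the session with the most recent "time" is kept in each ip bucket; if the matching document itself is needed, a top_hits sub-aggregation could additionally be placed under per_Session to return it.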

This is what I get from the query above:

 "per_ip" : {
  "doc_count_error_upper_bound" : 0,
  "sum_other_doc_count" : 0,
  "buckets" : [
    {
      "key" : "1.1.1.1",
      "doc_count" : 3,
      "per_Session" : {
        "doc_count_error_upper_bound" : 0,
        "sum_other_doc_count" : 0,
        "buckets" : [
          {
            "key" : 3,
            "doc_count" : 1,
            "total_bytes" : {
              "value" : 100.0
            },
            "my_max_date" : {
              "value" : 1.566396E12,
              "value_as_string" : "2019-08-21T14:00:00.000Z"
            }
          },
          {
            "key" : 1,
            "doc_count" : 2,
            "total_bytes" : {
              "value" : 300.0
            },
            "my_max_date" : {
              "value" : 1.5663924E12,
              "value_as_string" : "2019-08-21T13:00:00.000Z"
            }
          }
        ]
      }
    },
    {
      "key" : "2.2.2.2",
      "doc_count" : 1,
      "per_Session" : {
        "doc_count_error_upper_bound" : 0,
        "sum_other_doc_count" : 0,
        "buckets" : [
          {
            "key" : 2,
            "doc_count" : 1,
            "total_bytes" : {
              "value" : 123.0
            },
            "my_max_date" : {
              "value" : 1.5663888E12,
              "value_as_string" : "2019-08-21T12:00:00.000Z"
            }
          }
        ]
      }
    }
  ]
}
}

Update:

Following the discussion in the first two comments, I added the query below, which adds a "time" bucket aggregation inside the session_id buckets. This makes it possible to pick the latest entry. The results still need to be grouped and summed, but that can be done on the client side by reading the buckets and summing them, or with a Sum Bucket Aggregation (see the sketch after the query below).

{
    "size": 5,
    "aggs": {
        "per_ip": {
            "terms": {
                "field": "ip"
            },
            "aggs": {
                "per_Session": {
                    "terms": {
                        "field": "session_id"
                    },
                    "aggs": {
                        "my_max_date": {
                            "terms": {
                                "field": "time",
                                "order": [
                                    {
                                        "_key": "desc"
                                    }
                                ],
                                "size": 1
                            },
                            "aggs": {
                                "total_bytes": {
                                    "terms": {
                                        "field": "byte_count",
                                        "size": 2
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
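
If the grouping and summing should also happen inside Elasticsearch, one possible alternative (not part of the answer above, and only a sketch: it assumes a version that has the top_metrics aggregation, 7.7 or later, and that its value can be referenced from a sum_bucket pipeline; latest_bytes and total_latest_bytes are illustrative names) is to expose the latest byte_count of each session as a single metric and then sum those metrics across the session buckets:

# Sketch only: "latest_bytes" and "total_latest_bytes" are made-up names, and this
# assumes top_metrics (Elasticsearch 7.7+) can feed a sum_bucket buckets_path
# using the bracket syntax.
GET test/_search
{
    "size": 0,
    "aggs": {
        "per_ip": {
            "terms": {
                "field": "ip"
            },
            "aggs": {
                "per_Session": {
                    "terms": {
                        "field": "session_id"
                    },
                    "aggs": {
                        "latest_bytes": {
                            "top_metrics": {
                                "metrics": { "field": "byte_count" },
                                "sort": { "time": "desc" }
                            }
                        }
                    }
                },
                "total_latest_bytes": {
                    "sum_bucket": {
                        "buckets_path": "per_Session>latest_bytes[byte_count]"
                    }
                }
            }
        }
    }
}

With the documents from the question, this should yield total_latest_bytes of 300 for 1.1.1.1 (200 from session 1 plus 100 from session 3) and 123 for 2.2.2.2, matching the expected output.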
