首页 > 解决方案 > Splunk 查询根据同一索引中的其他事件过滤掉

问题描述

我有一个名为的索引Events

它包含一堆不同的事件,所有事件都有一个名为EventName. 现在我想做一个查询,返回与以下内容匹配的所有内容:

如果 AccountId 存在于 EventName 的事件AccountCreated中并且至少有 1 个 EventNameFavoriteCreated的事件具有相同的 AccountId -> 返回 EventName == 的所有事件AccountCreated

示例事件:

帐户已创建

{
   "AccountId": 1234,
   "EventName": "AccountCreated",
   "SomeOtherProperty": "Some value",
   "Brand": "My Brand",
   "DeviceType": "Mobile",
   "EventTime": "2020-06-01T12:13:14Z"
}

收藏创建

{
   "AccountId": 1234,
   "EventName": "FavoritesCreated,
   "Brand": "My Brand",
   "DeviceType": "Mobile",
   "EventTime": "2020-06-01T12:13:14Z"
}

鉴于以下两个事件,我想创建 1 个返回 AccountCreated 事件的查询。

我尝试了以下方法,但它不起作用,我肯定错过了一些简单的东西吗?

index=events EventName=AccountCreated 
  [search index=events EventName=FavoriteCreated | dedup AccountId | fields AccountId]
| table AccountId, SomeOtherProperty

我预计这里有 6000 次点击,但我只得到 2298 次事件。我错过了什么?

更新 根据下面@warren 给出的答案,以下查询有效。唯一的问题是它使用了 JOIN,这将我们的子搜索结果限制为 50K。运行此查询时,我总共得到 5900 个结果 = 正确。

index=events EventName=AccountCreated AccountId=*
| stats count by AccountId, EventName
| fields - count
| join AccountId 
    [ | search index=events EventName=FavoriteCreated AccountId=*
    | stats count by AccountId ]
| fields - count
| table AccountId, EventName

然后我尝试像这样使用他更新的示例,但问题似乎是它返回FavoriteCreated 事件而不是AccountCreated。运行此查询时,我得到 25 494 次点击 = 不正确。

index=events AccountId=* (EventName=AccountCreated OR EventName=FavoriteCreated)
| stats values(EventName) as EventName by AccountId
| eval EventName=mvindex(EventName,-1)
| search EventName="FavoriteCreated"
| table AccountId, EventName

更新 2 - 工作 @warren 很棒,这是一个完整的工作查询,如果存在 1 个或多个 FavoriteCreated 事件,则仅返回来自 AccountCreated 事件的数据。

index=events AccountId=* (EventName=AccountCreated OR EventName=FavoriteCreated)
| stats 
    values(Brand) as Brand,
    values(DeviceType) as DeviceType,
    values(Email) as Email,
    values(EventName) as EventName
    values(EventTime) as EventTime,
    values(Locale) as Locale,
    values(ClientIp) as ClientIp
  by AccountId
| where mvcount(EventName)>1
| eval EventName=mvindex(EventName,0)
| eval EventTime=mvindex(EventTime,0)
| eval ClientIp=mvindex(ClientIp,0)
| eval DeviceType=mvindex(DeviceType,0)

标签: splunksplunk-query

解决方案


您可能发现问题的一个因素-子搜索的上限为50,000(执行 a 时join)事件(或 60 秒的运行时间(或使用“正常”子搜索时的10,000 个结果))。

首先倾销dedup有利于stats

index=events EventName=AccountCreated AccountId=*
| stats count by AccountId, SomeOtherProperty [, more, fields, as, desired]
| fields - count
| search 
    [ | search index=events EventName=FavoriteCreated AccountId=*
    | stats count by AccountId 
    | fields - count]
<rest of search>

如果这不能让您到达您想要的位置(即,您的子搜索中仍然有太多结果),您可以尝试join

index=events EventName=AccountCreated AccountId=*
| stats count by AccountId, SomeOtherProperty [, more, fields, as, desired]
| fields - count
| join AccountId 
    [ | search index=events EventName=FavoriteCreated AccountId=*
    | stats count by AccountId ]
| fields - count
<rest of search>

还有更多方法可以做您正在寻找的事情-但这两种方法应该可以使您朝着目标迈进很长的路

这是一个join-less 方法,它只显示“FavoriteCreated”事件:

index=events AccountId=* (EventName=AccountCreated OR EventName=FavoriteCreated)
| stats values(EventName) as EventName values(SomeOtherProperty) as SomeOtherProperty by AccountId
| eval EventName=mvindex(EventName,-1)
| search EventName="FavoriteCreated"

只有在同一时间范围内还有一个“AccountCreated”事件时,这里才会显示“FavoriteCreated” :

index=events AccountId=* (EventName=AccountCreated OR EventName=FavoriteCreated)
| stats values(EventName) as EventName values(SomeOtherProperty) as SomeOtherProperty by AccountId
| where mvcount(EventName)>1

如果您想“假装”values()没有发生(即丢弃“favoriteCreated”条目),请添加以下内容:

| eval EventName=mvindex(EventName,0)

推荐阅读