rxjs - 按特定顺序查找消息
问题描述
我已经开始研究 ReactiveX,但不知道它是否适合我试图解决的问题,因为我现在对 ReactiveX 还不够了解,或者它没有我需要的东西。
假设我不断收到可能是 20 种不同类型的消息。所有消息都应首先保存到数据库中。那我需要做一些进一步的分析。我对按顺序排列的 A、B、C 和 D 类型感兴趣(不必一个接一个)。当消息 A 出现时,应将其视为我需要触发的流程的开始。然后我应该等待消息 B(任何其他消息类型可以在等待时到达)到达并执行流程中的步骤。在消息 BI 之后等待消息 C 并执行过程中的步骤。然后我等待消息 D,它标志着该过程的结束。然后我需要重新开始并等待启动新进程的消息 A。
我正在使用 .NET,但来自任何平台的代码可能都可以确定如何(或是否)可以做到这一点。
更新:提供更多背景信息
使用@Enigmativity 示例代码,我将尝试稍微扩展这个问题。消息由设备产生。所以让我们假设在“A1,B2,B1,C1,F3,....”流的第一个字母是消息类型,数字是设备的 ID。因此,消息 A、B、C 和 D 需要属于同一设备才能被视为匹配项。服务器总是会收到所有的消息,因为设备会重复它们直到它得到确认。这是单个设备可以产生的(流可以包含来自所有设备的混合消息):
A1,B1,H1,F1,A1 - 这里设备在完成任何操作之前重新启动,首先 A1,B1 应该被忽略,我们现在重新开始等待 A、B、C 和 D。
A1,B1,C1,B1 - 这不可能发生。A1 总是在 B、C 或 D 之前出现。有时它可能不会到达 D,但它会重新开始。
解决方案
根据您的描述,我不确定是否可以保证您将始终获得每种消息类型 A、B、C 和 D,而不会获得另一组或重叠值。例如,如果第二个 A 出现在最后一个 D 之前,我有两种方法以防重启出现问题。
这是我的基本代码设置:
var subject = new Subject<string>();
IObservable<(string a, string b, string c, string d)> query = ...
query.Subscribe(x => Console.WriteLine($"{x.a} {x.b} {x.c} {x.d}"));
"A1,B1,A2,C1,F1,D1,A3,A4,B2,B3,A5,C2,B4,F2,D2,D3,C3,D3"
.Split(',')
.ToObservable()
.Subscribe(subject);
这是所有内容都按顺序排列并且完美匹配的情况(尽管散布着其他类型的消息:
IObservable<(string a, string b, string c, string d)> query =
subject
.Do(x => { /* Save here */ })
.Publish(ss =>
{
var ssa = ss.Where(s => s[0] == 'A');
var ssb = ss.Where(s => s[0] == 'B');
var ssc = ss.Where(s => s[0] == 'C');
var ssd = ss.Where(s => s[0] == 'D');
return Observable.When(
ssa
.And(ssb)
.And(ssc)
.And(ssd)
.Then((a, b, c, d) => (a: a, b: b, c: c, d: d)));
});
此查询使用 Rx 中非常强大但很少使用的模式/计划查询(也称为连接)。
如果您在消息出现故障时确实需要重置并且您需要最新消息,那么我认为这可行:
IObservable<(string a, string b, string c, string d)> query =
subject
.Do(x => { /* Save here */ })
.Publish(ss =>
ss
.Where(s => s[0] == 'A')
.Select(sa => ss.Where(s => s[0] == 'B').Select(sb => (a: sa, b: sb)))
.Switch()
.Select(sab => ss.Where(s => s[0] == 'C').Select(sc => (a: sab.a, b: sab.b, c: sc)))
.Switch()
.Select(sabc => ss.Where(s => s[0] == 'D').Select(sd => (a: sabc.a, b: sabc.b, c: sabc.c, d: sd)))
.Switch());
第一个查询给出了这个:
A1 B1 C1 D1
A2 B2 C2 D2
A3 B3 C3 D3
一切都很好,配对。
第二个给出了这个:
A1 B1 C1 D1
A4 B3 C2 D2
A4 B3 C2 D3
A5 B4 C3 D3
推荐阅读
- python - How can I control the topology of neural network when using MLPClassifier?
- vim - Vim:匹配第 N 次出现的 PATTERN 并仅在匹配行上突出显示该模式
- java - 从类调用方法时出现 NullPointerExceptionError
- reactjs - Create-react-app-parcel-typescript with Jest
- python - 对重复出现的行进行分组并查找 Pandas 中单个日期时间列的时间差
- c# - Client side public key encryption
- python - 使用 python 处理 vi yank 缓冲区
- typescript - Query regarding using async await in typescript class
- python - 为什么需要重做自动映射?(SQL炼金术)
- magento2 - Magento 2 - Get Customer Address Id through Shipping Address at Checkout Session