首页 > 解决方案 > 按特定顺序查找消息

问题描述

我已经开始研究 ReactiveX,但不知道它是否适合我试图解决的问题,因为我现在对 ReactiveX 还不够了解,或者它没有我需要的东西。

假设我不断收到可能是 20 种不同类型的消息。所有消息都应首先保存到数据库中。那我需要做一些进一步的分析。我对按顺序排列的 A、B、C 和 D 类型感兴趣(不必一个接一个)。当消息 A 出现时,应将其视为我需要触发的流程的开始。然后我应该等待消息 B(任何其他消息类型可以在等待时到达)到达并执行流程中的步骤。在消息 BI 之后等待消息 C 并执行过程中的步骤。然后我等待消息 D,它标志着该过程的结束。然后我需要重新开始并等待启动新进程的消息 A。

我正在使用 .NET,但来自任何平台的代码可能都可以确定如何(或是否)可以做到这一点。

更新:提供更多背景信息

使用@Enigmativity 示例代码,我将尝试稍微扩展这个问题。消息由设备产生。所以让我们假设在“A1,B2,B1,C1,F3,....”流的第一个字母是消息类型,数字是设备的 ID。因此,消息 A、B、C 和 D 需要属于同一设备才能被视为匹配项。服务器总是会收到所有的消息,因为设备会重复它们直到它得到确认。这是单个设备可以产生的(流可以包含来自所有设备的混合消息):

A1,B1,H1,F1,A1 - 这里设备在完成任何操作之前重新启动,首先 A1,B1 应该被忽略,我们现在重新开始等待 A、B、C 和 D。

A1,B1,C1,B1 - 这不可能发生。A1 总是在 B、C 或 D 之前出现。有时它可能不会到达 D,但它会重新开始。

标签: rxjsreactive-programmingsystem.reactiverx.net

解决方案


根据您的描述,我不确定是否可以保证您将始终获得每种消息类型 A、B、C 和 D,而不会获得另一组或重叠值。例如,如果第二个 A 出现在最后一个 D 之前,我有两种方法以防重启出现问题。

这是我的基本代码设置:

var subject = new Subject<string>();

IObservable<(string a, string b, string c, string d)> query = ...

query.Subscribe(x => Console.WriteLine($"{x.a} {x.b} {x.c} {x.d}"));

"A1,B1,A2,C1,F1,D1,A3,A4,B2,B3,A5,C2,B4,F2,D2,D3,C3,D3"
    .Split(',')
    .ToObservable()
    .Subscribe(subject);

这是所有内容都按顺序排列并且完美匹配的情况(尽管散布着其他类型的消息:

IObservable<(string a, string b, string c, string d)> query =
    subject
        .Do(x => { /* Save here */ })
        .Publish(ss =>
        {
            var ssa = ss.Where(s => s[0] == 'A');
            var ssb = ss.Where(s => s[0] == 'B');
            var ssc = ss.Where(s => s[0] == 'C');
            var ssd = ss.Where(s => s[0] == 'D');
            return Observable.When(
                ssa
                    .And(ssb)
                    .And(ssc)
                    .And(ssd)
                    .Then((a, b, c, d) => (a: a, b: b, c: c, d: d)));
        });

此查询使用 Rx 中非常强大但很少使用的模式/计划查询(也称为连接)。

如果您在消息出现故障时确实需要重置并且您需要最新消息,那么我认为这可行:

IObservable<(string a, string b, string c, string d)> query =
    subject
        .Do(x => { /* Save here */ })
        .Publish(ss =>
                ss
                    .Where(s => s[0] == 'A')
                    .Select(sa => ss.Where(s => s[0] == 'B').Select(sb => (a: sa, b: sb)))
                    .Switch()
                    .Select(sab => ss.Where(s => s[0] == 'C').Select(sc => (a: sab.a, b: sab.b, c: sc)))
                    .Switch()
                    .Select(sabc => ss.Where(s => s[0] == 'D').Select(sd => (a: sabc.a, b: sabc.b, c: sabc.c, d: sd)))
                    .Switch());

第一个查询给出了这个:

A1 B1 C1 D1
A2 B2 C2 D2
A3 B3 C3 D3

一切都很好,配对。

第二个给出了这个:

A1 B1 C1 D1
A4 B3 C2 D2
A4 B3 C2 D3
A5 B4 C3 D3

推荐阅读