首页 > 解决方案 > 如何在 Rx.Net 中使用 iteself 递归地执行打结/定义 observable?

问题描述

有时,业务逻辑似乎能够通过一些递归定义的 observables 进行自然建模。这是一个例子:

interface Demo {
    IObservable<CommandId> userCommands;
    IObservable<IObservable<IProcessingState>> processes;
    IObservable<CommandId> skippedCommands;
    IObservable<(CommandId, CommandResult)> RunCommand(CommandId id);
}

interface IProcessingState {
    bool IsProcessing {get;}
    CommandId? ProcessingId {get;}
}

对于用户输入的每个命令,它应该在 proccess 中触发正在运行的进程,或者在 skippedCommands 中发出一个值。这种逻辑的一些直接翻译可能

var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing)
var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing)
var processes = validCommands.Select(c => RunCommand(c))

如上面的代码所示,assign ofvalidCommandsprocesses是相互递归的,我们可以等效地定义processes直接使用自身递归

var processes = userCommands.WithLatestFrom(processes)
                            .Where(x => !x.Item2.IsProcessing)
                            .Select(c => RunCommand(c))

但是我们不能prcesses像这样在 C# 中定义 Observable。

我发现了几个可能相关的事情:

  1. Observable.Generate构造函数。但是,它似乎以同步方式折叠自己的状态,我不知道如何使用userCommandsobservable 和RunCommandin Observable.Generate

  2. RxJS 中的一些操作符exhaustexhaustMap虽然 Rx.Net 没有提供这个操作符,但是有一些第三方库提供了这些操作符,比如FSharp.Control.Reactive。实现类似于

let exhaustMap f source =
        Observable.Create (fun (o : IObserver<_>) -> 
            let mutable hasSubscription = false
            let mutable innerSub = None
            let onInnerCompleted () =
                hasSubscription <- false
                innerSub |> Option.iter Disposable.dispose
            let onOuterNext x =
                if not hasSubscription then
                    hasSubscription <- true 
                    f x |> subscribeSafeWithCallbacks 
                            o.OnNext o.OnError onInnerCompleted
                        |> fun y -> innerSub <- Some y
            source
            |> subscribeSafeWithCallbacks
                onOuterNext o.OnError o.OnCompleted)

但是,有两个问题。一个。直接使用此运算符不符合上述要求,跳过的命令将被忽略。我们可以稍微修改源代码以适应要求,但还有另一个问题 b. 该实现引入了两个局部可变变量和两个嵌套订阅。我不知道这在所有情况下是否都可以(会有数据竞争的风险吗?),并且更喜欢基于操作符组合而不是可变引用的解决方案

  1. SodiumFRP提供了前向引用类型StreamLoopCellLoop. 根据功能响应式编程一书,这些前向引用类型的 Rx 替代方案将是Subject,通过使用Subject上面的递归构造,将其分为两个阶段。问题是由 Intro to Rx 指出的,使用Subject需要手动管理更多状态,至少需要 dispose 主题,并且可能被迫使用 hot observables。我想知道是否存在不使用的解决方案Subject

  2. 在结果的最后一个值上使用window带边界的运算符(就在完成之前)RunCommandprocesses上面可以构造一些方法,但是这个解决方案需要使用两次结束信号,这需要仔细处理(在尝试和调整时安静一段时间Take(1),,,,,重载操作员获得预期结果)同时发生的事件。zipwithLatestFromcombineLatestWindow

是否有更好的解决方案或对上述解决方案的修改,尤其是仅使用运算符?

标签: rxjsfrprx.netsodiumtying-the-knot

解决方案


你的类型都很奇怪,很难使用。您的问题基本上是一个带有两个触发器的简单状态机:1)新命令到达,2)先前的命令完成执行。

这应该让你开始:

void Main()
{
    IObservable<ICommand> source = new Subject<ICommand>();
    var executionTerminatedLoopback = new Subject<Unit>();
    
    var stateMachine = source
        .Select(c => (command: c, type: 1))
        .Merge(executionTerminatedLoopback.Select(_ => (command: (ICommand)null, type: 2)))
        .Scan((isExecuting: false, validCommand: (ICommand)null, failedCommand: (ICommand)null), (state, msg) => {
            if(msg.type == 2)
                return (false, null, null);
            if(state.isExecuting)
                return (true, null, msg.command);
            else
                return (true, msg.command, null);
        });
    
    var validCommands = stateMachine.Where(t => t.validCommand != null).Select(t => t.validCommand);
    var failedCommands = stateMachine.Where(t => t.failedCommand != null).Select(t => t.failedCommand);
    
    validCommands.SelectMany(c => c.Execute()).Subscribe(executionTerminatedLoopback);
}

public interface ICommand{ 
    IObservable<Unit> Execute();
}

source这里不必是主题,也可能不应该是:它可以是任何IObservable<ICommand>. 在答案中模拟更容易。尽管exeuctionTerminatedLoopback确实必须是(如您所说)将递归分解为两部分。因为它是一个主题,所以它应该保密,不能泄露出去。

如果不使用主题,我认为 C# 中没有任何答案。


推荐阅读