rxjs - 如何在 Rx.Net 中使用 iteself 递归地执行打结/定义 observable?
问题描述
有时,业务逻辑似乎能够通过一些递归定义的 observables 进行自然建模。这是一个例子:
interface Demo {
IObservable<CommandId> userCommands;
IObservable<IObservable<IProcessingState>> processes;
IObservable<CommandId> skippedCommands;
IObservable<(CommandId, CommandResult)> RunCommand(CommandId id);
}
interface IProcessingState {
bool IsProcessing {get;}
CommandId? ProcessingId {get;}
}
对于用户输入的每个命令,它应该在 proccess 中触发正在运行的进程,或者在 skippedCommands 中发出一个值。这种逻辑的一些直接翻译可能
var validCommands = userCommands.WithLatestFrom(processes).Where(x => !x.Item2.IsProcessing)
var skippedCommands = userCommands.WithLatestFrom(processes).Where(x => x.Item2.IsProcessing)
var processes = validCommands.Select(c => RunCommand(c))
如上面的代码所示,assign ofvalidCommands
和processes
是相互递归的,我们可以等效地定义processes
直接使用自身递归
var processes = userCommands.WithLatestFrom(processes)
.Where(x => !x.Item2.IsProcessing)
.Select(c => RunCommand(c))
但是我们不能prcesses
像这样在 C# 中定义 Observable。
我发现了几个可能相关的事情:
Observable.Generate
构造函数。但是,它似乎以同步方式折叠自己的状态,我不知道如何使用userCommands
observable 和RunCommand
inObservable.Generate
;RxJS 中的一些操作符
exhaust
,exhaustMap
虽然 Rx.Net 没有提供这个操作符,但是有一些第三方库提供了这些操作符,比如FSharp.Control.Reactive。实现类似于
let exhaustMap f source =
Observable.Create (fun (o : IObserver<_>) ->
let mutable hasSubscription = false
let mutable innerSub = None
let onInnerCompleted () =
hasSubscription <- false
innerSub |> Option.iter Disposable.dispose
let onOuterNext x =
if not hasSubscription then
hasSubscription <- true
f x |> subscribeSafeWithCallbacks
o.OnNext o.OnError onInnerCompleted
|> fun y -> innerSub <- Some y
source
|> subscribeSafeWithCallbacks
onOuterNext o.OnError o.OnCompleted)
但是,有两个问题。一个。直接使用此运算符不符合上述要求,跳过的命令将被忽略。我们可以稍微修改源代码以适应要求,但还有另一个问题 b. 该实现引入了两个局部可变变量和两个嵌套订阅。我不知道这在所有情况下是否都可以(会有数据竞争的风险吗?),并且更喜欢基于操作符组合而不是可变引用的解决方案
SodiumFRP提供了前向引用类型
StreamLoop
和CellLoop
. 根据功能响应式编程一书,这些前向引用类型的 Rx 替代方案将是Subject
,通过使用Subject
上面的递归构造,将其分为两个阶段。问题是由 Intro to Rx 指出的,使用Subject
需要手动管理更多状态,至少需要 dispose 主题,并且可能被迫使用 hot observables。我想知道是否存在不使用的解决方案Subject
在结果的最后一个值上使用
window
带边界的运算符(就在完成之前)RunCommand
,processes
上面可以构造一些方法,但是这个解决方案需要使用两次结束信号,这需要仔细处理(在尝试和调整时安静一段时间Take(1)
,,,,,重载操作员获得预期结果)同时发生的事件。zip
withLatestFrom
combineLatest
Window
是否有更好的解决方案或对上述解决方案的修改,尤其是仅使用运算符?
解决方案
你的类型都很奇怪,很难使用。您的问题基本上是一个带有两个触发器的简单状态机:1)新命令到达,2)先前的命令完成执行。
这应该让你开始:
void Main()
{
IObservable<ICommand> source = new Subject<ICommand>();
var executionTerminatedLoopback = new Subject<Unit>();
var stateMachine = source
.Select(c => (command: c, type: 1))
.Merge(executionTerminatedLoopback.Select(_ => (command: (ICommand)null, type: 2)))
.Scan((isExecuting: false, validCommand: (ICommand)null, failedCommand: (ICommand)null), (state, msg) => {
if(msg.type == 2)
return (false, null, null);
if(state.isExecuting)
return (true, null, msg.command);
else
return (true, msg.command, null);
});
var validCommands = stateMachine.Where(t => t.validCommand != null).Select(t => t.validCommand);
var failedCommands = stateMachine.Where(t => t.failedCommand != null).Select(t => t.failedCommand);
validCommands.SelectMany(c => c.Execute()).Subscribe(executionTerminatedLoopback);
}
public interface ICommand{
IObservable<Unit> Execute();
}
source
这里不必是主题,也可能不应该是:它可以是任何IObservable<ICommand>
. 在答案中模拟更容易。尽管exeuctionTerminatedLoopback
确实必须是(如您所说)将递归分解为两部分。因为它是一个主题,所以它应该保密,不能泄露出去。
如果不使用主题,我认为 C# 中没有任何答案。
推荐阅读
- r - 将数据从一个数据帧拉入另一个数据帧
- artificial-intelligence - 如何将音频文件编码为 base64 字符串以用于 Speech-To-Text API (https://speech.googleapis.com/v1/speech:recognize)
- c - 包含 char 数组的结构的 memcpy 失败
- javascript - 在地图数组的标签 Chart.js 中使用变量
- javascript - 如何动态加载javascript并将其缓存为html
- java - 比较方法
- angular - 为表单控件禁用属性删除数据
- python - Django - 创建电话审核应用程序模型的方向是否正确?
- r - 在 Shiny 中使用 reactiveFileReader() 时如何避免内存泄漏?
- spring - @Id 注释导致列表重复