首页 > 解决方案 > 在 Reactive Stream 上发布时出现 IndexOutOfRangeException

问题描述

我正在开发一个应用程序,它使用可连接的 observable 来发布结果。这些结果被两个不同线程上的两个观察者消费;一个 observable 在 UI 线程上运行,并在表格和图表中显示结果,而另一个 observable 在默认线程池上运行,将结果发送到 REST 端点。

但是,在某些情况下,在可变时间后IndexOutOfRangeException抛出 an。我怀疑这是因为将结果发送到 REST Web 服务的线程存在并发问题。如果我不启动这个线程,结果显示得很好。此外,发生的不规则性是并发问题的典型特征。我附上了异常的堆栈跟踪。

at System.Collections.Generic.List`1.Add(T item)
at System.Reactive.Linq.ObservableImpl.Buffer`1.CountExact.ExactSink.OnNext(TSource value)
at System.Reactive.Subjects.Subject`1.OnNext(T value)
at System.Reactive.Sink`1.ForwardOnNext(TTarget value)
at System.Reactive.IdentitySink`1.OnNext(T value)
at System.Reactive.AutoDetachObserver`1.OnNextCore(T value)
at System.Reactive.ObserverBase`1.OnNext(T value)
at Wetr.Simulator.viewModel.SimulationViewModel.OnSimulationResultPublished(SimulationResult measurement) in E:\programming\csharp\Weatr\Wetr.Simulator\Wetr.Simulator\viewModel\SimulationViewModel.cs:Line 96.

发布结果的代码如下所示。

private void OnSimulationResultPublished(SimulationResult measurement)
{
    this.aggregatedSimulationResultObserver?.OnNext(measurement);
}

测量编写器使用以下代码订阅事件。

this.measurementStreamSubscription = measurementStream
    .Buffer(BUFFER_SIZE)
    .Subscribe(
        results => this.writer.AddMeasurementsAsync(
            results
                .Where(container => container != null)
                .Select(container => container.Result)
        )
    );

UI 更新由如下所示的订阅处理。

this.uiUpdateSubscription =
    this.AggregatedSimulationResults
        //.Select(result => new SimulationResult(
        //    result.Station, result.MeasurementSequenceID,
        //    new Measurement(
        //        result.Result.Station,
        //        result.Result.MeasurementType,
        //        new DateTime(result.Result.Timestamp.Ticks),
        //        result.Result.MeasuredValue
        //    )
        //))
        .ObserveOn(DispatcherScheduler.Current)
        .Subscribe(this.DisplayNewSimulationResults);

我已经通读了Intro to RX,但我对 Reactive Extensions 还是很陌生。我还尝试在缓冲测量值之前创建新的测量值副本,但这并没有解决问题。因此,我感谢任何帮助。

标签: c#multithreadingconcurrencyreactive-programmingsystem.reactive

解决方案


推荐阅读