首页 > 解决方案 > 通过 Rx 正确使用多个资源

问题描述

我需要在 Rx 中使用多个一次性资源。这就是我嵌套Observable.Using语句的方式(内部源代码仅用于测试)。

var obs = Observable.Using(
    () => new FileStream("file.txt", FileMode.Open),
    fs => Observable.Using(
        () => new StreamReader(fs),
        sr => Observable.Create<string>(o =>
            TaskPoolScheduler.Default.ScheduleAsync(async (sch, ct) =>
            {
                while (!ct.IsCancellationRequested)
                {
                    var s = await sr.ReadLineAsync().ConfigureAwait(false);
                    if (s is null) break;

                    o.OnNext(s);
                }
                o.OnCompleted();
            }))));

obs.Subscribe(Console.WriteLine);

有没有更简洁的方式来use处理多个一次性资源?

标签: c#system.reactiverx.net

解决方案


我想不出use无限数量的资源的通用方法,但至少您可以为 2-3 个资源的常见情况制作辅助方法。这是两个的实现:

public static IObservable<TResult> Using<TResult, TResource1, TResource2>(
    Func<TResource1> resourceFactory1,
    Func<TResource1, TResource2> resourceFactory2,
    Func<TResource1, TResource2, IObservable<TResult>> observableFactory)
    where TResource1 : IDisposable
    where TResource2 : IDisposable
{
    return Observable.Using(resourceFactory1, resource1 => Observable.Using(
        () => resourceFactory2(resource1),
        resource2 => observableFactory(resource1, resource2)));
}

使用示例:

var obs = Using(
    () => new FileStream("file.txt", FileMode.Open),
    (fs) => new StreamReader(fs),
    (fs, sr) => Observable.Create<string>(o =>
        TaskPoolScheduler.Default.ScheduleAsync(async (sch, ct) =>
        {
            while (!ct.IsCancellationRequested)
            {
                var s = await sr.ReadLineAsync().ConfigureAwait(false);
                if (s is null) break;

                o.OnNext(s);
            }
            o.OnCompleted();
        })));

推荐阅读