首页 > 解决方案 > Reactive Observable.Create 为没有 Task.Run 的异步生产者

问题描述

我想要写函数,它将异步返回项目:

public static async Task<IEnumerable<T>> GetItems()
{
    while (await ShouldLoopAsync())
    {
        yield return await GetItemAsync();
    }
}

当然它不会起作用。在1中,他们提出了下一个解决方案:

return Observable.Create<T>(
    observer => Task.Run(async () =>
        {
            while (await ShouldLoopAsync())
            {
                observer.OnNext(await getAsync());
            }
            observer.OnCompleted();
        }
    )
);

我不喜欢1中接受的解决方案,因为它使用 Observable.Create 中的 Task.Run 方法。我想要一些具有反应性扩展的解决方案,它将在当前的 TaskScheduler 上运行,而不使用除现有线程之外的其他线程。我可以使用 .net 事件以声明方式编写这样的解决方案,但我不能使用 Rx。

更新

我试图删除Task.Run并编写下一个测试:

[TestClass]
public class ReactiveWithTaskIEnumerable
{
    [TestMethod]
    public void ReactibeShouldProcessValuesWithoutnakoplenie()
    {
        Trace.WriteLine($"Start Thread {Thread.CurrentThread.ManagedThreadId}");
        var i = GetIntsAsync()
            .Subscribe(
            onNext: p =>
            {
                Trace.WriteLine($"onNext Thread {Thread.CurrentThread.ManagedThreadId} {p}");
            },
            onCompleted: () =>
            {
                Trace.WriteLine($"onCompleted Thread {Thread.CurrentThread.ManagedThreadId}");
            },
            onError: p =>
            {
                Trace.WriteLine($"onError Thread {Thread.CurrentThread.ManagedThreadId} {p}");
                throw p;
            }
            );
        Trace.WriteLine($"Finish Thread {Thread.CurrentThread.ManagedThreadId}");

    }

    private IObservable<int> GetIntsAsync()
    {
        return Observable.Create<int>(
            observer => async () =>
            {
                Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
                try
                {
                    foreach (var i in Enumerable.Range(1, 10))
                    {
                        await Task.Delay(1000);
                        observer.OnNext(1);
                    }
                }
                catch (Exception e)
                {
                    observer.OnError(e);
                }
                observer.OnCompleted();
            });
    }
}

输出是:

Start Thread 3
Finish Thread 3

所以内部任务根本没有开始。

之后我改为GetIntsAsync

    private IObservable<int> GetIntsAsync()
    {
        return Observable.Create<int>(
            observer =>
            {
                async Task Lambda()
                {
                    Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
                    try
                    {
                        foreach (var i in Enumerable.Range(1, 10))
                        {
                            await Task.Delay(1000);
                            observer.OnNext(1);
                        }
                    }
                    catch (Exception e)
                    {
                        observer.OnError(e);
                    }
                    observer.OnCompleted();
                }

                return Lambda();
            });
    }

这给出了下一个输出:

Debug Trace:
Start Thread 3
Produce 3
Finish Thread 3

它接近输出Task.Run

调试跟踪:启动线程 3 生产 9 完成线程 3

现在我需要等到 observable 完成

标签: c#.netasynchronousasync-awaitsystem.reactive

解决方案


试试 NuGetting“System.Interactive”,然后试试这个:

public static IEnumerable<int> GetItems()
{
    return EnumerableEx.Create<int>(async y =>
    {
        while (await ShouldLoopAsync())
        {
            await y.Return(GetItemAsync());
        }
        await y.Break();
    });
}

推荐阅读