c# - Reactive Observable.Create 为没有 Task.Run 的异步生产者
问题描述
我想要写函数,它将异步返回项目:
public static async Task<IEnumerable<T>> GetItems()
{
while (await ShouldLoopAsync())
{
yield return await GetItemAsync();
}
}
当然它不会起作用。在1中,他们提出了下一个解决方案:
return Observable.Create<T>(
observer => Task.Run(async () =>
{
while (await ShouldLoopAsync())
{
observer.OnNext(await getAsync());
}
observer.OnCompleted();
}
)
);
我不喜欢1中接受的解决方案,因为它使用 Observable.Create 中的 Task.Run 方法。我想要一些具有反应性扩展的解决方案,它将在当前的 TaskScheduler 上运行,而不使用除现有线程之外的其他线程。我可以使用 .net 事件以声明方式编写这样的解决方案,但我不能使用 Rx。
更新
我试图删除Task.Run
并编写下一个测试:
[TestClass]
public class ReactiveWithTaskIEnumerable
{
[TestMethod]
public void ReactibeShouldProcessValuesWithoutnakoplenie()
{
Trace.WriteLine($"Start Thread {Thread.CurrentThread.ManagedThreadId}");
var i = GetIntsAsync()
.Subscribe(
onNext: p =>
{
Trace.WriteLine($"onNext Thread {Thread.CurrentThread.ManagedThreadId} {p}");
},
onCompleted: () =>
{
Trace.WriteLine($"onCompleted Thread {Thread.CurrentThread.ManagedThreadId}");
},
onError: p =>
{
Trace.WriteLine($"onError Thread {Thread.CurrentThread.ManagedThreadId} {p}");
throw p;
}
);
Trace.WriteLine($"Finish Thread {Thread.CurrentThread.ManagedThreadId}");
}
private IObservable<int> GetIntsAsync()
{
return Observable.Create<int>(
observer => async () =>
{
Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
try
{
foreach (var i in Enumerable.Range(1, 10))
{
await Task.Delay(1000);
observer.OnNext(1);
}
}
catch (Exception e)
{
observer.OnError(e);
}
observer.OnCompleted();
});
}
}
输出是:
Start Thread 3
Finish Thread 3
所以内部任务根本没有开始。
之后我改为GetIntsAsync
:
private IObservable<int> GetIntsAsync()
{
return Observable.Create<int>(
observer =>
{
async Task Lambda()
{
Trace.WriteLine($"Produce {Thread.CurrentThread.ManagedThreadId}");
try
{
foreach (var i in Enumerable.Range(1, 10))
{
await Task.Delay(1000);
observer.OnNext(1);
}
}
catch (Exception e)
{
observer.OnError(e);
}
observer.OnCompleted();
}
return Lambda();
});
}
这给出了下一个输出:
Debug Trace:
Start Thread 3
Produce 3
Finish Thread 3
它接近输出Task.Run
:
调试跟踪:启动线程 3 生产 9 完成线程 3
现在我需要等到 observable 完成
解决方案
试试 NuGetting“System.Interactive”,然后试试这个:
public static IEnumerable<int> GetItems()
{
return EnumerableEx.Create<int>(async y =>
{
while (await ShouldLoopAsync())
{
await y.Return(GetItemAsync());
}
await y.Break();
});
}
推荐阅读
- reactjs - Hide / Show Navbar ReactJS
- vba - VBA将单元格值与列的所有值进行比较
- c++ - 如何将 SHFileOperation() 与 CString 路径一起使用
- swift - Swift 闭包异常处理
- javascript - 如何访问复选框值并将其传递给ajax
- html - Chrome DevTools 的网络选项卡中大量没有活动
- c++ - 以下代码如何每次都为唯一的调用堆栈唯一地实例化模板函数?
- html - 在我的横幅中的两个按钮之间创建一个空间
- ios - 如何将 cookie 字符串从 allHTTPHeaderFields 转换为 HTTPCookie
- azure-ad-b2c - Azure AD B2C 从自定义 UI 模板中去除 html 标记