首页 > 解决方案 > 在 C# 中可通过背压观察

问题描述

C# rx 中有没有办法处理背压?我正在尝试从分页查询的结果中调用 Web api。这个 web api 非常脆弱,我需要不超过 3 个并发调用,所以,程序应该是这样的:

  1. 从 db 获取一页
  2. 调用 web api,页面上每条记录最多三个并发调用
  3. 将结果保存回 db
  4. 获取另一个页面并重复,直到没有更多结果。

我并没有真正得到我想要的序列,基本上数据库会获取所有记录,无论它们是否可以被处理。

我尝试了很多方法,包括调整ObserveOn操作符、实现信号量以及其他一些东西。我可以得到一些指导来实施这样的事情吗?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
using Castle.Core.Internal;
using Xunit;
using Xunit.Abstractions;

namespace ProductValidation.CLI.Tests.Services
{
    public class Example
    {
        private readonly ITestOutputHelper output;

        public Example(ITestOutputHelper output)
        {
            this.output = output;
        }

        [Fact]
        public async Task RunsObservableToCompletion()
        {
            var repo = new Repository(output);
            var client = new ServiceClient(output);

            var results = repo.FetchRecords()
                .Select(x => client.FetchMoreInformation(x).ToObservable())
                .Merge(1)
                .Do(async x => await repo.Save(x));

            await results.LastOrDefaultAsync();
        } 
    }

    public class Repository
    {
        private readonly ITestOutputHelper output;

        public Repository(ITestOutputHelper output)
        {
            this.output = output;
        }

        public IObservable<int> FetchRecords()
        {
            return Observable.Create<int>(async (observer) =>
            {
                var page = 1;
                var products = await FetchPage(page);
                while (!products.IsNullOrEmpty())
                {
                    foreach (var product in products)
                    {
                        observer.OnNext(product);
                    }

                    page += 1;
                    products = await FetchPage(page);
                }
                observer.OnCompleted();
            })
            .ObserveOn(SynchronizationContext.Current);
        }

        private async Task<IEnumerable<int>> FetchPage(int page)
        {
            // Simulate fetching a paged query.
            await Task.Delay(500).ToObservable().ObserveOn(new TaskPoolScheduler(new TaskFactory()));
            output.WriteLine("Fetching page {0}", page);
            if (page >= 4) return Enumerable.Empty<int>();
            return Enumerable.Range(1, 3).Select(_ => page);
        }

        public async Task Save(string id)
        {
            await Task.Delay(50); //Simulates latency
        }
    }

    public class ServiceClient
    {
        private readonly ITestOutputHelper output;
        private readonly SemaphoreSlim semaphore;

        public ServiceClient(ITestOutputHelper output)
        {
            this.output = output;
            this.semaphore = new SemaphoreSlim(2);
        }

        public async Task<string> FetchMoreInformation(int id)
        {
            try
            {
                output.WriteLine("Calling the web client for {0}", id);
                await semaphore.WaitAsync(); // Protection for the webapi not sending too many calls
                await Task.Delay(1000); //Simulates latency
                return id.ToString();
            }
            finally
            {
                semaphore.Release();
            }
        }
    }
}

标签: c#system.reactive

解决方案


Rx 不支持背压,因此无法以与处理记录相同的速度从数据库中获取记录。也许您可以使用 aSubject<Unit>作为信号机制,每次处理记录时推送一个值,并设计一种方法在生产站点使用这些信号,以便在收到信号时从数据库中获取新记录。但这将是一个混乱且惯用的解决方案。TPL 数据流是比 Rx 更适合进行此类工作的工具。它本机支持BoundedCapacity配置选项。

关于您发布的代码的一些评论,与背压问题没有直接关系:

Merge带有参数的运算符maxConcurrent对内部序列的并发订阅施加了限制,但是如果内部序列已经启动并运行,这将不起作用。所以你必须确保内部序列是冷的,一个方便的方法是Defer操作符:

.Select(x => Observable.Defer(() =>
    client.FetchMoreInformation(x).ToObservable()))

将异步方法转换为延迟的可观察序列的一种更常见的方法是FromAsync运算符:

.Select(x => Observable.FromAsync(() => client.FetchMoreInformation(x)))

顺便说一句,Do运营商不理解异步委托,所以不是:

.Do(async x => await repo.Save(x));

...创建async voidlambdas,最好这样做:

.Select(x => Observable.FromAsync(() => repo.Save(x)))
.Merge(1);

更新:这是一个示例,说明如何使用 aSemaphoreSlim在 Rx 中实现背压:

const int boundedCapacity = 10;
using var semaphore = new SemaphoreSlim(boundedCapacity, boundedCapacity);

IObservable<int> results = repo
    .FetchRecords(semaphore)
    .Select(x => Observable.FromAsync(() => client.FetchMoreInformation(x)))
    .Merge(1)
    .Select(x => Observable.FromAsync(() => repo.Save(x)))
    .Merge(1)
    .Do(_ => semaphore.Release());

await results.DefaultIfEmpty();

FetchRecords方法内部:

//...
await semaphore.WaitAsync();
observer.OnNext(product);
//...

这是一个脆弱的解决方案,因为它依赖于通过管道传播所有元素。WaitAsync如果将来您决定在管道中包括过滤或节流,那么将违反和之间的一对一关系Release,最可能的结果是管道死锁。


推荐阅读