首页 > 解决方案 > 需要有关基于内存拉取的队列库的建议

问题描述

我正在为 Tech Jobs 做准备,遇到了一个现在经常被问到的面试问题。我在 LeetCode 等网站上找到了一些代码片段,但大部分都是 Java 代码。我想知道是否在面试中提供了这个问题,有没有办法用 C# 设计/编写解决方案(诸如此类的东西PriorityQueue在这里不可用)?

以下是基本用例。

  1. 图书馆维护的多个队列

  2. 每个队列必须支持多个发布者和订阅者。

  3. 每个队列都有一个最大保留期限,超过该期限队列中的消息不应驻留在内存中。

  4. 队列中的每条消息都可以有一个可选的 TTL 值。任何具有过期 TTL 的消息都不应该被任何订阅者消费,也不应该驻留在内存中。

  5. 每个消费者都应该阅读所有的消息。

类似的东西(多个生产者/多个消费者)已经在这里Multiple Producers/Consumers之前发布过,但是 C# 在线没有什么可用的。

关于如何使用标准 .NET API 来为此设计解决方案的任何建议。

标签: c#multithreadingproducer-consumer

解决方案


Reactive Extensions library (Rx) 似乎是解决这个问题的好选择。它基于两个接口:IObservable<T>= 发布者和IObserver<T>= 订阅者,并提供了这些接口的许多实现和组合器。例如,该类ReplaySubject<T>实现了这两个接口,并且它有一个接受参数的构造函数,该参数TimeSpan window定义了重放缓冲区的最大时间长度。

您可以在下面看到如何实例化ReplaySubject<T>具有 30 秒的保留期、如何将项目排入队列、如何订阅将接收主题中当前所有项目的通知的消费者,以及将被排入队列的所有项目将来,以及如何结束订阅以停止接收通知:

using System.Reactive.Subjects;
//...
var subject = new ReplaySubject<Item>(TimeSpan.FromSeconds(30));
//...
subject.OnNext(new Item());
//...
var subscription = subject.Subscribe((Item x) => Console.WriteLine($"Received {x}"));
//...
subscription.Dispose();

至于“过期 TTL”功能,这似乎很棘手,因为 AFAIK 在 Rx 中没有提供此功能的内置组件。我考虑过将其实现为自定义ISubject<T>,基于嵌套主题的结构:ReplaySubject<ReplaySubject<T>>. 内部主题将包含单个值,并且将被实例化为window等于特定项目的 TTL 到期期限。Merge消费者将在与运营商合并后订阅此结构。这个想法源于这个问题:How can I clear the buffer on a ReplaySubject? 这是一个实现:

/// <summary>
/// Represents an object that is both an observable sequence as well as an observer.
/// Each notification is broadcasted to all subscribed and future observers, subject
/// to buffer trimming and notification expiration policies.
/// </summary>
public class ExpirableReplaySubject<T> : ISubject<T>
{
    private readonly TimeSpan _window;
    private readonly Func<T, TimeSpan> _expireAfterSelector;
    private readonly ReplaySubject<ISubject<T>> _replaySubject;
    private readonly IObservable<T> _replaySubjectMerged;

    public ExpirableReplaySubject(TimeSpan window,
        Func<T, TimeSpan> expireAfterSelector)
    {
        _window = window;
        _expireAfterSelector = expireAfterSelector;
        _replaySubject = new ReplaySubject<ISubject<T>>(window);
        _replaySubjectMerged = _replaySubject.Merge();
    }

    public void OnNext(T value)
    {
        var expireAfter = _expireAfterSelector(value);
        if (expireAfter > _window) expireAfter = _window;
        var inner = new ReplaySubject<T>(1, expireAfter); inner.OnNext(value);
        _replaySubject.OnNext(inner);
    }

    public void OnCompleted()
    {
        // All subjects, inner and outer, must be completed
        _replaySubject.OnCompleted();
        _replaySubject.Subscribe(subject => subject.OnCompleted());
    }

    public void OnError(Exception error)
    {
        // Faulting the outer subject is enough
        _replaySubject.OnError(error);
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return _replaySubjectMerged.Subscribe(observer);
    }
}

window参数配置主题的保留时间跨度,expireAfterSelector是一个选择器,针对收到的每个新项目运行,并返回此特定项目的到期时间。

这种方法的问题在于,当内部主体被逐出外部_replaySubject时,根据window策略,它们没有完成(它们的OnCompleted方法没有被调用)。如果不完成内部主题,它们将永远不会被取消订阅_replaySubjectMerged,从而导致永久性内存泄漏。所以需要做更多的工作。我们可以实现一个包含单个可过期值的轻量级,而不是使用ReplaySubject<T>带有bufferSize等于的 s1作为内部主题:ISubject<T>

private class ExpirableValueSubject<T> : ISubject<T>
{
    private T _value;
    private Timer _timer; // Becomes null when the subject is completed

    public ExpirableValueSubject(T value, TimeSpan expireAfter)
    {
        if (expireAfter <= TimeSpan.Zero) return; // Expired upon creation
        _value = value;
        _timer = new Timer(arg => ((ExpirableValueSubject<T>)arg).OnCompleted(),
            this, expireAfter, Timeout.InfiniteTimeSpan);
    }

    public void OnNext(T value) => throw new InvalidOperationException();
    public void OnError(Exception error) => throw new InvalidOperationException();
    public void OnCompleted()
    {
        lock (this) { _timer?.Dispose(); _timer = null; _value = default; }
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        lock (this)
        {
            if (_timer != null) observer.OnNext(_value);
            observer.OnCompleted();
        }
        return Disposable.Empty;
    }
}

此自定义主题OnCompleted在订阅时立即发出通知,并且其OnCompleted方法已被劫持以释放_value和处置_timer.

此类this用作储物柜对象,一般不建议这样做。在这种情况下,该类旨在作为内部组件,并且对该类实例的引用不会泄漏到外部世界,因此锁定this应该是可以的。

要完成我们的解决方案,您所要做的就是替换这一行:

var inner = new ReplaySubject<T>(1, expireAfter); inner.OnNext(value);

...有了这个:

var inner = new ExpirableValueSubject<T>(value, expireAfter);

通过此更改,ExpirableReplaySubject<T>该类应按预期工作,而不会泄漏内存。当然,它是线程安全的,因为所有ISubject<T>针对一般用途的实现都应该是。

使用示例:

var subject = new ExpirableReplaySubject<Item>(TimeSpan.FromSeconds(30),
    item => item.TTL);

注意:原始实现(修订版 1)使用封装BehaviorSubject<T>作为内部主题。这种方法是有问题的,因为 aBehaviorSubject<T>仅在处置时才释放其内部值,而不是在完成时。并且在处理它之后它变得无法使用(它抛出ObjectDisposedExceptions)。


推荐阅读