首页 > 解决方案 > 长期订阅 Rx 主题导致巨大的内存泄漏

问题描述

我已经实现了可能是一个非常幼稚的内存扇出队列机制,如下所示:

public class ObservableQueue<T> : IObservableQueue<T>
{
    private readonly Subject<T> queue;

    public IObservable<T> Messages => queue.AsObservable();

    public ObservableQueue()
    {
        queue = new Subject<T>();
    }

    public void Enqueue(T item)
    {
        queue.OnNext(item);
    }

    public void Enqueue(List<T> items)
    {
        items.ForEach(queue.OnNext);
    }
}

我选择这个实现的原因是它允许非常有表现力的订阅,我非常喜欢这样的订阅:

subscription = queue.Messages
    .Select(data => data.ToJson())
    .Buffer(TimeSpan.FromSeconds(10), ByteSize.FromBytes(128))
    .Where(Enumerable.Any)
    .Select(ToQuery)
    .Subscribe(query => db.Execute(query));

这里使用的Buffer方法再次是我的一个有点幼稚的实现:

public static IObservable<IList<string>> Buffer(this IObservable<string> source, TimeSpan timeSpan, ByteSize size) 
{
    // Completes when the `timespan` has elapsed
    var timer = Observable.Timer(timeSpan).Select(_ => new Unit());
    // Completes when the ByteSize exceeds `size`
    var bytes = source
        .Scan(ByteSize.FromBytes(0),
                    (a, b) => a + ByteSize.FromBytes(Encoding.Unicode.GetByteCount(b)))
        .SkipWhile(a => a < size)
        .Select(_ => new Unit()); // We only want to use these for notification, and both observables
                                  // need to be of the same type, so we just emit Unit

    // Amb races the two observables to see which one finishes first, which then propagates the notification
    // and signals the source to strop buffering
    return source.Buffer(() => Observable.Amb(timer, bytes));
}

现在,一旦我在第二个代码块中创建订阅,内存使用量就会出现绝对爆炸式增长,罪魁祸首Subject<T>ObservableQueue<T>. 我应该强调的是,在这一点上,没有实际数据被Enqueued。已subscription创建并且仍在等待任何数据实际处理。

我可以在这里看到一些潜在的罪魁祸首:

然而,我无法在这里查明真正的根本原因。有任何想法吗?

注意:我认为很明显我不太熟悉,System.Reactive所以如果我在这里写了一些愚蠢的东西,我深表歉意。

标签: c#queuesystem.reactive

解决方案


推荐阅读