首页 > 解决方案 > OSGi PushStream 很慢

问题描述

在尝试 OSGi PushStream 库时,我觉得它真的很慢。我创建了两种方法,一种使用 PushStream 做同样的事情,另一种使用简单的 BlockingQueue(见下面的代码),结果如下:

Queue needs 3 milliseconds to process 1000 events.
PushStream needs 31331 milliseconds to process 1000 events.

为什么 PushStream 较慢?我做错了什么?

代码

使用 PushStream:

public class TestPush{

    @Test
    public void testPushStream() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());

        final PushStreamProvider psp = new PushStreamProvider();
        final SimplePushEventSource<Integer> source =
              psp.buildSimpleEventSource(Integer.class).withQueuePolicy(QueuePolicyOption.BLOCK).build();

        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();

        psp.createStream(source).onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
            if (i == 0) {
                startD.resolve( Instant.now() );
            }
        });

        final Promise<Long> nbEvent = psp.createStream(source).count();

        for (int i = 0; i < 1000; i++) {
            source.publish(i);
        }
        source.endOfStream();

        System.out.println("PushStream needs "
        + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
        + " milliseconds to process " + nbEvent.getValue() + " events.");
    }

使用 ArrayBlockingQueue:

    @Test
    public void testBlockingQueue() throws Exception {
        final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());

        final Executor e = Executors.newFixedThreadPool(1);
        final ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(32);

        final Deferred<Instant> startD = pf.deferred();
        final Deferred<Instant> endD = pf.deferred();
        final Deferred<Integer> nbEvent = pf.deferred();

        e.execute( () -> {
            try {
                Integer i = 0;
                Integer last = 0;
                do {
                    i = abq.take();

                    if (i == 0) {
                        startD.resolve(Instant.now());
                    } else if (i != -1) {
                        last = i;
                    }
                }
                while (i != -1);
                endD.resolve(Instant.now());
                nbEvent.resolve(last + 1);
            }
            catch (final InterruptedException exception) {
                exception.printStackTrace();
            }
        });

        for (int i = 0; i < 1000; i++) {
            abq.put(i);
        }
        abq.put(-1);

        System.out.println("Queue needs "
        + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
        + " milliseconds to process " + nbEvent.getPromise().getValue() + " events.");
    }
}

标签: asynchronouspromisestreamosgi

解决方案


这是一个有趣的问题:)

为什么 PushStream 较慢?我做错了什么?

感谢您不只是假设 PushStream 实现很糟糕。在这种情况下,它会变慢,因为(可能没有意识到)你要求它!

第 1 部分 - 缓冲

默认情况下,PushStreams 被缓冲。这意味着它们包含一个队列,在处理事件之前将事件放入该队列。因此,缓冲会做一些对吞吐量速度产生负面影响的事情。

  1. 它在管道中添加了一个额外的队列/出队步骤
  2. 它在事件处理中增加了一个额外的线程切换
  3. 缓冲区的默认策略是返回与缓冲区充满程度相关的背压。

在这种情况下,绝大多数减速是因为背压。当您使用psp.createStream(source)它创建一个流时,会设置一个包含 32 个元素的缓冲区和一个基于缓冲区大小的线性背压策略,满时返回一秒,当其中有一个项目时返回 31 毫秒。值得注意的是,每个元素 31 毫秒加起来就是 30 秒!

重要的是,SimplePushEventSource 始终尊重来自添加到它的消费者的背压请求。这意味着您可能会尽可能快地将事件泵入 SimplePushEventSource,但它们只会以管道请求的速度交付。

如果我们从您正在创建的推送流中删除缓冲,那么我们会得到以下测试:

@Test
public void testPushStream2() throws Exception {
    final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());

    final PushStreamProvider psp = new PushStreamProvider();
    final SimplePushEventSource<Integer> source =
          psp.buildSimpleEventSource(Integer.class)
          .withQueuePolicy(QueuePolicyOption.BLOCK)
          .build();

    final Deferred<Instant> startD = pf.deferred();
    final Deferred<Instant> endD = pf.deferred();

    psp.buildStream(source).unbuffered().build().onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
        if (i == 0) {
            startD.resolve( Instant.now() );
        }
    });

    final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count();

    for (int i = 0; i < 1000; i++) {
        source.publish(i);
    }
    source.endOfStream();

    System.out.println("PushStream needs "
    + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
    + " milliseconds to process " + nbEvent.getValue() + " events.");
}

运行这个(在我的机器上)的结果是:

PushStream needs 39 milliseconds to process 1000 events.

这显然更接近您的预期,但仍然明显慢得多。请注意,我们可能仍然有一些缓冲,但调整了 PushbackPolicy。这会给我们带来更快的吞吐量,但没有这个速度那么快。

第 2 部分 - 管道长度

接下来要注意的是您正在使用onClose()处理程序。这为您的推送流管道增加了一个额外的阶段。实际上,您可以将 onClose 移动为承诺的结果,从而减少管道的长度(您只需运行一次)。

@Test
public void testPushStream3() throws Exception {
    final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());

    final PushStreamProvider psp = new PushStreamProvider();
    final SimplePushEventSource<Integer> source =
            psp.buildSimpleEventSource(Integer.class)
            .withQueuePolicy(QueuePolicyOption.BLOCK)
            .build();

    final Deferred<Instant> startD = pf.deferred();
    final Deferred<Instant> endD = pf.deferred();

    psp.buildStream(source).unbuffered().build().forEach((i) -> {
        if (i == 0) {
            startD.resolve( Instant.now() );
        }
    });

    final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count()
            .onResolve(() -> endD.resolve( Instant.now()));

    for (int i = 0; i < 1000; i++) {
        source.publish(i);
    }
    source.endOfStream();

    System.out.println("PushStream needs "
            + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
            + " milliseconds to process " + nbEvent.getValue() + " events.");
}

这个版本的结果(在我的机器上)是:

PushStream needs 21 milliseconds to process 1000 events.

第 3 部分 - 多路传输

“原始数组阻塞队列”示例和 PushStream 示例之间的主要区别在于您实际上创建了两个PushStream。第一个负责捕获开始时间,第二个负责计算事件。这迫使 SimplePushEventSource 将事件多路复用到多个消费者。

如果我们将行为折叠到单个管道中,以便 SimplePushEventSource 可以使用快速路径传递怎么办?

@Test
public void testPushStream4() throws Exception {
    final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());

    final PushStreamProvider psp = new PushStreamProvider();
    final SimplePushEventSource<Integer> source =
            psp.buildSimpleEventSource(Integer.class)
            .withQueuePolicy(QueuePolicyOption.BLOCK)
            .build();

    final Deferred<Instant> startD = pf.deferred();
    final Deferred<Instant> endD = pf.deferred();

    final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build()
            .filter(i -> {
                if (i == 0) {
                    startD.resolve( Instant.now() );
                }
                return true;
            })
            .count()
            .onResolve(() -> endD.resolve( Instant.now()));

    for (int i = 0; i < 1000; i++) {
        source.publish(i);
    }
    source.endOfStream();

    System.out.println("PushStream needs "
            + Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
            + " milliseconds to process " + nbEvent.getValue() + " events.");
}

这个版本的结果(在我的机器上)是:

PushStream needs 3 milliseconds to process 1000 events.

概括

PushStreams 是一种使用异步到达事件的快速、有效的方法,但了解哪些缓冲行为适合您的应用程序非常重要。如果您想要快速迭代一大块数据,那么您需要小心如何设置,因为缓冲默认值是为不同的用例设计的!


推荐阅读