asynchronous - OSGi PushStream 很慢
问题描述
在尝试 OSGi PushStream 库时,我觉得它真的很慢。我创建了两种方法,一种使用 PushStream 做同样的事情,另一种使用简单的 BlockingQueue(见下面的代码),结果如下:
Queue needs 3 milliseconds to process 1000 events.
PushStream needs 31331 milliseconds to process 1000 events.
为什么 PushStream 较慢?我做错了什么?
代码
使用 PushStream:
public class TestPush{
@Test
public void testPushStream() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class).withQueuePolicy(QueuePolicyOption.BLOCK).build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.createStream(source).onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.createStream(source).count();
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
使用 ArrayBlockingQueue:
@Test
public void testBlockingQueue() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final Executor e = Executors.newFixedThreadPool(1);
final ArrayBlockingQueue<Integer> abq = new ArrayBlockingQueue<>(32);
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
final Deferred<Integer> nbEvent = pf.deferred();
e.execute( () -> {
try {
Integer i = 0;
Integer last = 0;
do {
i = abq.take();
if (i == 0) {
startD.resolve(Instant.now());
} else if (i != -1) {
last = i;
}
}
while (i != -1);
endD.resolve(Instant.now());
nbEvent.resolve(last + 1);
}
catch (final InterruptedException exception) {
exception.printStackTrace();
}
});
for (int i = 0; i < 1000; i++) {
abq.put(i);
}
abq.put(-1);
System.out.println("Queue needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getPromise().getValue() + " events.");
}
}
解决方案
这是一个有趣的问题:)
为什么 PushStream 较慢?我做错了什么?
感谢您不只是假设 PushStream 实现很糟糕。在这种情况下,它会变慢,因为(可能没有意识到)你要求它!
第 1 部分 - 缓冲
默认情况下,PushStreams 被缓冲。这意味着它们包含一个队列,在处理事件之前将事件放入该队列。因此,缓冲会做一些对吞吐量速度产生负面影响的事情。
- 它在管道中添加了一个额外的队列/出队步骤
- 它在事件处理中增加了一个额外的线程切换
- 缓冲区的默认策略是返回与缓冲区充满程度相关的背压。
在这种情况下,绝大多数减速是因为背压。当您使用psp.createStream(source)
它创建一个流时,会设置一个包含 32 个元素的缓冲区和一个基于缓冲区大小的线性背压策略,满时返回一秒,当其中有一个项目时返回 31 毫秒。值得注意的是,每个元素 31 毫秒加起来就是 30 秒!
重要的是,SimplePushEventSource 始终尊重来自添加到它的消费者的背压请求。这意味着您可能会尽可能快地将事件泵入 SimplePushEventSource,但它们只会以管道请求的速度交付。
如果我们从您正在创建的推送流中删除缓冲,那么我们会得到以下测试:
@Test
public void testPushStream2() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.buildStream(source).unbuffered().build().onClose(() -> endD.resolve( Instant.now()) ).forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count();
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
运行这个(在我的机器上)的结果是:
PushStream needs 39 milliseconds to process 1000 events.
这显然更接近您的预期,但仍然明显慢得多。请注意,我们可能仍然有一些缓冲,但调整了 PushbackPolicy。这会给我们带来更快的吞吐量,但没有这个速度那么快。
第 2 部分 - 管道长度
接下来要注意的是您正在使用onClose()
处理程序。这为您的推送流管道增加了一个额外的阶段。实际上,您可以将 onClose 移动为承诺的结果,从而减少管道的长度(您只需运行一次)。
@Test
public void testPushStream3() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
psp.buildStream(source).unbuffered().build().forEach((i) -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
});
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build().count()
.onResolve(() -> endD.resolve( Instant.now()));
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
这个版本的结果(在我的机器上)是:
PushStream needs 21 milliseconds to process 1000 events.
第 3 部分 - 多路传输
“原始数组阻塞队列”示例和 PushStream 示例之间的主要区别在于您实际上创建了两个PushStream。第一个负责捕获开始时间,第二个负责计算事件。这迫使 SimplePushEventSource 将事件多路复用到多个消费者。
如果我们将行为折叠到单个管道中,以便 SimplePushEventSource 可以使用快速路径传递怎么办?
@Test
public void testPushStream4() throws Exception {
final PromiseFactory pf = new PromiseFactory(PromiseFactory.inlineExecutor());
final PushStreamProvider psp = new PushStreamProvider();
final SimplePushEventSource<Integer> source =
psp.buildSimpleEventSource(Integer.class)
.withQueuePolicy(QueuePolicyOption.BLOCK)
.build();
final Deferred<Instant> startD = pf.deferred();
final Deferred<Instant> endD = pf.deferred();
final Promise<Long> nbEvent = psp.buildStream(source).unbuffered().build()
.filter(i -> {
if (i == 0) {
startD.resolve( Instant.now() );
}
return true;
})
.count()
.onResolve(() -> endD.resolve( Instant.now()));
for (int i = 0; i < 1000; i++) {
source.publish(i);
}
source.endOfStream();
System.out.println("PushStream needs "
+ Duration.between( startD.getPromise().getValue(), endD.getPromise().getValue() ).toMillis()
+ " milliseconds to process " + nbEvent.getValue() + " events.");
}
这个版本的结果(在我的机器上)是:
PushStream needs 3 milliseconds to process 1000 events.
概括
PushStreams 是一种使用异步到达事件的快速、有效的方法,但了解哪些缓冲行为适合您的应用程序非常重要。如果您想要快速迭代一大块数据,那么您需要小心如何设置,因为缓冲默认值是为不同的用例设计的!
推荐阅读
- javascript - 如何从 JavaScript 源映射中获取键值?
- python - pythonnet clr 如何添加对 dll 程序集的引用?
- viewmodel - 在共享视图模型中杀死片段观察实时数据实例
- mysql - 为什么我的查询只搜索以 a 开头和结尾的名称而忽略以 m 开头和结尾的名称?
- html - 部署 Web 应用程序时无法从文件夹内访问图像
- django - Django 的 http.response 的 _container 属性是什么?
- database - Apache Cassandra:auto_bootstrap 属性是否允许新(非种子)节点从另一个 DC 中的节点流式传输数据?
- javascript - 如何在wordpress中将外部js文件输出为内联脚本?
- image - 比PNG更好的图像格式?(页面速度)
- django - django 有一些内置的模型吸气剂功能吗?我“失去”了一个功能