streaming - 使用 ReactPHP 响应流中的时间睡眠
问题描述
我正在玩 ReactPHP 和响应流。
我已经成功创建了一个 POC 来生成这样的响应流:
function (int $chunks, int $sleep) use ($loop) {
$stream = new ThroughStream();
$loop->addPeriodicTimer($sleep, function (TimerInterface $timer) use ($stream, $chunks, $loop) {
static $i = 0;
$stream->write(microtime(true) . PHP_EOL);
$i++;
if ($i >= $chunks) {
$loop->cancelTimer($timer);
$stream->end();
}
});
return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}
调用curl -X GET -i https://localhost:9091/sleepstream/10/1
将产生
1624222419.1271
1624222420.1282
1624222421.1293
1624222422.1302
1624222423.1312
1624222424.1323
1624222425.1333
1624222426.1342
1624222427.1353
1624222428.1363
每行打印在前一行之后 1 秒。好的。
现在我正在尝试创建一个更逼真的控制器:
function (int $sleep, ServerRequestInterface $request) use ($loop) {
$body = $request->getBody();
assert($body instanceof \React\Stream\ReadableStreamInterface);
$in = new \Clue\React\NDJson\Decoder($body);
$stream = new ThroughStream();
$in->on('data', function ($data) use ($stream, $loop) {
$data->ts = time();
$loop->futureTick(function () use ($stream, $data) {
echo "DATA\n";
$stream->write(\json_encode($data) . PHP_EOL);
sleep(1);
});
});
$in->on('end', function() use ($stream, $loop) {
$loop->addTimer(2, function () use ($stream) {
$stream->end();
});
});
return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}
我正在使用 NDJSON 输入文件users.ndjson
:
{"id":1,"name":"Alice"}
{"id":2,"name":"Bob"}
{"id":3,"name":"Carol"}
{"id":4,"name":"David"}
{"id":5,"name":"Zach"}
所以这样称呼:
curl -D /dev/stderr -s -X PUT -T contrib/users.ndjson https://localhost:9091/bridge/ndjson/1
给出了:
{"id":1,"name":"Alice","ts":1624231062}
{"id":2,"name":"Bob","ts":1624231062}
{"id":3,"name":"Carol","ts":1624231062}
{"id":4,"name":"David","ts":1624231062}
{"id":5,"name":"Zach","ts":1624231062}
但是响应不是每秒逐行收到,而是一次性收到...
服务器日志显示:
DATA
DATA
DATA
DATA
DATA
时机很好(每秒 1 次)。
我不明白为什么第二个控制器中的响应流被破坏了。
编辑:我开始理解为什么响应流在第二个控制器中被破坏了。
@WyriHaximus 指出sleep()
正在阻塞循环。所以我现在的问题是:我如何才能让记录准备好流式传输并模拟睡眠的循环(我试图模拟输入流和输出流之间的计算滞后以验证某个关键点)。
解决方案
ReactPHP 核心维护者在这里。sleep(1);
你在你的代码中阻止了事件循环一整秒。并且由于事件循环无法将数据写出,因为它在尝试时被阻塞。此外,如果添加time()
调试调用,您可能希望microtime(true)
更好地可视化写入队列之间的时间。除非我遗漏了某些东西并且您有充分的理由,否则您通常不想延迟写出处理后获得的数据,因为它会占用您不需要保留的内存。
推荐阅读
- ios - AppDelegate 在启动时未触发?
- xcode - IOS 12 未找到此可执行文件的有效配置文件,但仍在 IOS 11、xcode 10 中启动
- oracle - 在oracle中跟踪迁移的表之间丢失的记录
- typescript - 如何从 Aurelia Store 订阅方法导入 Typescript 订阅类型?
- web-services - 如何获取服务器使用 curl 处理发布请求所需的时间?
- mysql - Azure Database for MySQL 和 Azure VM 复制连接错误
- php - 快速邮件中的 DKIM 签名
- node.js - 我的变量不会使用 node.js 填充到 for 循环中
- r - Match() 函数只对 0 运行,对其他值不运行
- c++ - 通过引用传递参数时我不明白这个错误