首页 > 解决方案 > 使用 ReactPHP 响应流中的时间睡眠

问题描述

我正在玩 ReactPHP 和响应流。

我已经成功创建了一个 POC 来生成这样的响应流:

function (int $chunks, int $sleep) use ($loop) {
    $stream = new ThroughStream();
    $loop->addPeriodicTimer($sleep, function (TimerInterface $timer) use ($stream, $chunks, $loop) {
        static $i = 0;
        $stream->write(microtime(true) . PHP_EOL);
        $i++;
        if ($i >= $chunks) {
            $loop->cancelTimer($timer);
            $stream->end();
        }
    });
    return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}

调用curl -X GET -i https://localhost:9091/sleepstream/10/1将产生

1624222419.1271
1624222420.1282
1624222421.1293
1624222422.1302
1624222423.1312
1624222424.1323
1624222425.1333
1624222426.1342
1624222427.1353
1624222428.1363

每行打印在前一行之后 1 秒。好的。

现在我正在尝试创建一个更逼真的控制器:

function (int $sleep, ServerRequestInterface $request) use ($loop) {
    $body = $request->getBody();
    assert($body instanceof \React\Stream\ReadableStreamInterface);
                    
    $in = new \Clue\React\NDJson\Decoder($body);
                    
    $stream = new ThroughStream();

    $in->on('data', function ($data) use ($stream, $loop) {
        $data->ts = time();
        $loop->futureTick(function () use ($stream, $data) {
            echo "DATA\n";
            $stream->write(\json_encode($data) . PHP_EOL);
            sleep(1);
        });
    });
                    
    $in->on('end', function() use ($stream, $loop) {
        $loop->addTimer(2, function () use ($stream) {
            $stream->end();
        });
    });
    return new Response(200, ['Content-Type' => 'text/plain'], $stream);
}

我正在使用 NDJSON 输入文件users.ndjson

{"id":1,"name":"Alice"}
{"id":2,"name":"Bob"}
{"id":3,"name":"Carol"}
{"id":4,"name":"David"}
{"id":5,"name":"Zach"}

所以这样称呼:

curl -D /dev/stderr -s -X PUT -T contrib/users.ndjson https://localhost:9091/bridge/ndjson/1

给出了:

{"id":1,"name":"Alice","ts":1624231062}
{"id":2,"name":"Bob","ts":1624231062}
{"id":3,"name":"Carol","ts":1624231062}
{"id":4,"name":"David","ts":1624231062}
{"id":5,"name":"Zach","ts":1624231062}

但是响应不是每秒逐行收到,而是一次性收到...

服务器日志显示:

DATA
DATA
DATA
DATA
DATA

时机很好(每秒 1 次)。

我不明白为什么第二个控制器中的响应流被破坏了。

编辑:我开始理解为什么响应流在第二个控制器中被破坏了。

@WyriHaximus 指出sleep()正在阻塞循环。所以我现在的问题是:我如何才能让记录准备好流式传输并模拟睡眠的循环(我试图模拟输入流和输出流之间的计算滞后以验证某个关键点)。

标签: streamingreactphp

解决方案


ReactPHP 核心维护者在这里。sleep(1);你在你的代码中阻止了事件循环一整秒。并且由于事件循环无法将数据写出,因为它在尝试时被阻塞。此外,如果添加time()调试调用,您可能希望microtime(true)更好地可视化写入队列之间的时间。除非我遗漏了某些东西并且您有充分的理由,否则您通常不想延迟写出处理后获得的数据,因为它会占用您不需要保留的内存。


推荐阅读