首页 > 解决方案 > 如何从 Symfony 项目中运行 bin/console messenger:consume 命令?

问题描述

我在非 Symfony 项目中使用Messenger 组件,并在 DSN 传输中使用 Doctrine。现在我想测试我的代码并在本地机器上使用消息,但我不知道如何在控制台中运行 messenger 命令。

我尝试在控制台中使用Symfony\Component\Console\Application和注册该\Symfony\Component\Messenger\Command\ConsumeMessagesCommand命令,但是有很多嵌套的依赖项。

你有什么主意吗?

标签: phpconsumersymfony-consolesymfony-messenger

解决方案


我们实际上在许多项目中都这样做了,甚至是 WordPress CLI 工具,我们使用这个完成它以及传输。它不需要 Symfony,并且可以与大多数遵循通用标准的队列系统一起使用。

一般的想法是你想要一些东西(可能是一个单例)返回一个实例Interop\Queue\Context,这就是我们使用的:

    function createContext(): \Interop\Queue\Context
    {
        $factory = new \Enqueue\Dbal\DbalConnectionFactory(
            sprintf(
                'mysql://%1$s:%2$s@%3$s/%4$s',
                DB_USER,
                DB_PASSWORD,
                DB_HOST,
                DB_NAME
            )
        );

        $context = $factory->createContext();
        $context->createDataBaseTable();
        return $context;
    }

您还需要一些东西来处理每条消息,并且您需要将消息和使用者传递给它:

    function handleMessage($message, $consumer)
    {
        // Business logic here
        if($business_logic_failed) {
            $context = createContext();
            $failed_queue = $context->createQueue('FAILED_QUEUE_HERE');
            $context->createProducer()->send($failed_queue, $message);
        } else {
            $consumer->acknowledge($message);
        }
        
    }

然后使用它:

$context = createContext();
$queue = $context->createQueue('QUEUE_NAME_HERE');
$consumer = $context->createConsumer($queue);

// This can be an infinite loop, or a loop for 10 messages and exit, whatever your logic
while(true) {
    // This command will block unless you pass a timeout, so no sleep is needed
    $message = $consumer->receive(/* optional timeout here */);
    handleMessage($message, $consumer);

    // Do whatever you want with message
}

也可以围绕它进行大量尝试/捕捉,并确保无论您以某种方式承认或失败消息。


推荐阅读