Celery + Azure Service Bus (broker) = claim is empty or token is invalid

Problem description

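I am trying to use Azure Service Bus as the broker for my Celery application.

I pieced this setup together from various sources. The goal is to use Azure Service Bus as the broker and PostgreSQL as the result backend.

I created an Azure Service Bus namespace and copied the RootManageSharedAccessKey credentials into the Celery app.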


Here is tasks.py:

from time import sleep
from celery import Celery
from kombu.utils.url import safequote

SAS_policy = safequote("RootManageSharedAccessKey")  # SAS policy name
SAS_key = safequote("1234222zUY28tRUtp+A2YoHmDYcABCD")  # Primary key of that policy
namespace = safequote("bluenode-dev")

app = Celery('tasks', backend='db+postgresql://afsan.gujarati:admin@localhost/local_dev', 
            broker=f'azureservicebus://{SAS_policy}:{SAS_key}=@{namespace}')

@app.task
def divide(x, y):
    sleep(30)
    return x/y

When I try to run the Celery application with the following command:

celery -A tasks worker --loglevel=INFO

I get the following error:

[2020-10-09 14:00:32,035: CRITICAL/MainProcess] Unrecoverable error: AzureHttpError('Unauthorized\n<Error><Code>401</Code><Detail>claim is empty or token is invalid. TrackingId:295f7c76-770e-40cc-8489-e0eb56248b09_G5S1, SystemTracker:bluenode-dev.servicebus.windows.net:$Resources/Queues, Timestamp:2020-10-09T20:00:31</Detail></Error>')
Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 918, in create_channel
    return self._avail_channels.pop()
IndexError: pop from empty list

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 1225, in _perform_request
    resp = self._filter(request)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_http/httpclient.py", line 211, in perform_request
    raise HTTPError(status, message, respheaders, respbody)
azure.servicebus.control_client._http.HTTPError: Unauthorized

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
    self.blueprint.start(self)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
    return self.obj.start()
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 311, in start
    blueprint.start(self)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
    step.start(parent)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/connection.py", line 21, in start
    c.connection = c.connect()
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 398, in connect
    conn = self.connection_for_read(heartbeat=self.amqheartbeat)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 404, in connection_for_read
    return self.ensure_connected(
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 430, in ensure_connected
    conn = conn.ensure_connection(
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 383, in ensure_connection
    self._ensure_connection(*args, **kwargs)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 435, in _ensure_connection
    return retry_over_time(
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/utils/functional.py", line 325, in retry_over_time
    return fun(*args, **kwargs)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 866, in _connection_factory
    self._connection = self._establish_connection()
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 801, in _establish_connection
    conn = self.transport.establish_connection()
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 938, in establish_connection
    self._avail_channels.append(self.create_channel(self))
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 920, in create_channel
    channel = self.Channel(connection)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/azureservicebus.py", line 64, in __init__
    for queue in self.queue_service.list_queues():
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 313, in list_queues
    response = self._perform_request(request)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 1227, in _perform_request
    return _service_bus_error_handler(ex)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_serialization.py", line 569, in _service_bus_error_handler
    return _general_error_handler(http_error)
  File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_common_error.py", line 41, in _general_error_handler
    raise AzureHttpError(message, http_error.status)
azure.common.AzureHttpError: Unauthorized
<Error><Code>401</Code><Detail>claim is empty or token is invalid. TrackingId:295f7c76-770e-40cc-8489-e0eb56248b09_G5S1, SystemTracker:bluenode-dev.servicebus.windows.net:$Resources/Queues, Timestamp:2020-10-09T20:00:31</Detail></Error>

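I have not found a direct solution anywhere. What am I missing?

P.S. I did not create a queue in Azure Service Bus. I assumed Celery would create the queue itself when the Celery application starts.

P.P.S. I also tried the exact same credentials with the Service Bus client for Python, and they seem to work. It feels like a Celery issue, but I cannot figure out what exactly.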

Tags: python, azure, queue, celery, azureservicebus

Solution


If you want to connect to Azure Service Bus through the Azure Service Bus transport, the broker URL should be azureservicebus://{SAS policy name}:{SAS key}@{Service Bus Namespace}.

For example:

  1. Get the shared access policy RootManageSharedAccessKey


  2. Code
from celery import Celery
from kombu.utils.url import safequote


SAS_policy = "RootManageSharedAccessKey"  # SAS Policy
# Primary key of the policy above
SAS_key = safequote("X/*****qyY=")
namespace = "bowman1012"
app = Celery('tasks', backend='db+postgresql://<>@localhost/<>',
             broker=f'azureservicebus://{SAS_policy}:{SAS_key}@{namespace}')


@app.task
def add(x, y):
    return x + y
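
To check a broker URL before handing it to Celery, the sketch below builds it from its three parts and parses it back. It uses only the standard library: quote(..., safe="") is, as far as I know, what kombu's safequote does, but treat that as an assumption. The helper names build_broker_url and split_credentials and the sample values are hypothetical, not real credentials or part of any library.

```python
from typing import Tuple
from urllib.parse import quote, unquote, urlsplit


def build_broker_url(policy: str, key: str, namespace: str) -> str:
    """Assemble azureservicebus://{policy}:{key}@{namespace}, percent-encoding each part."""
    # safe="" also encodes "/", "+" and "=", which often appear in SAS keys.
    parts = [quote(value, safe="") for value in (policy, key, namespace)]
    return "azureservicebus://{}:{}@{}".format(*parts)


def split_credentials(url: str) -> Tuple[str, str, str]:
    """Recover (policy, key, namespace) from a broker URL, undoing the encoding."""
    parsed = urlsplit(url)
    return unquote(parsed.username), unquote(parsed.password), parsed.hostname


# Hypothetical placeholder values for illustration only.
policy = "RootManageSharedAccessKey"
key = "abc+DEF/123="
namespace = "example-namespace"

url = build_broker_url(policy, key, namespace)
print(url)

# The key should survive the round trip unchanged.
print(split_credentials(url) == (policy, key, namespace))

# Anything written between the encoded key and "@" is parsed as part of the key.
bad_url = url.replace("@", "=@")
print(split_credentials(bad_url)[1])
```

The last check shows why the URL must contain exactly the key and nothing else before the @: an extra character there changes the credential that is sent. Whether that explains the 401 in the question is not something the answer confirms.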
