python - Celery + Azure 服务总线(代理)= 声明为空或令牌无效
问题描述
我正在尝试使用 Azure 服务总线作为我的 celery 应用程序的代理。
我已经通过参考各种来源修补了解决方案。目标是使用 Azure 服务总线作为代理,使用 PostgresSQL 作为后端。
我创建了一个 Azure 服务总线并将凭据复制RootManageSharedAccessKey
到 celery 应用程序。
以下是task.py
from time import sleep
from celery import Celery
from kombu.utils.url import safequote
SAS_policy = safequote("RootManageSharedAccessKey") #SAS Policy
SAS_key = safequote("1234222zUY28tRUtp+A2YoHmDYcABCD") #Primary key from the previous SS
namespace = safequote("bluenode-dev")
app = Celery('tasks', backend='db+postgresql://afsan.gujarati:admin@localhost/local_dev',
broker=f'azureservicebus://{SAS_policy}:{SAS_key}=@{namespace}')
@app.task
def divide(x, y):
sleep(30)
return x/y
当我尝试使用以下命令运行 Celery 应用程序时:
celery -A tasks worker --loglevel=INFO
我收到以下错误
[2020-10-09 14:00:32,035: CRITICAL/MainProcess] Unrecoverable error: AzureHttpError('Unauthorized\n<Error><Code>401</Code><Detail>claim is empty or token is invalid. TrackingId:295f7c76-770e-40cc-8489-e0eb56248b09_G5S1, SystemTracker:bluenode-dev.servicebus.windows.net:$Resources/Queues, Timestamp:2020-10-09T20:00:31</Detail></Error>')
Traceback (most recent call last):
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 918, in create_channel
return self._avail_channels.pop()
IndexError: pop from empty list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 1225, in _perform_request
resp = self._filter(request)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_http/httpclient.py", line 211, in perform_request
raise HTTPError(status, message, respheaders, respbody)
azure.servicebus.control_client._http.HTTPError: Unauthorized
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/worker.py", line 203, in start
self.blueprint.start(self)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 365, in start
return self.obj.start()
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 311, in start
blueprint.start(self)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/connection.py", line 21, in start
c.connection = c.connect()
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 398, in connect
conn = self.connection_for_read(heartbeat=self.amqheartbeat)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 404, in connection_for_read
return self.ensure_connected(
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/celery/worker/consumer/consumer.py", line 430, in ensure_connected
conn = conn.ensure_connection(
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 383, in ensure_connection
self._ensure_connection(*args, **kwargs)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 435, in _ensure_connection
return retry_over_time(
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/utils/functional.py", line 325, in retry_over_time
return fun(*args, **kwargs)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 866, in _connection_factory
self._connection = self._establish_connection()
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/connection.py", line 801, in _establish_connection
conn = self.transport.establish_connection()
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 938, in establish_connection
self._avail_channels.append(self.create_channel(self))
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/virtual/base.py", line 920, in create_channel
channel = self.Channel(connection)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/kombu/transport/azureservicebus.py", line 64, in __init__
for queue in self.queue_service.list_queues():
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 313, in list_queues
response = self._perform_request(request)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/servicebusservice.py", line 1227, in _perform_request
return _service_bus_error_handler(ex)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_serialization.py", line 569, in _service_bus_error_handler
return _general_error_handler(http_error)
File "/Users/afsan.gujarati/.pyenv/versions/3.8.1/envs/celery-servicebus/lib/python3.8/site-packages/azure/servicebus/control_client/_common_error.py", line 41, in _general_error_handler
raise AzureHttpError(message, http_error.status)
azure.common.AzureHttpError: Unauthorized
<Error><Code>401</Code><Detail>claim is empty or token is invalid. TrackingId:295f7c76-770e-40cc-8489-e0eb56248b09_G5S1, SystemTracker:bluenode-dev.servicebus.windows.net:$Resources/Queues, Timestamp:2020-10-09T20:00:31</Detail></Error>
我在任何地方都没有看到直接的解决方案。我错过了什么?
PS 我没有在 Azure 服务总线中创建队列。我假设 celery 会在执行 celery 应用程序时自行创建队列。
PSS 我还尝试在 Python 的服务总线客户端中使用完全相同的凭据,它似乎可以工作。感觉就像芹菜的问题,但我无法弄清楚到底是什么。
解决方案
如果要使用 Azure 服务总线传输连接 Azure 服务总线,则 URL 应为azureservicebus://{SAS policy name}:{SAS key}@{Service Bus Namespace}
.
例如
- 获取共享访问策略
RootManageSharedAccessKey
- 代码
from celery import Celery
from kombu.utils.url import safequote
SAS_policy = "RootManageSharedAccessKey" # SAS Policy
# Primary key from the previous SS
SAS_key = safequote("X/*****qyY=")
namespace = "bowman1012"
app = Celery('tasks', backend='db+postgresql://<>@localhost/<>',
broker=f'azureservicebus://{SAS_policy}:{SAS_key}@{namespace}')
@app.task
def add(x, y):
return x + y
推荐阅读
- flutter - Flutter - 无法从 Cloud Firestore 中的数据生成变量 - 未定义名称错误
- java - Spring Boot 应用程序部署问题
- apache-spark - scala数据框中的rlike给出了错误
- python - python中DataType和Class的区别
- javascript - scrollIntoView 导致整个页面滚动
- node.js - 如何在 mongodb 中进行类似“不包含其他值的数组”的查询?
- node.js - 更新 JSON 问题,如何正确更新?
- swift - CloudKit 权利缺失
- graphql - 公共 GraphQL 指令
- c# - 将 [] 字符串转换为 int?