首页 > 解决方案 > 将消息移动到失败的 HTTP 请求的订阅死信

问题描述

我一直在寻找资源,但似乎找不到我需要的东西。我有一个带有服务总线触发器的 Azure 函数。由此,我使用服务总线消息中的值之一进行 HTTP 调用。

如果 HTTP 调用失败,我的另一个要求是对消息进行死信。但据我了解,该消息不再存在于订阅中,因为它已被正确接收。有没有办法让我将消息保留在订阅中,然后在成功后将其处置(如果没有则转移到DLQ?)

我找到了这段代码,但我不确定它是如何发送到 DLQ 的? https://github.com/Azure/azure-sdk-for-python/blob/azure-servicebus_7.3.0/sdk/servicebus/azure-servicebus/samples/sync_samples/receive_deadlettered_messages.py

"""
Example to show receiving dead-lettered messages from a Service Bus Queue.
"""

# pylint: disable=C0111

import os
from azure.servicebus import ServiceBusClient, ServiceBusMessage, ServiceBusSubQueue

CONNECTION_STR = os.environ['SERVICE_BUS_CONNECTION_STR']
QUEUE_NAME = os.environ["SERVICE_BUS_QUEUE_NAME"]

servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR)

with servicebus_client:
    sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
    messages = [ServiceBusMessage("Message to be deadlettered") for _ in range(10)]
    with sender:
        sender.send_messages(messages)

    print('dead lettering messages')
    receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME)
    with receiver:
        received_msgs = receiver.receive_messages(max_message_count=10, max_wait_time=5)
        for msg in received_msgs:
            print(str(msg))
            receiver.dead_letter_message(msg)

    print('receiving deadlettered messages')
    dlq_receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, sub_queue=ServiceBusSubQueue.DEAD_LETTER)
    with dlq_receiver:
        received_msgs = dlq_receiver.receive_messages(max_message_count=10, max_wait_time=5)
        for msg in received_msgs:
            print(str(msg))
            dlq_receiver.complete_message(msg)

print("Receive is done.")

这是我的一个代码片段:

async def main(msg: func.ServiceBusMessage):
    try:
        logging.info('Python ServiceBus queue trigger processed message: %s',
        msg.get_body().decode('utf-8'))
        await asyncio.gather(wait(), wait())

        result = json.dumps({
            'message_id': msg.message_id,
            'metadata' : msg.metadata
        })

        msgobj = json.loads(result)

        val = msgobj['metadata']['value']
        run_pipeline(val, msg)
    except Exception as e:
        logging.error(f"trigger failed: {e}")

TLDR;如何将消息保留在订阅中并处理它们(如果成功)或者如果没有将它们发送到 DLQ?

标签: pythonazureazure-functionsazureservicebusazure-servicebus-subscriptions

解决方案


您粘贴的代码是从死信队列中接收死信消息。

我在文档中找到了一些代码。您可以使用他们示例中的此代码段

from azure.servicebus import ServiceBusClient

import os
connstr = os.environ['SERVICE_BUS_CONNECTION_STR']
queue_name = os.environ['SERVICE_BUS_QUEUE_NAME']

with ServiceBusClient.from_connection_string(connstr) as client:
    with client.get_queue_receiver(queue_name) as receiver:
        for msg in receiver:
            print(str(msg))
            receiver.dead_letter_message(msg)

您可以查看在异常处理程序中使用上面的代码


推荐阅读