首页 > 解决方案 > Azure 服务总线将 csv 行发送到队列

问题描述

我正在测试用于 Azure 服务总线的 python SDK 以将消息发送到队列。基本测试工作得很好,因为我正在向队列发送一个字符串,如下所示:

import csv
from azure.servicebus import ServiceBusClient, ServiceBusMessage

CONNECTION_STR = "CONN_STR"
QUEUE_NAME= "queue name"
def send_single_message(sender):
    message = ServiceBusMessage("Single Message")
    sender.send_messages(message)
    print("Sent a single message")


servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)

with servicebus_client:
    sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
    with sender:
        send_single_message(sender)
print("Done sending messages")
print("-----------------------")

with servicebus_client:
    receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
    with receiver:
        for msg in receiver:
            print("Received: " + str(msg))
            receiver.complete_message(msg)

现在,我想要实现的下一步是遍历一个 csv 文件,并为每一行将其发送到队列。

所以我尝试遍历这个 csv 文件,并将这些行发送到队列。如下:

def send_a_list_of_messages(sender):
    to_queue = []
    with open('final_result.csv') as f:
        reader = csv.reader(f)
        for row in reader:
            to_queue.append(row)
    print(to_queue)
    message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
    sender.send_messages(message)
    print("Sent a single message")


servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)

with servicebus_client:
    sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
    with sender:
        send_a_list_of_messages(sender)

print("Done sending messages")
print("-----------------------")

但是当我运行我的代码时,我得到了这个错误:

Traceback (most recent call last):
  File "servicebus.py", line 23, in <module>
    send_a_list_of_messages(sender)
  File "servicebus.py", line 13, in send_a_list_of_messages
    message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
  File "servicebus.py", line 13, in <listcomp>
    message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
  File "/Users/usr/opt/anaconda3/lib/python3.8/site-packages/azure/servicebus/_common/message.py", line 110, in __init__
    self._build_message(body)
  File "/Users/usr/opt/anaconda3/lib/python3.8/site-packages/azure/servicebus/_common/message.py", line 190, in _build_message
    raise TypeError(
TypeError: ServiceBusMessage body must be a string, bytes, or None.  Got instead: <class 'list'>

我确实知道这些服务需要特定的正文类型,但我很确定这to_queue[]将是一个字符串类型。

请,如果有人可以帮助我解决这个问题,我将不胜感激。如果您有任何其他问题,请告诉我。

太感谢了

编辑:

我通过将 ServiceBusMessage 转换为字符串来解决正文类型问题,如下所示:

message = [ServiceBusMessage(str(to_queue)) for _ in range(len(to_queue))]

但现在我收到关于大小限制的错误:

azure.servicebus.exceptions.MessageSizeExceededError: ServiceBusMessageBatch has reached its size limit: 262144

谁能帮我这个。

我尝试以不同的方式处理工作流程,使函数应该为每一行触发多次。如下:

for msg in to_queue:
    print(msg)
    def send_a_list_of_messages(sender):
        print(to_queue)
        message = [ServiceBusMessage(str(to_queue)) for _ in range(len(to_queue))]
        sender.send_messages(message)
        print("Sent a single message")

但我得到同样的错误。

我通过决定实现一个 for 循环来触发 csv 文件中每一行的函数,暂时克服了大小问题。

for msg in to_queue:
    # print(msg)
    def send_a_list_of_messages(sender):
        # print(to_queue)
        message = [ServiceBusMessage(str(msg))]
        sender.send_messages(message)
        print("msg sent: " + str(msg))

这有效但部分有效,因为该函数仅发送 csv 文件中的最后一行。

标签: python-3.xazure-devopsazureservicebusazure-sdk-python

解决方案


  1. ServiceBusMessage 仅接受 str/bytes,但最初您以列表形式发送。如果在您转换为 a 后它仍然失败str,则可能还有其他问题。你能提供一个你的csv文件样本吗?

  2. AMessageSizeExceededError当消息列表/批次的大小超过最大限制时发生。您可以在此处阅读有关限制和批次的更多信息:https ://docs.microsoft.com/en-us/python/api/azure-servicebus/azure.servicebus.servicebusmessagebatch?view=azure-python 。为确保您不超过限制并安全地发送您的一批消息,您应该改用此模式:https ://github.com/Azure/azure-sdk-for-python/blob/ac60fb93ea23f32e5ed5a83532c1ed6aa49921b5/sdk/servicebus /azure-servicebus/samples/sync_samples/send_queue.py#L32

请让我知道,如果你有任何问题!


推荐阅读