python-3.x - Azure 服务总线将 csv 行发送到队列
问题描述
我正在测试用于 Azure 服务总线的 python SDK 以将消息发送到队列。基本测试工作得很好,因为我正在向队列发送一个字符串,如下所示:
import csv
from azure.servicebus import ServiceBusClient, ServiceBusMessage
CONNECTION_STR = "CONN_STR"
QUEUE_NAME= "queue name"
def send_single_message(sender):
message = ServiceBusMessage("Single Message")
sender.send_messages(message)
print("Sent a single message")
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
with sender:
send_single_message(sender)
print("Done sending messages")
print("-----------------------")
with servicebus_client:
receiver = servicebus_client.get_queue_receiver(queue_name=QUEUE_NAME, max_wait_time=5)
with receiver:
for msg in receiver:
print("Received: " + str(msg))
receiver.complete_message(msg)
现在,我想要实现的下一步是遍历一个 csv 文件,并为每一行将其发送到队列。
所以我尝试遍历这个 csv 文件,并将这些行发送到队列。如下:
def send_a_list_of_messages(sender):
to_queue = []
with open('final_result.csv') as f:
reader = csv.reader(f)
for row in reader:
to_queue.append(row)
print(to_queue)
message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
sender.send_messages(message)
print("Sent a single message")
servicebus_client = ServiceBusClient.from_connection_string(conn_str=CONNECTION_STR, logging_enable=True)
with servicebus_client:
sender = servicebus_client.get_queue_sender(queue_name=QUEUE_NAME)
with sender:
send_a_list_of_messages(sender)
print("Done sending messages")
print("-----------------------")
但是当我运行我的代码时,我得到了这个错误:
Traceback (most recent call last):
File "servicebus.py", line 23, in <module>
send_a_list_of_messages(sender)
File "servicebus.py", line 13, in send_a_list_of_messages
message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
File "servicebus.py", line 13, in <listcomp>
message = [ServiceBusMessage(to_queue) for _ in range(len(to_queue))]
File "/Users/usr/opt/anaconda3/lib/python3.8/site-packages/azure/servicebus/_common/message.py", line 110, in __init__
self._build_message(body)
File "/Users/usr/opt/anaconda3/lib/python3.8/site-packages/azure/servicebus/_common/message.py", line 190, in _build_message
raise TypeError(
TypeError: ServiceBusMessage body must be a string, bytes, or None. Got instead: <class 'list'>
我确实知道这些服务需要特定的正文类型,但我很确定这to_queue[]
将是一个字符串类型。
请,如果有人可以帮助我解决这个问题,我将不胜感激。如果您有任何其他问题,请告诉我。
太感谢了
编辑:
我通过将 ServiceBusMessage 转换为字符串来解决正文类型问题,如下所示:
message = [ServiceBusMessage(str(to_queue)) for _ in range(len(to_queue))]
但现在我收到关于大小限制的错误:
azure.servicebus.exceptions.MessageSizeExceededError: ServiceBusMessageBatch has reached its size limit: 262144
谁能帮我这个。
我尝试以不同的方式处理工作流程,使函数应该为每一行触发多次。如下:
for msg in to_queue:
print(msg)
def send_a_list_of_messages(sender):
print(to_queue)
message = [ServiceBusMessage(str(to_queue)) for _ in range(len(to_queue))]
sender.send_messages(message)
print("Sent a single message")
但我得到同样的错误。
我通过决定实现一个 for 循环来触发 csv 文件中每一行的函数,暂时克服了大小问题。
for msg in to_queue:
# print(msg)
def send_a_list_of_messages(sender):
# print(to_queue)
message = [ServiceBusMessage(str(msg))]
sender.send_messages(message)
print("msg sent: " + str(msg))
这有效但部分有效,因为该函数仅发送 csv 文件中的最后一行。
解决方案
ServiceBusMessage 仅接受 str/bytes,但最初您以列表形式发送。如果在您转换为 a 后它仍然失败
str
,则可能还有其他问题。你能提供一个你的csv文件样本吗?A
MessageSizeExceededError
当消息列表/批次的大小超过最大限制时发生。您可以在此处阅读有关限制和批次的更多信息:https ://docs.microsoft.com/en-us/python/api/azure-servicebus/azure.servicebus.servicebusmessagebatch?view=azure-python 。为确保您不超过限制并安全地发送您的一批消息,您应该改用此模式:https ://github.com/Azure/azure-sdk-for-python/blob/ac60fb93ea23f32e5ed5a83532c1ed6aa49921b5/sdk/servicebus /azure-servicebus/samples/sync_samples/send_queue.py#L32
请让我知道,如果你有任何问题!
推荐阅读
- c# - C#如何让不同速度的进程一起工作
- javascript - 回调函数有问题
- python - Python:无名关键字参数列表
- android - Android 单元测试改造失败
- java - 使用 mySQL 从 Spring Boot 开始时,Javers 给出重复的键名“jv_global_id_owner_id_fk_idx”
- kql - 如何在日期之间创建 kql 查询
- three.js - three.js 失真,glb (gltf) 模型上的颜色替换。蓝色变为绿色,黄色变为橙色
- powershell - 在 dc 调用命令 - 获取 gpinheritance - 不起作用
- regex - 如何从以下 reg 表达式响应中排除最后一个字符
- python - 找出列表中直到特定索引的唯一元素的程序