python - 如何在 Python 中停止 Azure 事件中心使用者客户端
问题描述
我在使用 Python 的 Azure Event Bub 时遇到了一些麻烦。下面是我的连接代码(取自微软文档)
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
async def on_event(partition_context, event):
# Print the event data.
print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
# Update the checkpoint so that the program doesn't read the events
# that it has already read when you run it next time.
await partition_context.update_checkpoint(event)
async def main():
# Create an Azure blob checkpoint store to store the checkpoints.
checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
# Create a consumer client for the event hub.
client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
async with client:
# Call the receive method. Read from the beginning of the partition (starting_position: "-1")
await client.receive(on_event=on_event, starting_position="-1")
if __name__ == '__main__':
loop = asyncio.get_event_loop()
# Run the main method.
loop.run_until_complete(main())
在这里,接收者/消费者继续收听。如果我删除任何等待,消费者会抛出一个错误。有谁知道如何在运行一段时间后停止消费者,比如超时)。
解决方案
@Abhishek
这里有 2 个选项:
- 当一段时间内没有活动时,您可以停止收听。
- 您可以在固定时间后停止收听。
在以下步骤中进行了详细说明。
选项1
您可以使用 max_wait_time 参数来停止侦听,以防在特定时间内没有活动。
我确实启动了上述的一个简单用例。但是您可以进一步优化它。
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
event_hub_connection_str = '<CON_STR>'
eventhub_name = '<EventHub_NAME>'
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
#this event gets called when the message is received or Max_Wait_time is clocked
async def on_event(partition_context, event):
print(event) #Optional - to see output
#Checks whether there is any event returned None. None is returned when this event is called after the Max_Wait_time is crossed
if(event !=None):
print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
#you can update other code like updating blob store
else:
print("Timeout is Hit")
#updating the
global receive
receive = False
async def close():
print("Closing the client.")
await consumer.close()
print("Closed")
async def main():
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event,max_wait_time=15))
while(True): # keep receiving for 3 seconds
await asyncio.sleep(3)
if(receive != True):
print("Cancelling the Task")
recv_task.cancel() # stop receiving by cancelling the task
break;
receive = True
asyncio.run(main())
asyncio.run(close())#closing the Client
关于上面的代码。如果 15 秒内没有活动,则异步任务将被取消,消费者客户端将被关闭。程序最终优雅地退出。
选项 2
如果您正在寻找一个代码,您想让客户在固定时间(如 1 小时或其他时间)内收听。你可以参考下面的代码
event_hub_connection_str = '<>'
eventhub_name = '<>'
import asyncio
from azure.eventhub.aio import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
async def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, async will have better performance.
print("Received event from partition: {}".format(partition_context.partition_id))
# The receive method is a coroutine which will be blocking when awaited.
# It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
async def main():
recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(15) # keep receiving for 3 seconds
recv_task.cancel() # stop receiving
async def close():
print("Closing.....")
await consumer.close()
print("Closed")
asyncio.run(main())
asyncio.run(close())#closing the Client
下面的代码负责让客户端监听一段时间:
recv_task =
asyncio.ensure_future(consumer.receive(on_event=on_event))
await asyncio.sleep(3) # keep receiving for 3 seconds
recv_task.cancel()
您可以根据需要增加时间。
推荐阅读
- javascript - 节点/javascript中的“位置”属性
- azure - Login-AzureRmAccount :-Credential 参数只能与组织 ID 凭据一起使用
- freemarker - 计算列表中的值
- wso2 - 导入 API 时出错,因为 API 反映在 api 发布者中,但在 APIMCLI 中引发错误
- python - 给定整数列表,删除列表的奇数位置(从位置 1(索引 0)开始,直到列表在最短时间包含单个元素
- java - 在内存中解压缩多部分格式文件
- react-native - 如何使用反应导航获得模态效果?
- laravel - Laravel 6.0 中的“调用未定义函数 str_slug()”
- arm - STM32 - 如何在 HAL 库中为外设 R/W 选择 DMA 或中断
- c - 编写一个函数来分割字符串