首页 > 解决方案 > 如何在 Python 中停止 Azure 事件中心使用者客户端

问题描述

我在使用 Python 的 Azure Event Bub 时遇到了一些麻烦。下面是我的连接代码(取自微软文档)

import asyncio
from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore


async def on_event(partition_context, event):
    # Print the event data.
    print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))

    # Update the checkpoint so that the program doesn't read the events
    # that it has already read when you run it next time.
    await partition_context.update_checkpoint(event)

async def main():
    # Create an Azure blob checkpoint store to store the checkpoints.
    checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")

    # Create a consumer client for the event hub.
    client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
    async with client:
        # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
        await client.receive(on_event=on_event,  starting_position="-1")

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    # Run the main method.
    loop.run_until_complete(main()) 

在这里,接收者/消费者继续收听。如果我删除任何等待,消费者会抛出一个错误。有谁知道如何在运行一段时间后停止消费者,比如超时)。

标签: pythonasync-awaitazure-eventhub

解决方案


@Abhishek

这里有 2 个选项:

  1. 当一段时间内没有活动时,您可以停止收听。
  2. 您可以在固定时间后停止收听。

在以下步骤中进行了详细说明。

选项1

您可以使用 max_wait_time 参数来停止侦听,以防在特定时间内没有活动。

在此处输入图像描述

我确实启动了上述的一个简单用例。但是您可以进一步优化它。

import asyncio
from azure.eventhub.aio import EventHubConsumerClient

event_hub_connection_str = '<CON_STR>'
eventhub_name = '<EventHub_NAME>'


consumer = EventHubConsumerClient.from_connection_string(
     conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

#this event gets called when the message is received or Max_Wait_time is clocked
async def on_event(partition_context, event):
       print(event) #Optional - to see output
       #Checks whether there is any event returned None. None is returned when this event is called after the Max_Wait_time is crossed
       if(event !=None):
            print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
            #you can update other code like updating blob store
                
       else:
           print("Timeout is Hit")
           #updating the 
           global receive
           receive = False

  
async def close():
    print("Closing the client.")
    await consumer.close()
    print("Closed")
    
async def main():
    recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event,max_wait_time=15))
    while(True): # keep receiving for 3 seconds
        await asyncio.sleep(3) 
        if(receive != True):
            print("Cancelling the Task")
            recv_task.cancel()  # stop receiving by cancelling the task
            break;
  
   

receive = True
asyncio.run(main())
asyncio.run(close())#closing the Client

关于上面的代码。如果 15 秒内没有活动,则异步任务将被取消,消费者客户端将被关闭。程序最终优雅地退出。

选项 2

如果您正在寻找一个代码,您想让客户在固定时间(如 1 小时或其他时间)内收听。你可以参考下面的代码

参考代码

event_hub_connection_str = '<>'
eventhub_name = '<>'
import asyncio

from azure.eventhub.aio import EventHubConsumerClient

consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )
async def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, async will have better performance.
       print("Received event from partition: {}".format(partition_context.partition_id))

   # The receive method is a coroutine which will be blocking when awaited.
   # It can be executed in an async task for non-blocking behavior, and combined with the 'close' method.
   
async def main():
        recv_task = asyncio.ensure_future(consumer.receive(on_event=on_event))
        await asyncio.sleep(15)  # keep receiving for 3 seconds
        recv_task.cancel()  # stop receiving

async def close():
    print("Closing.....")
    await consumer.close()
    print("Closed") 

asyncio.run(main())
asyncio.run(close())#closing the Client

下面的代码负责让客户端监听一段时间:

recv_task =
asyncio.ensure_future(consumer.receive(on_event=on_event))    
await asyncio.sleep(3)  # keep receiving for 3 seconds    
recv_task.cancel()

您可以根据需要增加时间。


推荐阅读