python - 从 onEvent 内部停止 Azure EventBus 接收器
问题描述
我正在使用 Python Azure Event Hub 包,并具有以下代码:
def on_event(partition_context, event):
print("Received event from partition: {}.".format(partition_context.partition_id))
event_json = json.loads(str(event))
if event_json["command_status"]=="successull":
# STOP RECEIVER HERE!
def on_partition_initialize(partition_context):
print("Partition: {} has been initialized.".format(partition_context.partition_id))
def on_partition_close(partition_context, reason):
print("Partition: {} has been closed, reason for closing: {}.".format(
partition_context.partition_id,
reason
))
def on_error(partition_context, error):
if partition_context:
print("An exception: {} occurred during receiving from Partition: {}.".format(
partition_context.partition_id,
error
))
else:
print("An exception: {} occurred during the load balance process.".format(error))
consumer_client = EventHubConsumerClient.from_connection_string(
conn_str=self.conn_string_event_hub,
consumer_group=self.consumer_group,
eventhub_name=self.event_hub_name,
)
with consumer_client:
consumer_client.receive(
on_event=on_event,
on_partition_initialize=on_partition_initialize,
on_partition_close=on_partition_close,
on_error=on_error,
)
我知道您可以使用 asynco 通过超时停止接收器。
但是,我想做的是,如果我收到具有特定特征的事件,我想停止接收者。
要么,要么在第一个事件到达后停止接收器。
这可能吗?
谢谢!
解决方案
您可以通过在回调中关闭消费者客户端来停止接收on_event
,如下所示。
def on_event(partition_context, event):
print("Received event from partition: {}.".format(partition_context.partition_id))
event_json = json.loads(str(event))
if event_json["command_status"]=="successull":
consumer_client.close()
推荐阅读
- python - logging.handlers.TimedRotatingFileHandler 的错误?
- python - 从PDF中提取带有坐标的表格
- angular - 无法列出惰性路由:Angular 更新后的未知模块 AppModule
- java - 如何使用批处理文件调用关闭处理程序?
- c# - 如何按行对二维数组进行排序?
- sql-server - Azure Windows 虚拟机上的 SQL 服务器错误 400 MismatchSqlVmAndVmName
- php - file_put_contents, file_append 如何写入内容 DESC
- c++ - 如何找到第二小的向量?
- c# - Xaml UI Designer 对于 .net core 3.0 中的 wpf 项目不活动
- javascript - Cookie 在同意之前设置