首页 > 解决方案 > 从 onEvent 内部停止 Azure EventBus 接收器

问题描述

我正在使用 Python Azure Event Hub 包,并具有以下代码:

    def on_event(partition_context, event):
        print("Received event from partition: {}.".format(partition_context.partition_id))
        event_json = json.loads(str(event))
        if event_json["command_status"]=="successull":
           # STOP RECEIVER HERE!


    def on_partition_initialize(partition_context):
        print("Partition: {} has been initialized.".format(partition_context.partition_id))

    def on_partition_close(partition_context, reason):
        print("Partition: {} has been closed, reason for closing: {}.".format(
            partition_context.partition_id,
            reason
        ))

    def on_error(partition_context, error):
        if partition_context:
            print("An exception: {} occurred during receiving from Partition: {}.".format(
                partition_context.partition_id,
                error
            ))
        else:
            print("An exception: {} occurred during the load balance process.".format(error))

    consumer_client = EventHubConsumerClient.from_connection_string(
        conn_str=self.conn_string_event_hub,
        consumer_group=self.consumer_group,
        eventhub_name=self.event_hub_name,
    )
    with consumer_client:
        consumer_client.receive(
            on_event=on_event,
            on_partition_initialize=on_partition_initialize,
            on_partition_close=on_partition_close,
            on_error=on_error,
        )

我知道您可以使用 asynco 通过超时停止接收器。

但是,我想做的是,如果我收到具有特定特征的事件,我想停止接收者。

要么,要么在第一个事件到达后停止接收器。

这可能吗?

谢谢!

标签: pythonazureazure-iot-hubazure-eventhub

解决方案


您可以通过在回调中关闭消费者客户端来停止接收on_event,如下所示。

    def on_event(partition_context, event):
        print("Received event from partition: {}.".format(partition_context.partition_id))
        event_json = json.loads(str(event))
        if event_json["command_status"]=="successull":
           consumer_client.close()


推荐阅读