首页 > 解决方案 > Azure 函数未接收 Eventhub 事件

问题描述

我开始使用 Azure Functions,并且遇到了我的 Function 没有被进入我的 eventthub 的事件触发的问题。

这是我的功能的代码:

主机.json:

  "version": "2.0",
  "logging": {
    "applicationInsights": {
      "samplingSettings": {
        "isEnabled": true,
        "excludedTypes": "Request"
      }
    }
  },
  "extensionBundle": {
    "id": "Microsoft.Azure.Functions.ExtensionBundle",
    "version": "[2.*, 3.0.0)"
  }
}

函数.json:

  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "events",
      "direction": "in",
      "eventHubName": "eventhub",
      "connection": "eventhub_connection",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "dataType": "stream"
    }
  ]
}

初始化.py:

import logging

import azure.functions as func


def main(events: List[func.EventHubEvent]):
    for event in events:
        logging.info('Python EventHub trigger processed an event: %s',
                        event.get_body().decode('utf-8'))
        logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
        logging.info(f'  EnqueuedTimeUtc = {event.enqueued_time}')
        logging.info(f'  SequenceNumber = {event.sequence_number}')
        logging.info(f'  Offset = {event.offset}')

# def main(event: func.EventHubEvent):
#     logging.info(f'Function triggered to process a message: {event.get_body().decode()}')
#     logging.info(f'  EnqueuedTimeUtc = {event.enqueued_time}')
#     logging.info(f'  SequenceNumber = {event.sequence_number}')
#     logging.info(f'  Offset = {event.offset}')

#     # Metadata
#     for key in event.metadata:
#         logging.info(f'Metadata: {key} = {event.metadata[key]}')
  "IsEncrypted": false,
  "Values": {
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=storageaccount;AccountKey=storageacciuntaccesskey=;EndpointSuffix=core.windows.net",
    "eventhub_connection": "Endpoint=sb://eventhub01.servicebus.windows.net/;SharedAccessKeyName=function;SharedAccessKey=0omitted;EntityPath=eventhub"
  }
}

我从 Azure Function Core 工具提供的基本 eventthub python 代码开始。并且一直在测试人们的博客和 Microsoft 文档的在线示例中的不同代码段。

切换到基数时:一 -> 我切换到当前注释掉的代码。我不知道这是否应该像那样进行,这对我来说是正确的。

在任何情况下,无论基数设置如何,或者在二进制、流或字符串之间更改数据类型。我的功能根本不会触发。

我可以查询我的 eventthub 并查看/阅读事件。所以我知道我的政策,以及共享密钥等,工作正常。我也只使用 $Default 消费者组。

我还尝试设置一个 HTTP 触发函数,该函数是从 Azure Monitor 触发的。我可以在日志中看到每个进入函数的请求。

我在 eventthub 函数的代码中做错了吗?我是否缺少其他一些配置设置?我已经检查了该功能的访问规则,但这真的没关系吗?该函数正在从 eventthub 中提取事件。它不是由发起者发送数据。

编辑:添加了 local.settings.json 文件配置并更新了 function.json 编辑 2:我的具体问题的解决方案在答案的评论中。

标签: python-3.xazure-functionsazure-eventhub

解决方案


更新:

__init__.py 功能:

from typing import List
import logging

import azure.functions as func


def main(events: List[func.EventHubEvent]):
    for event in events:
        logging.info('Python EventHub trigger processed an event: %s',
                        event.get_body().decode('utf-8'))

向事件中心发送消息:

import asyncio
from azure.eventhub.aio import EventHubProducerClient
from azure.eventhub import EventData

async def run():
    # Create a producer client to send messages to the event hub.
    # Specify a connection string to your event hubs namespace and
    # the event hub name.
    producer = EventHubProducerClient.from_connection_string(conn_str="Endpoint=sb://testbowman.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx;EntityPath=test", eventhub_name="test")
    async with producer:
        # Create a batch.
        event_data_batch = await producer.create_batch()

        # Add events to the batch.
        event_data_batch.add(EventData('First event '))
        event_data_batch.add(EventData('Second event'))
        event_data_batch.add(EventData('Third event'))

        # Send the batch of events to the event hub.
        await producer.send_batch(event_data_batch)

loop = asyncio.get_event_loop()
loop.run_until_complete(run())

请确保您提供正确的事件中心名称:

在此处输入图像描述


看来你function.json有问题,连接字符串不应该直接放在绑定项中。

它应该如下所示:

function.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "events",
      "direction": "in",
      "eventHubName": "test",
      "connection": "testbowman_RootManageSharedAccessKey_EVENTHUB",
      "cardinality": "many",
      "consumerGroup": "$Default",
      "dataType": "binary"
    }
  ]
}

local.settings.json

{
  "IsEncrypted": false,
  "Values": {
    "AzureWebJobsStorage": "DefaultEndpointsProtocol=https;AccountName=0730bowmanwindow;AccountKey=xxxxxx;EndpointSuffix=core.windows.net",
    "FUNCTIONS_WORKER_RUNTIME": "python",
    "testbowman_RootManageSharedAccessKey_EVENTHUB": "Endpoint=sb://testbowman.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=xxxxxx;EntityPath=test"
  }
}

推荐阅读