首页 > 解决方案 > 将事件中心消息发送到 Blob 存储的 Azure 事件中心捕获的替代方案?

问题描述

有什么方法可以将我的事件中心数据发送到 Azure 中的 blob 存储,这些数据通过 Postman 通过 HTTP 以 JSON 格式发送?我尝试使用 EventHub 的 Capture 功能,但不幸的是,数据以 Avro 格式保存,我真的很难再次将其转换回其原始 JSON 格式。

因此,我想将我的 EventHub 数据直接发送到某种 blob 存储,这将使我的事件中心消息保持其原始 JSON 格式,然后我可以使用 Azure 函数(获取 Http 触发器)从我的 SPA 通过前端通信。

另外,我是否必须为容器中的每条消息创建一个新的 blob?因为我认为我不能将它们全部写在一个 blob 中,因为当我同时触发我的 get HTTP 函数时,我将无法通过前端检索我的数据。

是否有事件中心捕获的替代品?使用普通 blob 存储是最佳解决方案吗?我已经阅读了一些关于 Azure Timeseries Insights 和 CosmosDB 的文章,但我不确定这些是否是解决我的问题的最佳方法。

标签: azureazure-functionsazure-eventhubazure-eventhub-capture

解决方案


所以问题是我最初通过 Postman 将其作为原始数据发送:

JSON 格式的原始数据通过 Postman 发送:

{
   "id":1,
   "receiver":"2222222222222",
   "message":{
      "Name":"testing",
      "PersonId":2,
      "CarId":2,
      "GUID":"1s3q1d-s546dq1-8e22e",
      "LineId":2,
      "SvcId":2,
      "Lat":-64.546547,
      "Lon":-64.546547,
      "TimeStamp":"2021-03-18T08:29:36.758Z",
      "Recorder":"dq65ds4qdezzer",
      "Env":"DEV"
   },
   "operator":20404,
   "sender":"MSISDN",
   "binary":1,
   "sent":"2021-03-18T08:29:36.758Z"
}

一旦这被事件中心捕获捕获,它就会转换为 Avro 文件。我正在尝试使用 fastavro 检索数据并将其转换为 JSON 格式。问题是我没有取回 Postman 最初发送的原始数据。我找不到将其转换回原始状态的方法,为什么 Avro 还会从 Postman 向我发送其他信息?我可能需要找到一种方法将“正文”设置为仅转换。但是由于某种原因,它还在正文中添加了“字节”,我只是想取回通过 Postman 发送的原始原始数据。

初始化.py(Azure 函数)

    import logging
    import os
    import string
    import json
    import uuid
    import avro.schema
    import tempfile
    import azure.functions as func
    from azure.storage.blob import BlobServiceClient, BlobClient, ContainerClient, __version__
    from avro.datafile import DataFileReader, DataFileWriter
    from avro.io import DatumReader, DatumWriter
    from fastavro import reader, json_writer
    
    
    #Because the Apache Python avro package is written in pure Python, it is relatively slow, therefoer I make use of fastavro
    def avroToJson(avroFile):
        with open("json_file.json", "w") as json_file:
            with open(avroFile, "rb") as avro_file:
                avro_reader = reader(avro_file)
                json_writer(json_file, avro_reader.writer_schema, avro_reader)
    
    
    def main(req: func.HttpRequest) -> func.HttpResponse:
      logging.info('Python HTTP trigger function processed a request.')
      print('Processor started using path ' + os.getcwd())
      connect_str = "###########"
      container = ContainerClient.from_connection_string(connect_str, container_name="####")
      blob_list = container.list_blobs() # List the blobs in the container.
      for blob in blob_list:
          # Content_length == 508 is an empty file, so process only content_length > 508 (skip empty files).
          if blob.size > 508:
              print('Downloaded a non empty blob: ' + blob.name)
              # Create a blob client for the blob.
              blob_client = ContainerClient.get_blob_client(container, blob=blob.name)
              # Construct a file name based on the blob name.
              cleanName = str.replace(blob.name, '/', '_')
              cleanName = os.getcwd() + '\\' + cleanName
              # Download file
              with open(cleanName, "wb+") as my_file: # Open the file to write. Create it if it doesn't exist. 
                  my_file.write(blob_client.download_blob().readall())# Write blob contents into the file.
                  
              avroToJson(cleanName)
              with open('json_file.json','r') as file:
                   jsonStr = file.read()
            
      return func.HttpResponse(jsonStr, status_code=200)

预期结果:

{
   "id":1,
   "receiver":"2222222222222",
   "message":{
      "Name":"testing",
      "PersonId":2,
      "CarId":2,
      "GUID":"1s3q1d-s546dq1-8e22e",
      "LineId":2,
      "SvcId":2,
      "Lat":-64.546547,
      "Lon":-64.546547,
      "TimeStamp":"2021-03-18T08:29:36.758Z",
      "Recorder":"dq65ds4qdezzer",
      "Env":"DEV"
   },
   "operator":20404,
   "sender":"MSISDN",
   "binary":1,
   "sent":"2021-03-18T08:29:36.758Z"
}

实际结果:

{
   "SequenceNumber":19,
   "Offset":"10928",
   "EnqueuedTimeUtc":"4/1/2021 8:43:19 AM",
   "SystemProperties":{
      "x-opt-enqueued-time":{
         "long":1617266599145
      }
   },
   "Properties":{
      "Postman-Token":{
         "string":"37ff4cc6-9124-45e5-ba9d-######e"
      }
   },
   "Body":{
      "bytes":"{\r\n  \"id\": 1,\r\n  \"receiver\": \"2222222222222\",\r\n  \"message\": {\r\n    \"Name\": \"testing\",\r\n    \"PersonId\": 2,\r\n    \"CarId\": 2,\r\n    \"GUID\": \"1s3q1d-s546dq1-8e22e\",\r\n    \"LineId\": 2,\r\n    \"SvcId\": 2,\r\n    \"Lat\": -64.546547,\r\n    \"Lon\": -64.546547,\r\n    \"TimeStamp\": \"2021-03-18T08:29:36.758Z\",\r\n    \"Recorder\": \"dq65ds4qdezzer\",\r\n    \"Env\": \"DEV\"\r\n  },\r\n  \"operator\": 20404,\r\n  \"sender\": \"MSISDN\",\r\n  \"binary\": 1,\r\n  \"sent\": \"2021-03-29T08:29:36.758Z\"\r\n}"
   }
}

推荐阅读