首页 > 解决方案 > 时间触发器上的 Python 中的 Azure 函数从 url 获取 zip 文件,解压缩,然后将文件输出到 Azure 存储中的 blob 容器

问题描述

这是我在 function.json 中的绑定

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "name": "mytimer",
      "type": "timerTrigger",
      "direction": "in",
      "schedule": "0 30 7 * * *"
    },
   
    {
      "type": "blob",
      "direction": "out",
      "name": "outputBlob",
      "path": "jarvisetl/nppesraw",
      "connection": "AzureWebJobsStorage"
    }
  ]
}

这是我的 _init.py

import datetime
import logging
import azure.functions as func
import urllib.request
from io import BytesIO
from urllib.request import urlopen
from zipfile import ZipFile

def main(mytimer: func.TimerRequest) -> str:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()

    if mytimer.past_due:
        logging.info('The timer is past due!')

    logging.info('Python timer trigger function ran at %s', utc_timestamp)

    zipurl = 'https://download.file.com'
    with urlopen(zipurl) as zipresp:
        with ZipFile(BytesIO(zipresp.read())) as zfile:
            zfile.extractall(outputblob)

我最大的问题是我收到一个错误,即在 function.json 中声明了“outputblob”,但在 python 脚本中并不明显。我不确定“outputblob”在 Python 脚本中的真正位置。

标签: pythonazure-functions

解决方案


更新:

import datetime
import logging

import azure.functions as func


def main(mytimer: func.TimerRequest,outputblob: func.Out[func.InputStream]) -> None:
    utc_timestamp = datetime.datetime.utcnow().replace(
        tzinfo=datetime.timezone.utc).isoformat()
    testdata = 'this is test.'
    outputblob.set(testdata)
    if mytimer.past_due:
        logging.info('The timer is past due!')

    logging.info('Python timer trigger function ran at %s', utc_timestamp)

原答案:

下面是一个简单的例子:

函数.json

{
  "scriptFile": "__init__.py",
  "bindings": [
    {
      "authLevel": "anonymous",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "route": "{test}",
      "methods": [
        "get",
        "post"
      ]
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    },
    {
      "name": "outputblob",
      "type": "blob",
      "path": "test1/{test}.txt",
      "connection": "str",
      "direction": "out"
    }
  ]
}

init.py

import logging

import azure.functions as func

def main(req: func.HttpRequest,outputblob: func.Out[func.InputStream],) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')
    testdata = 'this is test.'
    outputblob.set(testdata)
    name = req.params.get('name')
    return func.HttpResponse(f"This is output binding test, "+name)

推荐阅读