首页 > 解决方案 > 如何处理数据块中具有长路径的 blob 存储中的许多文件?

问题描述

我为 API 管理服务启用了日志记录,并且日志存储在存储帐户中。现在我正在尝试在 Azure Databricks 工作区中处理它们,但我正在努力访问这些文件。

问题似乎是自动生成的虚拟文件夹结构如下所示:

/insights-logs-gatewaylogs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=*/m=*/d=*/h=*/m=00/PT1H.json

我已将insights-logs-gatewaylogs容器安装在下面/mnt/diags,并且dbutils.fs.ls('/mnt/diags')正确列出了该resourceId=文件夹,但未dbutils.fs.ls('/mnt/diags/resourceId=')找到声明文件

如果我沿虚拟文件夹结构创建空标记 blob,我可以列出每个后续级别,但该策略显然会失败,因为路径的最后一部分是按年/月/日/小时动态组织的。

例如一个

spark.read.format('json').load("dbfs:/mnt/diags/logs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=*/m=*/d=*/h=*/m=00/PT1H.json")

产生此错误:

java.io.FileNotFoundException: File/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=2019 does not exist.

很明显,通配符已经找到了第一年的文件夹,但拒绝再往下走。

我在 Azure 数据工厂中设置了一个复制作业,它成功复制了同一个 blob 存储帐户中的所有 json blob 并删除了resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>前缀(因此根文件夹以年份组件开头)并且可以一直成功访问而无需创建空的标记 blob。

所以问题似乎与大部分为空的长虚拟文件夹结构有关。

还有另一种方法来处理数据块中的这些文件夹结构吗?

更新:我也尝试source在安装时提供路径,但这也无济于事

标签: azureazure-blob-storageazure-databricks

解决方案


我想我可能已经找到了这个问题的根本原因。应该早点尝试过,但我提供了现有 blob 的确切路径,如下所示:

spark.read.format('json').load("dbfs:/mnt/diags/logs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=2019/m=08/d=20/h=06/m=00/PT1H.json")

我得到了一个更有意义的错误:

shaded.databricks.org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: Blob 类型不正确,请使用正确的 Blob 类型访问服务器上的 Blob。预期的 BLOCK_BLOB,实际的 APPEND_BLOB。

事实证明,开箱即用的日志记录会创建附加 blob(并且似乎没有办法改变这一点),并且从这张票的外观来看,对附加 blob 的支持仍然是 WIP:https://issues.apache .org/jira/browse/HADOOP-13475

FileNotFoundException可能是一个红鲱鱼,这可能是由于在尝试扩展通配符并找到不受支持的 blob 类型时吞下内部异常而导致的。

更新

终于找到了合理的解决办法。我在我的工作区中安装了azure-storagePython 包(如果你在家里使用 Scala,它已经安装了)并自己加载了 blob。下面的大多数代码都是为了添加通配符支持,如果您愿意只匹配路径前缀,则不需要它:

%python

import re
import json
from azure.storage.blob import AppendBlobService


abs = AppendBlobService(account_name='<account>', account_key="<access_key>")

base_path = 'resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>'
pattern = base_path + '/*/*/*/*/m=00/*.json'
filter = glob2re(pattern)

spark.sparkContext \
     .parallelize([blob.name for blob in abs.list_blobs('insights-logs-gatewaylogs', prefix=base_path) if re.match(filter, blob.name)]) \
     .map(lambda blob_name: abs.get_blob_to_bytes('insights-logs-gatewaylogs', blob_name).content.decode('utf-8').splitlines()) \
     .flatMap(lambda lines: [json.loads(l) for l in lines]) \
     .collect()

glob2rehttps://stackoverflow.com/a/29820981/220986提供:

def glob2re(pat):
    """Translate a shell PATTERN to a regular expression.

    There is no way to quote meta-characters.
    """

    i, n = 0, len(pat)
    res = ''
    while i < n:
        c = pat[i]
        i = i+1
        if c == '*':
            #res = res + '.*'
            res = res + '[^/]*'
        elif c == '?':
            #res = res + '.'
            res = res + '[^/]'
        elif c == '[':
            j = i
            if j < n and pat[j] == '!':
                j = j+1
            if j < n and pat[j] == ']':
                j = j+1
            while j < n and pat[j] != ']':
                j = j+1
            if j >= n:
                res = res + '\\['
            else:
                stuff = pat[i:j].replace('\\','\\\\')
                i = j+1
                if stuff[0] == '!':
                    stuff = '^' + stuff[1:]
                elif stuff[0] == '^':
                    stuff = '\\' + stuff
                res = '%s[%s]' % (res, stuff)
        else:
            res = res + re.escape(c)
    return res + '\Z(?ms)'

不漂亮,但避免了数据的复制,并且可以包含在一个小实用程序类中。


推荐阅读