azure - 如何处理数据块中具有长路径的 blob 存储中的许多文件?
问题描述
我为 API 管理服务启用了日志记录,并且日志存储在存储帐户中。现在我正在尝试在 Azure Databricks 工作区中处理它们,但我正在努力访问这些文件。
问题似乎是自动生成的虚拟文件夹结构如下所示:
/insights-logs-gatewaylogs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=*/m=*/d=*/h=*/m=00/PT1H.json
我已将insights-logs-gatewaylogs
容器安装在下面/mnt/diags
,并且dbutils.fs.ls('/mnt/diags')
正确列出了该resourceId=
文件夹,但未dbutils.fs.ls('/mnt/diags/resourceId=')
找到声明文件
如果我沿虚拟文件夹结构创建空标记 blob,我可以列出每个后续级别,但该策略显然会失败,因为路径的最后一部分是按年/月/日/小时动态组织的。
例如一个
spark.read.format('json').load("dbfs:/mnt/diags/logs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=*/m=*/d=*/h=*/m=00/PT1H.json")
产生此错误:
java.io.FileNotFoundException: File/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=2019 does not exist.
很明显,通配符已经找到了第一年的文件夹,但拒绝再往下走。
我在 Azure 数据工厂中设置了一个复制作业,它成功复制了同一个 blob 存储帐户中的所有 json blob 并删除了resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>
前缀(因此根文件夹以年份组件开头)并且可以一直成功访问而无需创建空的标记 blob。
所以问题似乎与大部分为空的长虚拟文件夹结构有关。
还有另一种方法来处理数据块中的这些文件夹结构吗?
更新:我也尝试source
在安装时提供路径,但这也无济于事
解决方案
我想我可能已经找到了这个问题的根本原因。应该早点尝试过,但我提供了现有 blob 的确切路径,如下所示:
spark.read.format('json').load("dbfs:/mnt/diags/logs/resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>/y=2019/m=08/d=20/h=06/m=00/PT1H.json")
我得到了一个更有意义的错误:
shaded.databricks.org.apache.hadoop.fs.azure.AzureException: com.microsoft.azure.storage.StorageException: Blob 类型不正确,请使用正确的 Blob 类型访问服务器上的 Blob。预期的 BLOCK_BLOB,实际的 APPEND_BLOB。
事实证明,开箱即用的日志记录会创建附加 blob(并且似乎没有办法改变这一点),并且从这张票的外观来看,对附加 blob 的支持仍然是 WIP:https://issues.apache .org/jira/browse/HADOOP-13475
这FileNotFoundException
可能是一个红鲱鱼,这可能是由于在尝试扩展通配符并找到不受支持的 blob 类型时吞下内部异常而导致的。
更新
终于找到了合理的解决办法。我在我的工作区中安装了azure-storage
Python 包(如果你在家里使用 Scala,它已经安装了)并自己加载了 blob。下面的大多数代码都是为了添加通配符支持,如果您愿意只匹配路径前缀,则不需要它:
%python
import re
import json
from azure.storage.blob import AppendBlobService
abs = AppendBlobService(account_name='<account>', account_key="<access_key>")
base_path = 'resourceId=/SUBSCRIPTIONS/<subscription>/RESOURCEGROUPS/<resource group>/PROVIDERS/MICROSOFT.APIMANAGEMENT/SERVICE/<api service>'
pattern = base_path + '/*/*/*/*/m=00/*.json'
filter = glob2re(pattern)
spark.sparkContext \
.parallelize([blob.name for blob in abs.list_blobs('insights-logs-gatewaylogs', prefix=base_path) if re.match(filter, blob.name)]) \
.map(lambda blob_name: abs.get_blob_to_bytes('insights-logs-gatewaylogs', blob_name).content.decode('utf-8').splitlines()) \
.flatMap(lambda lines: [json.loads(l) for l in lines]) \
.collect()
glob2re
由https://stackoverflow.com/a/29820981/220986提供:
def glob2re(pat):
"""Translate a shell PATTERN to a regular expression.
There is no way to quote meta-characters.
"""
i, n = 0, len(pat)
res = ''
while i < n:
c = pat[i]
i = i+1
if c == '*':
#res = res + '.*'
res = res + '[^/]*'
elif c == '?':
#res = res + '.'
res = res + '[^/]'
elif c == '[':
j = i
if j < n and pat[j] == '!':
j = j+1
if j < n and pat[j] == ']':
j = j+1
while j < n and pat[j] != ']':
j = j+1
if j >= n:
res = res + '\\['
else:
stuff = pat[i:j].replace('\\','\\\\')
i = j+1
if stuff[0] == '!':
stuff = '^' + stuff[1:]
elif stuff[0] == '^':
stuff = '\\' + stuff
res = '%s[%s]' % (res, stuff)
else:
res = res + re.escape(c)
return res + '\Z(?ms)'
不漂亮,但避免了数据的复制,并且可以包含在一个小实用程序类中。
推荐阅读
- javascript - 检查反应用户是否有角色
- npm - NPM 在“审计修复”上抛出错误 - 不支持配置的注册表
- excel - 为什么 excel 宏崩溃并显示“excel 正在等待另一个应用程序完成 OLE 操作”但不使用任何其他应用程序/文件?
- java - 通过 HashMap 迭代(复杂性)
- python - 决策树桩的一种实现的细微错误
- python - Python:在合并某些键但不合并其他键的同时合并字典?
- three.js - Three.js 中的度量单位是什么?
- math - C#/Unity:在 3D 空间中旋转三角形以形成 2D 多边形
- c# - 在 2.1 中使用 .Net Core 2.2 类
- azure-ad-b2c - Azure AD B2C:向应用程序添加 api 访问时出现内部错误