json - 使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误
问题描述
我需要在 Python 中使用 Apache Beam 将大量 json 文件加载到 BQ 中。jsons 有一个非常复杂的模式(具有多个层次结构),更重要的是 - 它并不一致。有些字段非常罕见,以至于它们只出现在 0.01% 的 json 中。我不能让 BQ 使用 AUTO_DETECT 推断 WriteToBigQuery 方法中的模式,因为它只检查 100 行 - 还远远不够。我尝试使用 python generate-schema 实用程序针对 0.1% 的数据构建模式 - 但同样,某些字段非常罕见,以至于它仍然失败。
没有这样的字段:FIELD_NAME。
我尝试找到一种方法来上传文件而不考虑任何错误,并将错误保存到错误表中,我可以单独处理。但是,我无论如何都没有在 WriteToBigQuery 模块中找到这样做。我还尝试在将每个 json 发送到管道之前对其进行验证,但速度非常慢。我还尝试根据指定的模式“过滤”json,但这需要遍历所有 json - 也很慢,因为每个 json 大小约为 13 KB。
有没有人遇到任何可以提供帮助的东西?奇怪的是,使用 Apache Beam 写入 BQ 时没有使用任何 max_rejected 属性。任何有关如何处理此问题的想法将不胜感激。
解决方案
一种可能性是“手动”计算模式。如果我们将模式表示为一组元组set([field, type])
- 例如set([('name', str), ('age', int)])
。
class CombineSchemasByDestination(beam.DoFn):
def __init__(self):
self.schemas_per_dest = defaultdict(set)
def process(self, dest_schema):
destination, schemas = dest_schema
for s in schemas:
self.schemas_per_dest[destination].union(s)
def finish_bundle(self):
for dest, schema in self.schemas_per_dest.items():
yield (dest, schema)
schemas_per_dest = (my_data
| beam.Map(lambda row: (get_destination(row),
[get_row_schema(row)]))
| beam.ParDo(CombineSchemasByDestination())
| beam.GroupByKey()
| beam.CombineSchemasByDestination())
my_data | beam.WriteToBigQuery(....
schema=lambda dest, schema_map: schema_map.get(dest),
schema_side_inputs=(beam.pvalue.AsDict(schemas_per_dest,))
我认为这应该有助于解决您的问题。想法?