首页 > 解决方案 > 使用 Apache Beam 和数据流将许多 json 加载到 BQ - json 模式错误

问题描述

我需要在 Python 中使用 Apache Beam 将大量 json 文件加载到 BQ 中。jsons 有一个非常复杂的模式(具有多个层次结构),更重要的是 - 它并不一致。有些字段非常罕见,以至于它们只出现在 0.01% 的 json 中。我不能让 BQ 使用 AUTO_DETECT 推断 WriteToBigQuery 方法中的模式,因为它只检查 100 行 - 还远远不够。我尝试使用 python generate-schema 实用程序针对 0.1% 的数据构建模式 - 但同样,某些字段非常罕见,以至于它仍然失败。

没有这样的字段:FIELD_NAME。

我尝试找到一种方法来上传文件而不考虑任何错误,并将错误保存到错误表中,我可以单独处理。但是,我无论如何都没有在 WriteToBigQuery 模块中找到这样做。我还尝试在将每个 json 发送到管道之前对其进行验证,但速度非常慢。我还尝试根据指定的模式“过滤”json,但这需要遍历所有 json - 也很慢,因为每个 json 大小约为 13 KB。

有没有人遇到任何可以提供帮助的东西?奇怪的是,使用 Apache Beam 写入 BQ 时没有使用任何 max_rejected 属性。任何有关如何处理此问题的想法将不胜感激。

标签: jsongoogle-bigquerygoogle-cloud-dataflowapache-beam

解决方案


一种可能性是“手动”计算模式。如果我们将模式表示为一组元组set([field, type])- 例如set([('name', str), ('age', int)])

class CombineSchemasByDestination(beam.DoFn):
  def __init__(self):
    self.schemas_per_dest = defaultdict(set)

  def process(self, dest_schema):
    destination, schemas = dest_schema
    for s in schemas:
      self.schemas_per_dest[destination].union(s)

  def finish_bundle(self):
    for dest, schema in self.schemas_per_dest.items():
      yield (dest, schema)

schemas_per_dest = (my_data 
                    | beam.Map(lambda row: (get_destination(row), 
                                            [get_row_schema(row)]))
                    | beam.ParDo(CombineSchemasByDestination())
                    | beam.GroupByKey()
                    | beam.CombineSchemasByDestination())

my_data | beam.WriteToBigQuery(....
  schema=lambda dest, schema_map: schema_map.get(dest),
  schema_side_inputs=(beam.pvalue.AsDict(schemas_per_dest,))

我认为这应该有助于解决您的问题。想法?


推荐阅读