Apache Beam ReadFromMongoDB Python SDK throws error: cannot read from MongoDB

Problem description

I have been trying to use the Apache Beam Python SDK to read a collection from MongoDB and write it to Google Cloud Storage (GCS). I tried the same pipeline on my local machine with the DirectRunner, writing to a local file; the code ran without errors but produced no output. What am I missing here? Any help or feedback is much appreciated.

Code used:

import apache_beam as beam
import os
import json
import argparse
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

parser = argparse.ArgumentParser()


parser.add_argument('--mongoUri',
                    dest='mongoUri',
                    required=True,
                    help='mongo collection to read data from.')

parser.add_argument('--storageLocation',
                    dest='storageLocation',
                    required=True,
                    help='mongo collection to insert data to.')

parser.add_argument('--mongoDbName',
                    dest='mongoDbName',
                    required=True,
                    help='mongo db name.')

parser.add_argument('--mongoCollection',
                    dest='mongoCollection',
                    required=True,
                    help='mongo collection to insert data to.')

path_args, pipeline_args = parser.parse_known_args()

mongoUri = path_args.mongoUri
storageLocation = path_args.storageLocation
mongoDbName = path_args.mongoDbName
mongoCollection = path_args.mongoCollection

options = PipelineOptions(pipeline_args)
options.view_as(StandardOptions).streaming = False

p = beam.Pipeline(options=options)

colls = (
    p
    | 'read' >> beam.io.ReadFromMongoDB(uri=mongoUri, db=mongoDbName, coll=mongoCollection)
    # ReadFromMongoDB yields Python dicts, not JSON strings, so serialize
    # with json.dumps (default=str covers BSON types such as ObjectId)
    # rather than calling json.loads on a dict.
    | 'Convert to JSON' >> beam.Map(lambda elem: json.dumps(elem, default=str))
)

savetogcs = (
       colls
       | 'save' >> beam.io.WriteToText(storageLocation)
)

result = p.run()
# Block until the run completes; without this the script can exit before
# an asynchronous runner finishes, leaving no output file behind.
result.wait_until_finish()
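As a side note on the 'Convert to JSON' step: `ReadFromMongoDB` emits each document as a Python dict, so producing JSON lines for `WriteToText` means serializing with `json.dumps`; a `default=str` fallback covers BSON values (ObjectId, datetime) that the `json` module cannot encode natively. A minimal sketch, with made-up document values:

```python
import json
import datetime

# A stand-in for a document as ReadFromMongoDB would emit it: a plain dict.
# Real documents may carry BSON types (ObjectId, datetime) that json.dumps
# rejects unless given a fallback encoder.
doc = {
    '_id': '5f2b9cdeadbeef0011223344',
    'name': 'alice',
    'created': datetime.datetime(2020, 8, 6, 12, 0),
}

# default=str converts anything non-JSON-native to its string form.
line = json.dumps(doc, default=str)
print(line)
```

Calling `json.loads(elem)` on such a dict, as in the original pipeline, raises a TypeError instead.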

Stack trace on Dataflow:

Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 647, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 226, in execute
    self._split_task)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 234, in _perform_source_split_considering_api_limits
    desired_bundle_size)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 271, in _perform_source_split
    for split in source.split(desired_bundle_size):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 171, in split
    start_position, stop_position)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 279, in _replace_none_positions
    stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 331, in increment_id
    id_number = _ObjectIdHelper.id_to_int(object_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 296, in id_to_int
    ints = struct.unpack('>III', id.binary)
AttributeError: 'str' object has no attribute 'binary'

Tags: python, python-3.x, mongodb, apache-beam

Solution


It looks like the current Apache Beam MongoDB connector only handles ObjectId `_id`s, not string `_id`s.

I pointed the connector at a collection whose documents use ObjectId `_id`s, and the error above went away. (Newer Beam releases also expose a `bucket_auto` option on `ReadFromMongoDB`, which splits the collection via MongoDB's `$bucketAuto` aggregation instead of `_id` ranges; it may be worth checking whether your Beam version supports it.)
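The failing frame in the trace makes this concrete: Beam's `_ObjectIdHelper.id_to_int` unpacks the 12 raw bytes that a BSON `ObjectId` exposes through its `.binary` attribute, and a plain string `_id` has no such attribute. A self-contained sketch, with a `FakeObjectId` standing in for `bson.ObjectId` so it runs without pymongo (the way the three unpacked words are combined is an assumption; the `struct.unpack` line matches the trace):

```python
import struct

class FakeObjectId:
    """Stand-in for bson.ObjectId: 12 raw bytes exposed via .binary."""
    def __init__(self, raw: bytes):
        assert len(raw) == 12
        self.binary = raw

def id_to_int(object_id):
    # As in the trace: unpack .binary as three big-endian uint32s,
    # then (assumption) combine them into one 96-bit integer.
    ints = struct.unpack('>III', object_id.binary)
    return (ints[0] << 64) + (ints[1] << 32) + ints[2]

oid = FakeObjectId(bytes.fromhex('5f2b9cdeadbeef0011223344'))
print(id_to_int(oid))          # an ObjectId-backed _id splits fine

try:
    id_to_int('my-string-id')  # a string _id has no .binary ...
except AttributeError as e:
    print(e)                   # ... which is exactly the error in the trace
```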

