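python - Apache Beam ReadFromMongoDB Python SDK throws an error: unable to read from MongoDB
Problem description
I have been trying to use the Apache Beam Python SDK to read a collection from MongoDB and write it to Google Cloud Storage (GCS). When I run the same pipeline on my local machine with the DirectRunner and write to a local file, the code executes without errors but produces no output. What am I missing here? Any help or feedback is greatly appreciated.
Code used: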
import argparse
import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, StandardOptions

parser = argparse.ArgumentParser()
parser.add_argument('--mongoUri',
                    dest='mongoUri',
                    required=True,
                    help='MongoDB connection URI to read data from.')
parser.add_argument('--storageLocation',
                    dest='storageLocation',
                    required=True,
                    help='output path prefix (local file or gs:// location).')
parser.add_argument('--mongoDbName',
                    dest='mongoDbName',
                    required=True,
                    help='mongo db name.')
parser.add_argument('--mongoCollection',
                    dest='mongoCollection',
                    required=True,
                    help='mongo collection to read data from.')
# Arguments not declared above are passed through to Beam as pipeline options.
path_args, pipeline_args = parser.parse_known_args()
mongoUri = path_args.mongoUri
storageLocation = path_args.storageLocation
mongoDbName = path_args.mongoDbName
mongoCollection = path_args.mongoCollection
options = PipelineOptions(pipeline_args)
options.view_as(StandardOptions).streaming = False  # run as a batch pipeline
p = beam.Pipeline(options=options)
colls = (
    p
    | 'read' >> beam.io.ReadFromMongoDB(uri=mongoUri, db=mongoDbName, coll=mongoCollection)
    # ReadFromMongoDB yields each document as a dict; json.dumps turns it into a
    # JSON string, and default=str handles non-JSON values such as ObjectId.
    | 'Convert to JSON' >> beam.Map(lambda elem: json.dumps(elem, default=str))
)
savetogcs = (
    colls
    | 'save' >> beam.io.WriteToText(storageLocation)
)
result = p.run()
result.wait_until_finish()  # block until the pipeline has finished
Stack trace on Dataflow
Traceback (most recent call last):
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/batchworker.py", line 647, in do_work
    work_executor.execute()
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 226, in execute
    self._split_task)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 234, in _perform_source_split_considering_api_limits
    desired_bundle_size)
  File "/usr/local/lib/python3.7/site-packages/dataflow_worker/executor.py", line 271, in _perform_source_split
    for split in source.split(desired_bundle_size):
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 171, in split
    start_position, stop_position)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 279, in _replace_none_positions
    stop_position = _ObjectIdHelper.increment_id(last_doc_id, 1)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 331, in increment_id
    id_number = _ObjectIdHelper.id_to_int(object_id)
  File "/usr/local/lib/python3.7/site-packages/apache_beam/io/mongodbio.py", line 296, in id_to_int
    ints = struct.unpack('>III', id.binary)
AttributeError: 'str' object has no attribute 'binary'
Solution
It looks like the current Apache Beam MongoDB connector only handles ObjectId values for _id, not string _ids.
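The failing line in the traceback can be reproduced without MongoDB or Beam. The sketch below is not the connector's code: `FakeObjectId` is a hypothetical stand-in for `bson.ObjectId` (which exposes its 12 raw bytes as `.binary`), and `id_to_int` only mirrors the `struct.unpack('>III', id.binary)` call shown in the stack trace. `try_convert` is likewise a helper made up for this illustration.

```python
import struct


class FakeObjectId:
    """Hypothetical stand-in for bson.ObjectId: wraps 12 raw bytes."""

    def __init__(self, raw):
        if len(raw) != 12:
            raise ValueError("an ObjectId is exactly 12 bytes")
        self.binary = raw


def id_to_int(doc_id):
    # Same unpack call as in the traceback: three big-endian 32-bit integers.
    high, mid, low = struct.unpack('>III', doc_id.binary)
    return (high << 64) | (mid << 32) | low


def try_convert(doc_id):
    """Return the integer form, or the error message if conversion fails."""
    try:
        return id_to_int(doc_id)
    except AttributeError as err:
        return str(err)


# An ObjectId-like value converts; a plain string _id has no `.binary`.
object_id_result = try_convert(FakeObjectId(bytes(11) + b'\x01'))
string_id_result = try_convert('user-42')
print(object_id_result)
print(string_id_result)
```

The string case fails at the attribute lookup, before any unpacking happens, which matches the `AttributeError` raised while the source was being split.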
I pointed the connector at a collection whose _ids are ObjectIds, and the error above went away.
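Before launching the pipeline, it may help to check which types the `_id` values in a collection actually have. This is a minimal sketch, assuming a few sample documents have already been fetched as dicts (for example with pymongo, not shown here). `id_type_counts` is a hypothetical helper, not part of Beam or pymongo, and the sample data is hand-written.

```python
from collections import Counter


def id_type_counts(docs):
    """Count the Python type names of the `_id` field across sample documents."""
    return Counter(type(doc.get('_id')).__name__ for doc in docs)


# Hand-written sample; real documents would come from the collection.
sample_docs = [
    {'_id': 'user-1', 'name': 'a'},
    {'_id': 'user-2', 'name': 'b'},
    {'_id': 17, 'name': 'c'},
]

counts = id_type_counts(sample_docs)
print(dict(counts))

# Any type other than ObjectId suggests the collection may hit the error above.
non_object_ids = {name: n for name, n in counts.items() if name != 'ObjectId'}
print(non_object_ids)
```

A sample only gives a hint, since a collection can mix `_id` types, but an empty `non_object_ids` on a reasonable sample is a quick sanity check before submitting a Dataflow job.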