python - 数据流管道中 WriteToText 文件的 ERRNO2
问题描述
我有一个带有多个ParDo
转换的分支管道,这些转换被合并并写入 GCS 存储桶中的文本文件记录。
管道崩溃后,我收到以下消息:
The worker lost contact with the service.
RuntimeError: FileNotFoundError: [Errno 2] Not found: gs://MYBUCKET/JOBNAME.00000-of-00001.avro [while running 'WriteToText/WriteToText/Write/WriteImpl/WriteBundles/WriteBundles']
看起来它找不到它一直在写入的日志文件。直到发生错误的某个点似乎都很好。我想将try:
/包裹except:
在它或断点周围,但我什至不确定如何发现根本原因。
有没有办法只写一个文件?还是只打开一个文件写入一次?它将数千个输出文件发送到此存储桶中,这是我想消除的,并且可能是一个因素。
with beam.Pipeline(argv=pipeline_args) as p:
csvlines = (
p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
| 'Parse CSV to Dictionary' >> beam.ParDo(Split())
| 'Read Files into Memory' >> beam.ParDo(DownloadFilesDoFn())
| 'Windowing' >> beam.WindowInto(window.FixedWindows(20 * 60))
)
b1 = ( csvlines | 'Branch1' >> beam.ParDo(Branch1DoFn()) )
b2 = ( csvlines | 'Branch2' >> beam.ParDo(Branch2DoFn()) )
b3 = ( csvlines | 'Branch3' >> beam.ParDo(Branch3DoFn()) )
b4 = ( csvlines | 'Branch4' >> beam.ParDo(Branch4DoFn()) )
b5 = ( csvlines | 'Branch5' >> beam.ParDo(Branch5DoFn()) )
b6 = ( csvlines | 'Branch6' >> beam.ParDo(Branch6DoFn()) )
output = (
(b1,b2,b3,b4,b5,b6) | 'Merge PCollections' >> beam.Flatten()
| 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
)
解决方案
这个问题与前一个问题相关联,该问题包含有关实施的更多细节。那里的解决方案建议google.cloud.storage.Client()
在start_bundle()
每次调用ParDo(DoFn)
. 这连接到同一个 gcs 存储桶 - 通过 args 给出WriteToText(known_args.output)
class DownloadFilesDoFn(beam.DoFn):
def __init__(self):
import re
self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')
def start_bundle(self):
import google.cloud.storage
self.gcs = google.cloud.storage.Client()
def process(self, element):
self.file_match = self.gcs_path_regex.match(element['Url'])
self.bucket = self.gcs.get_bucket(self.file_match.group(1))
self.blob = self.bucket.get_blob(self.file_match.group(2))
self.f = self.blob.download_as_bytes()
此错误的原因可能与与客户端的连接过多有关。我不清楚这方面的良好做法 - 因为在其他地方建议您可以为每个捆绑包以这种方式设置网络连接。
将此添加到末尾以在捆绑末尾从内存中删除客户端对象应该有助于关闭一些不必要的延迟连接。
def finish_bundle(self):
del self.gcs, self.gcs_path_regex
推荐阅读
- php - 如何在自定义助手或指令中获取 Laravel 中的视图数据
- amazon-web-services - IAM - 如何限制对队列创建的访问?
- ios - 如何防止秒表在暂停后重置
- typescript - TypeScript 循环遍历元组数组
- angular - 父属性更改时如何不重新初始化子组件?
- java - 用 mockito 模拟 android.content.res.Configuration
- python - 无法在 Python 中的列表列表中索引多个元素(使用 : 运算符)
- reactjs - 如何修复不允许输入文本的表单?
- python - 如何加入 2 个表并获得对象上的结果
- javascript - 调用json文件时没有出现文本,但app没有中断