首页 > 解决方案 > 数据流管道中 WriteToText 文件的 ERRNO2

问题描述

我有一个带有多个ParDo转换的分支管道,这些转换被合并并写入 GCS 存储桶中的文本文件记录。

管道崩溃后,我收到以下消息:

看起来它找不到它一直在写入的日志文件。直到发生错误的某个点似乎都很好。我想将try:/包裹except:在它或断点周围,但我什至不确定如何发现根本原因。

有没有办法只写一个文件?还是只打开一个文件写入一次?它将数千个输出文件发送到此存储桶中,这是我想消除的,并且可能是一个因素。

with beam.Pipeline(argv=pipeline_args) as p:
     csvlines = (
            p | 'Read From CSV' >> beam.io.ReadFromText(known_args.input, skip_header_lines=1)
              | 'Parse CSV to Dictionary' >> beam.ParDo(Split())
              | 'Read Files into Memory' >> beam.ParDo(DownloadFilesDoFn())
              | 'Windowing' >> beam.WindowInto(window.FixedWindows(20 * 60))
            )

     b1 = ( csvlines | 'Branch1' >> beam.ParDo(Branch1DoFn()) )
     b2 = ( csvlines | 'Branch2' >> beam.ParDo(Branch2DoFn()) )
     b3 = ( csvlines | 'Branch3' >> beam.ParDo(Branch3DoFn()) )
     b4 = ( csvlines | 'Branch4' >> beam.ParDo(Branch4DoFn()) )
     b5 = ( csvlines | 'Branch5' >> beam.ParDo(Branch5DoFn()) )
     b6 = ( csvlines | 'Branch6' >> beam.ParDo(Branch6DoFn()) )
        
     output = (
           (b1,b2,b3,b4,b5,b6) | 'Merge PCollections' >> beam.Flatten()
           | 'WriteToText' >> beam.io.Write(beam.io.textio.WriteToText(known_args.output))
           )

标签: pythongoogle-cloud-platformgoogle-cloud-storagegoogle-cloud-dataflowapache-beam

解决方案


这个问题与前一个问题相关联,问题包含有关实施的更多细节。那里的解决方案建议google.cloud.storage.Client()start_bundle()每次调用ParDo(DoFn). 这连接到同一个 gcs 存储桶 - 通过 args 给出WriteToText(known_args.output)

class DownloadFilesDoFn(beam.DoFn):
    def __init__(self):
        import re
        self.gcs_path_regex = re.compile(r'gs:\/\/([^\/]+)\/(.*)')

    def start_bundle(self):
        import google.cloud.storage
        self.gcs = google.cloud.storage.Client()

    def process(self, element):
        self.file_match = self.gcs_path_regex.match(element['Url'])
        self.bucket = self.gcs.get_bucket(self.file_match.group(1))
        self.blob = self.bucket.get_blob(self.file_match.group(2))
        self.f = self.blob.download_as_bytes()

此错误的原因可能与与客户端的连接过多有关。我不清楚这方面的良好做法 - 因为在其他地方建议您可以为每个捆绑包以这种方式设置网络连接。

将此添加到末尾以在捆绑末尾从内存中删除客户端对象应该有助于关闭一些不必要的延迟连接。

    def finish_bundle(self):
        del self.gcs, self.gcs_path_regex

推荐阅读