Apache Beam: confusing error during "process_outputs" on Google Dataflow workers

Problem description

I am running the following Apache Beam test pipeline successfully on Google Dataflow. It uses Datastore as both its source and its sink. Many of the entities in our database are assigned to namespaces. This pipeline is meant to perform _do_work() on all entities of a certain kind in the given namespaces. Note that a similar test pipeline that does the same thing to non-namespaced entities also runs successfully:

import apache_beam as beam
from apache_beam.io.gcp.datastore.v1 import helper as apache_helper
from apache_beam.io.gcp.datastore.v1 import datastoreio
from google.cloud.proto.datastore.v1 import query_pb2
from googledatastore import helper

from .pipelines.dataflow_settings import (
    PROJECT, NAMESPACES_PER_PIPELINE
)

class NamespacedDatastoreMigration(_DatastoreMigrationBase):
    """
    Map a do-function over a query multiplexed across several namespaces.

    The inheritor must implement the following:
     - a PROJECT class attribute
     - a do-function (_do_work())
     - a method to get the namespaces across which to shard the query (
       get_namespaces())
    """
    _NAMESPACES_PER_PIPELINE = NAMESPACES_PER_PIPELINE  # 25

    def __init__(self, argv, migration_history_obj, model):
        super(NamespacedDatastoreMigration, self).__init__(argv, migration_history_obj, model)
        self._namespaces = self.get_namespaces()
        self._pipelines = self._create_pipelines(argv)

    def get_namespaces(self):
        query_pb = query_pb2.Query()
        helper.set_kind(query_pb, "__namespace__")
        client = apache_helper.get_datastore(PROJECT)
        namespace_entities = apache_helper.fetch_entities(PROJECT, '', query_pb, client)

        namespaces = []
        for n in namespace_entities:
            # Get namespace name or id
            key_path = n.key.path[-1]
            if key_path.HasField('id'):
                name_or_id = key_path.id
            else:
                name_or_id = key_path.name

            # Avoid duplicates and test namespaces
            if len(str(name_or_id)) > 1 and name_or_id not in namespaces:
                namespaces.append(name_or_id)

        return namespaces

    def run(self):
        for pipeline in self._pipelines:
            pipeline.run()

    def _create_pipelines(self, argv):
        pipelines = []
        for namespaces in zip(*[iter(self._namespaces)] * self._NAMESPACES_PER_PIPELINE):
            p = beam.Pipeline(argv=argv)
            (
                (
                    p | 'ReadNamespace_{}'.format(
                        ns
                    ) >> datastoreio.ReadFromDatastore(
                        project=self.PROJECT,
                        query=self.query(),
                        namespace=ns
                    )
                    for ns in namespaces
                )
                | 'JoinNamespaceEntities' >> beam.Flatten()
                | self.__class__.__name__ >> beam.FlatMap(self._do_work)
                | self._get_sink()
            )
            pipelines.append(p)

        return pipelines

model = "App"
NamespacedDatastoreMigration(
    argv,
    kwargs.get('migration_history_obj'),  # Irrelevant here
    model  # Entity kind
).run()

where argv is:

argv = [
    '--project={0}'.format(PROJECT),
    '--job_name=' + name,  # A human readable descriptor that's been cleaned
    '--staging_location=gs://{0}/migrations/'.format(BUCKET),
    '--temp_location=gs://{0}/migrations/'.format(BUCKET),
    '--setup_file=./setup.py',
    '--runner=DataflowRunner'
]

That class is based on this parent class:

class _DatastoreMigrationBase(object):
    PROJECT = PROJECT

    def __init__(self, argv, migration_history_obj, model):
        self.migration_history_obj = migration_history_obj

        if not model:
            raise Exception('This operation requires a model class name.')
        self.model = model

    def query(self):
        # Instantiate a filter protobuf
        filter_pb = query_pb2.Filter()

        # Get all non-deleted model instances
        helper.set_property_filter(
            filter_pb,
            'deleted',
            query_pb2.PropertyFilter.EQUAL,
            False
        )

        # Instantiate a query protobuf
        query_pb = query_pb2.Query(
            filter=filter_pb
        )
        helper.set_kind(query_pb, self.model)

        return query_pb

    def _get_source(self):
        return 'DatastoreRead' >> datastoreio.ReadFromDatastore(
            self.PROJECT,
            self.query(),
        )

    @staticmethod
    def _do_work(entity):
        return entity

    def _get_sink(self):
        return 'WriteToDatastore' >> datastoreio.WriteToDatastore(
            self.PROJECT
        )

However, when I subclass NamespacedDatastoreMigration like so:

from ..helpers import create_argv
from ..mappers import NamespacedDatastoreMigration


class CampaignActionField(NamespacedDatastoreMigration):
    @staticmethod
    def _do_work(entity):
        target_url = entity.properties.get('target_url').string_value
        message = entity.properties.get('message').string_value
        path = entity.properties.get('path').string_value
        if target_url and not message and not path:
            entity.properties.get('action').string_value = 'webhook'

        return entity


model = "Campaign"  # Entity kind
CampaignActionField(
    create_argv(kwargs.get('name')),  # "ED-2275 Campaign action field"
    kwargs.get('migration_history_obj'),  # Irrelevant here
    model
).run()

this new pipeline fails when run on Dataflow. At first things start out fine; I mean, I see the following info logs:

2018-11-20 (11:02:57) Worker configuration: n1-standard-1 in us-central1-b.

2018-11-20 (11:03:15) Starting 1 workers in us-central1-b.


# SEVERAL OF THE FOLLOWING FOR DIFFERENT NAMESPACES:

2018-11-20 (11:03:15) Executing operation ReadNamespace_xxxx_1/GroupByKey/Create

2018-11-20 (11:03:17) Executing operation ReadNamespace_xxxx_1/UserQuery/Read+ReadNamespace_xxxx_1/SplitQuery+ReadNa...

2018-11-20 (11:05:58) Executing operation ReadNamespace_xxxx_1/GroupByKey/Close

Then I get this traceback:

Traceback (most recent call last):
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/batchworker.py", line 642, in do_work
    work_executor.execute()
  File "/usr/local/lib/python2.7/dist-packages/dataflow_worker/executor.py", line 156, in execute
    op.start()
  File "dataflow_worker/shuffle_operations.py", line 49, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    def start(self):
  File "dataflow_worker/shuffle_operations.py", line 50, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_start_state:
  File "dataflow_worker/shuffle_operations.py", line 65, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.scoped_process_state:
  File "dataflow_worker/shuffle_operations.py", line 66, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    with self.shuffle_source.reader() as reader:
  File "dataflow_worker/shuffle_operations.py", line 70, in dataflow_worker.shuffle_operations.GroupedShuffleReadOperation.start
    self.output(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "dataflow_worker/shuffle_operations.py", line 229, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    with self.scoped_process_state:
  File "dataflow_worker/shuffle_operations.py", line 236, in dataflow_worker.shuffle_operations.BatchGroupAlsoByWindowsOperation.process
    self.output(wvalue.with_value((k, wvalue.value)))
  File "apache_beam/runners/worker/operations.py", line 168, in apache_beam.runners.worker.operations.Operation.output
    cython.cast(Receiver, self.receivers[output_index]).receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 702, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 787, in apache_beam.runners.common._OutputProcessor.process_outputs
    self.main_receivers.receive(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 88, in apache_beam.runners.worker.operations.ConsumerSet.receive
    cython.cast(Operation, consumer).process(windowed_value)
  File "apache_beam/runners/worker/operations.py", line 423, in apache_beam.runners.worker.operations.DoOperation.process
    with self.scoped_process_state:
  File "apache_beam/runners/worker/operations.py", line 424, in apache_beam.runners.worker.operations.DoOperation.process
    self.dofn_receiver.receive(o)
  File "apache_beam/runners/common.py", line 673, in apache_beam.runners.common.DoFnRunner.receive
    self.process(windowed_value)
  File "apache_beam/runners/common.py", line 679, in apache_beam.runners.common.DoFnRunner.process
    self._reraise_augmented(exn)
  File "apache_beam/runners/common.py", line 717, in apache_beam.runners.common.DoFnRunner._reraise_augmented
    raise_with_traceback(new_exn)
  File "apache_beam/runners/common.py", line 677, in apache_beam.runners.common.DoFnRunner.process
    self.do_fn_invoker.invoke_process(windowed_value)
  File "apache_beam/runners/common.py", line 413, in apache_beam.runners.common.SimpleInvoker.invoke_process
    output_processor.process_outputs(
  File "apache_beam/runners/common.py", line 763, in apache_beam.runners.common._OutputProcessor.process_outputs
    for result in results:
TypeError: 'Entity' object is not iterable [while running 's152-c260']

I think this has to do with the difference between the two _do_work() functions in NamespacedDatastoreMigration and CampaignActionField, since the former succeeds while the latter fails, and _do_work() is the only difference between them (other than the kind of entity being transformed). But I can't figure out what exactly is going wrong or how to fix it. Does anyone have any ideas?
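
For reference, the same TypeError can be reproduced with a tiny standalone pipeline that has nothing to do with Datastore (a hypothetical sketch run locally with the DirectRunner, not my actual migration code):

import apache_beam as beam

def identity(element):
    # Mirrors _do_work(): returns a single element, not an iterable of elements.
    return element

with beam.Pipeline() as p:  # DirectRunner by default; runs on exiting the block
    (
        p
        | beam.Create([1, 2, 3])
        | beam.FlatMap(identity)  # fails at run time: TypeError: 'int' object is not iterable
    )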

Tags: python-2.7, google-cloud-platform, google-cloud-dataflow, apache-beam

Solution


It turns out that changing FlatMap to Map in NamespacedDatastoreMigration's _create_pipelines method solved this for me. I had also, foolishly, been calling NamespacedDatastoreMigration with a non-namespaced model, which is why it succeeded while CampaignActionField (with a namespaced model) did not.
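
Concretely (a minimal sketch of the change inside _create_pipelines; everything else stays as posted above), the transform that applies _do_work becomes:

                | 'JoinNamespaceEntities' >> beam.Flatten()
                # beam.Map emits exactly one output element per input, so a
                # _do_work() that returns a single entity is what it expects;
                # beam.FlatMap would instead try to iterate over that entity,
                # which is what raised "'Entity' object is not iterable".
                | self.__class__.__name__ >> beam.Map(self._do_work)
                | self._get_sink()

If _do_work() ever needs to emit zero or several entities per input, FlatMap is the right transform again, but then it has to return (or yield) an iterable of entities.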

