Unit testing an Apache Beam job

Problem description

I have a very simple Dataflow job that I would like to write unit tests for. Sadly, I can't find good examples of the best way to do this.

Here is the code:

import logging
from datetime import datetime
from re import sub

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from beam_nuggets.io import relational_db


class BeamOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument(
            "--bigquery_input_table",
            help="A table from BigQuery to process",
            default="bigquery_input_table_empty",
        )
        parser.add_argument(
            "--bigquery_project", help="Project with BigQuery data", default="foo"
        )
        parser.add_argument(
            "--bigquery_dataset", help="Dataset from BigQuery", default="bar"
        )
        parser.add_argument(
            "--bucket", help="GCS Bucket for temporary files", default="model-foo-dev"
        )
        parser.add_argument("--db_password", help="Password for profiles DB", default="postgres")
        parser.add_argument("--db_host", help="host for profiles DB", default="localhost")
        parser.add_argument("--db_port", help="port for profiles DB", default=5432)


class BeamFeed:
    def __init__(self):
        logging.info("fetching BeamOptions")
        self.pipe_opt = BeamOptions()
        all_options = self.pipe_opt.get_all_options()
        self.bigquery_input_table = all_options["bigquery_input_table"]
        self.bigquery_project = all_options["bigquery_project"]
        self.bigquery_dataset = all_options["bigquery_dataset"]

        self.google_cloud_options = self.pipe_opt.view_as(GoogleCloudOptions)
        self.google_cloud_options.job_name = "model-foo-data-preparation-" + sub(
            r"[^0-9]", "-", str(datetime.now().isoformat())
        )
        self.google_cloud_options.staging_location = "gs://" + all_options["bucket"] + "/staging"
        self.google_cloud_options.temp_location = "gs://" + all_options["bucket"] + "/temp"
        self.pg_config = relational_db.SourceConfiguration(
            drivername="postgresql+pg8000",
            host=all_options["db_host"],
            port=all_options["db_port"],
            username="postgres",
            database="postgres",
            password=all_options["db_password"],
        )
        self.table_config = relational_db.TableConfiguration(
            name="users", primary_key_columns=["id"]
        )

    def run(self, pipeline):
        """runs whole beam job pipeline"""
        users_bq = beam.io.BigQuerySource(
            dataset=self.bigquery_dataset,
            project=self.bigquery_project,
            table=self.bigquery_input_table,
            validate=True,
        )

        users_info = (
            pipeline
            | "read_users" >> beam.io.Read(users_bq)
            | "map_by_user_id" >> beam.Map(lambda usr: {"id": usr["id"], "value": usr})
        )

        users_info | "Writing to DB table" >> relational_db.Write(
            source_config=self.pg_config, table_config=self.table_config
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    logging.info("starting BeamFeed process")
    beam_job = BeamFeed()
    with beam.Pipeline(options=beam_job.pipe_opt) as pipeline:
        beam_job.run(pipeline)


And my unit test attempt:

import pytest

import apache_beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import Create, FlatMap
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.direct.direct_runner import DirectRunner

from bla import beam, BeamFeed, BeamOptions



@pytest.fixture
def bg_mock(mocker):
    # apache_beam.Create(
    #     [{"id": "8d73fda0-00b8-4084-bb8a-eea0f5bca46a", "foo": 1}]
    # )
    return mocker.patch("apache_beam.io.BigQuerySource")


def test_empty_profile_generation(bg_mock):
    pipeline = TestPipeline(runner=DirectRunner())
    beam_job = BeamFeed()
    beam_job.run(pipeline)
    pipeline.run()

I'm not sure what the best way to test a Dataflow job is. Should it just be an integration test, with no mocks? With the mock shown above, pytest fails with "[1] 16845 abort".

Tags: python, google-cloud-dataflow, apache-beam

Solution


I agree with @de1 that Apache Beam has some really nice tests to take inspiration from for your own.

Depending on what you are looking for, I think there are a couple of files in the Apache Beam SDK code that can help you set up your unit tests.

Additionally, if you want to see more tests that involve BigQuery, the SDK's BigQuery tests are worth a look as well.

A quick overview of how they do their testing:

  • The pipeline is set up using TestPipeline, which supports verification (assert_that with equal_to). This is there to get an extra level of testing directly inside the job; it may not be needed in your case, and you can keep using Pipeline (see the sketch after this list).
  • The test uses TestPipeline together with a PipelineStateMatcher to verify that the pipeline job terminated in the expected state.
  • It does its own setup so that test tables are available for the test. In that case there are 4 tables, but they really only use 1, so I think it is fine if you only create 1 or 2; adapt this to your needs.
  • Finally, the tests are run with the unittest.main function.
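
Putting those pieces together, here is a minimal sketch of that structure applied to the map_by_user_id step of your job. The input rows are invented for illustration, and the PipelineStateMatcher part is left out; the SDK tests attach it through the test pipeline options when they also want to check the terminal job state.

import unittest

import apache_beam as beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to


class MapByUserIdTest(unittest.TestCase):
    def test_map_by_user_id(self):
        # Made-up input rows standing in for the BigQuery result.
        rows = [{"id": "user-1", "foo": 1}, {"id": "user-2", "foo": 2}]
        expected = [{"id": row["id"], "value": row} for row in rows]

        # TestPipeline behaves like Pipeline but is intended for tests;
        # assert_that/equal_to check the PCollection contents when it runs.
        with TestPipeline() as p:
            output = (
                p
                | "create_users" >> beam.Create(rows)
                | "map_by_user_id"
                >> beam.Map(lambda usr: {"id": usr["id"], "value": usr})
            )
            assert_that(output, equal_to(expected))


if __name__ == "__main__":
    unittest.main()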

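If you would rather not mock BigQuerySource at all (patching it with a MagicMock is probably why pytest aborts, since beam.io.Read has nothing it can actually execute), one option is to make the source injectable. The following is only a sketch of that idea, not part of your original code: run() keeps its BigQuery default, but a test can pass a beam.Create transform instead. The relational_db.Write step still needs a reachable test database, or its own patch, much like the test-table setup mentioned above.

# Hypothetical variant of BeamFeed.run (a sketch; everything else in the
# class stays as in the question). The source transform is injectable so a
# test never has to touch BigQuery.
def run(self, pipeline, source=None):
    """Runs the whole beam job pipeline; source can be overridden in tests."""
    if source is None:
        # Production default: read from BigQuery exactly as before.
        source = beam.io.Read(
            beam.io.BigQuerySource(
                dataset=self.bigquery_dataset,
                project=self.bigquery_project,
                table=self.bigquery_input_table,
                validate=True,
            )
        )

    users_info = (
        pipeline
        | "read_users" >> source
        | "map_by_user_id" >> beam.Map(lambda usr: {"id": usr["id"], "value": usr})
    )

    users_info | "Writing to DB table" >> relational_db.Write(
        source_config=self.pg_config, table_config=self.table_config
    )


# A test can then do, for example:
#     beam_job = BeamFeed()
#     with TestPipeline() as pipeline:
#         beam_job.run(pipeline, source=beam.Create([{"id": "u-1", "foo": 1}]))
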
I hope this example helps you create your own job and unit tests.

