python - 单元测试 apache 梁作业
问题描述
我有一个非常简单的数据流工作,我想编写单元测试。可悲的是,没有好的例子是最好的方法。
这是代码
import logging
from datetime import datetime
from re import sub
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from beam_nuggets.io import relational_db
class BeamOptions(PipelineOptions):
@classmethod
def _add_argparse_args(cls, parser):
parser.add_argument(
"--bigquery_input_table",
help="A table from BigQuery to process",
default="bigquery_input_table_empty",
)
parser.add_argument(
"--bigquery_project", help="Project with BigQuery data", default="foo"
)
parser.add_argument(
"--bigquery_dataset", help="Dataset from BigQuery", default="bar"
)
parser.add_argument(
"--bucket", help="GCS Bucket for temporary files", default="model-foo-dev"
)
parser.add_argument("--db_password", help="Password for profiles DB", default="postgres")
parser.add_argument("--db_host", help="host for profiles DB", default="localhost")
parser.add_argument("--db_port", help="port for profiles DB", default=5432)
class BeamFeed:
def __init__(self):
logging.info("fetching BeamOptions")
self.pipe_opt = BeamOptions()
all_options = self.pipe_opt.get_all_options()
self.bigquery_input_table = all_options["bigquery_input_table"]
self.bigquery_project = all_options["bigquery_project"]
self.bigquery_dataset = all_options["bigquery_dataset"]
self.google_cloud_options = self.pipe_opt.view_as(GoogleCloudOptions)
self.google_cloud_options.job_name = "model-foo-data-preparation-" + sub(
r"[^0-9]", "-", str(datetime.now().isoformat())
)
self.google_cloud_options.staging_location = "gs://" + all_options["bucket"] + "/staging"
self.google_cloud_options.temp_location = "gs://" + all_options["bucket"] + "/temp"
self.pg_config = relational_db.SourceConfiguration(
drivername="postgresql+pg8000",
host=all_options["db_host"],
port=all_options["db_port"],
username="postgres",
database="postgres",
password=all_options["db_password"],
)
self.table_config = relational_db.TableConfiguration(
name="users", primary_key_columns=["id"]
)
def run(self, pipeline):
"""runs whole beam job pipeline"""
users_bq = beam.io.BigQuerySource(
dataset=self.bigquery_dataset,
project=self.bigquery_project,
table=self.bigquery_input_table,
validate=True,
)
users_info = (
pipeline
| "read_users" >> beam.io.Read(users_bq)
| "map_by_user_id" >> beam.Map(lambda usr: {"id": usr["id"], "value": usr})
)
users_info | "Writing to DB table" >> relational_db.Write(
source_config=self.pg_config, table_config=self.table_config
)
if __name__ == "__main__":
logging.getLogger().setLevel(logging.INFO)
logging.info("starting BeamFeed process")
beam_job = BeamFeed()
with beam.Pipeline(options=beam_job.pipe_opt) as pipeline:
beam_job.run(pipeline)
和UT尝试
import pytest
import apache_beam
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import assert_that, equal_to
from apache_beam.transforms import Create, FlatMap
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.direct.direct_runner import DirectRunner
from bla import beam, BeamFeed, BeamOptions
@pytest.fixture
def bg_mock(mocker):
# apache_beam.Create(
# [{"id": "8d73fda0-00b8-4084-bb8a-eea0f5bca46a", "foo": 1}]
# )
return mocker.patch("apache_beam.io.BigQuerySource")
def test_empty_profile_generation(bg_mock):
pipeline = TestPipeline(runner=DirectRunner())
beam_job = BeamFeed()
beam_job.run(pipeline)
pipeline.run()
我不确定测试 Dataflow 作业的最佳方法是什么。只是集成测试,没有模拟?使用给定的模拟 pytest 失败[1] 16845 abort
解决方案
我同意@de1 的观点,Apache Beam 有一些非常好的测试来激发你自己的灵感。
根据您要查找的内容,我认为 Apache Beam 的 SDK 代码中有几个文件可以帮助您设置单元测试。
此外,如果您想查看 BigQuery 的更多测试,请查看此.
快速概述他们如何进行测试:
- 管道是通过使用 TestPipeline 进行验证(带有equal_to 的 assert_that)设置的,这可能是为了直接在 Job 中进行额外级别的测试,对于您的情况可能不需要,您可以继续使用Pipeline。
- 该测试使用TestPipeline和PipelineStateMatcher来验证和验证管道作业是否以预期状态终止。
- 需要进行自己的设置才能拥有可用于测试的测试表。在这种情况下,有 4 个表,但它们实际上只使用 1,所以如果你只创建 1 或 2 个表,我认为很酷。这必须根据您的需要。
- 最后,使用unittest.main函数运行测试。
我希望这个示例可以帮助您创建自己的作业和单元测试。
推荐阅读
- entity-framework-core - EFCore 和 C# 5.0 的多对多关系:如何从两个表中获取字段
- python - 从班级打开的窗口显示不正确
- plot - AttributeError: 'list' object has no attribute 'shape', 如何解决这个错误
- android - 如何制作一个返回通用对象的函数?
- npm - Npm 在定义对等依赖时包括所有预发布范围
- reactjs - 如何重用重载的函数类型并将其分配给另一个函数?
- azure-devops - az pipelines 错误:TF400813:用户 '' 无权访问此资源
- css - 如何在 Evolution Mail 中更改日历的 Today-Background
- javascript - 如何按顺序从数组中获取元素?
- api - 将 HTTP 转码为 gRPC:具有不同参数的相同端点