python - Apache Beam Python 单元测试 ParDo 类 with_outputs
问题描述
我有一个 ParDo 类,它检查 pubsub 消息是否包含某些属性并返回一个有效和无效的 TaggedOutput (在正常流程中工作正常,使用 yield 来返回值),我无法对这个类进行单元测试,我正在尝试提供一个虚拟消息(字典复制 pubsub 消息信息),并且我想检查该类的输出是否包含其他属性。
这就是我到目前为止所拥有的:
class TestValidateMessage(unittest.TestCase):
def test_not_valid(self):
with TestPipeline() as p:
pcoll = (
p
| beam.Create([{'attributes':{"imageUrl":000}}])
| beam.ParDo(ValidateMessage()).with_outputs(
'invalid', main='valid'))
valid, _ = pcoll
invalid = pcoll['invalid']
print(invalid)
assert_that(invalid, {'failure_step':'Message validation'})
有了这个,我收到一条错误消息:
TypeError: Map can be used only with callable objects. Received {'failure_step'} instead.
当我尝试打印(无效)时,我得到PCollection[ParDo(ValidateMessage)/ParDo(ValidateMessage).invalid]
如何访问(出于断言目的)PCollection 的内容?
解决方案
TLDR;
这段代码存在三个问题:
- assert_that()在您运行管道后执行。
- 您必须使用 equal_to()、is_empty() 或 is_not_empty() 作为 assert_that() 中的条件
- 在管道结束时,您将获得一个数组。始终针对 [ expected_result ] 进行测试。
更正的代码:
class TestValidateMessage(unittest.TestCase):
def test_not_valid(self):
with TestPipeline() as p:
pcoll = (
p
| beam.Create([{'attributes':{"imageUrl":000}}])
| beam.ParDo(ValidateMessage()).with_outputs(
'invalid', main='valid'))
assert_that(pcoll.invalid,
equal_to([ {'failure_step':'Message validation'} ] )
长解释
在 Apache Beam 中,您必须测试管道内的代码。
Beam 为您从 PCollection 构建管道进行断言。看起来很复杂,但它看起来更简单。
如果您有这样的管道:
with TestPipeline() as p:
pcoll = p | Beam.Create( testdata ) | Beam.DoFn(I_Want_To_Test_This())
您必须在 with 子句中添加 assert_that ,因为 assert_that() 将添加代码以在管道内执行断言。
with TestPipeline() as p:
pcoll = p | Beam.Create( testdata ) | Beam.ParDo(I_Want_To_Test_This_DoFn())
assert_that(pcoll, equal_to(expected_data) )
这与执行此操作相同:
P = TestPipeline()
pcoll = p | Beam.Create( testdata ) | Beam.ParDo(I_Want_To_Test_This_DoFn())
assert_that(pcoll, equal_to(expected_data) )
p.run() # Test must be run _inside_ the pipeline
当您有多个输出时,它是相似的:
with TestPipeline() as p:
pcoll = (
p
| Beam.Create( testdata )
| Beam.ParDo(I_Want_To_Test_This_DoFn()).with_outputs('valid','invalid')
)
# You can test inside the pipeline with assert_that
assert_that(pcoll.invalid, equal_to( [ {'failure step':'Message validation'} ] ))
也许您想检查其他输出是否为空:
from apache_beam.testing.util import assert_that, equal_to, is_empty, is_not_empty
...
with TestPipeline() as p:
pcoll = (
p
| Beam.Create( testdata )
| Beam.ParDo(I_Want_To_Test_This_DoFn()).with_outputs('valid','invalid')
)
# You can test inside the pipeline with assert_that
assert_that(pcoll.valid, is_empty() ), label='valid')
assert_that(pcoll.invalid, equal_to( [ {'failure step':'Message validation'} ] ), label='invalid')
在这种情况下,您需要向 assert_that() 添加一个标签,以确保它能够生成正确的管道。
推荐阅读
- r - Excel 另存为 CSV:R 中的哪一个
- web-api-testing - 停止授权标头记录
- python - 如何在 python 中使用 Pil 库导入图像?
- node.js - sequelize postgress bulk insert 如果存在则忽略
- cypress - 如何在 cypress 中制作变量 globle,我们可以在规范文件中的所有 TestCase 中使用它
- batch-file - 要求用户输入时语法不正确
- twitter-bootstrap - 使单选按钮组全宽 BootstrapVue
- html - 在 tumblr 网站顶部添加基本横幅
- javascript - 第二个输入 onclick() 函数不起作用
- c# - 使用客户端凭据授予类型自定义 Azure AD 令牌