首页 > 解决方案 > Apache Beam Python 单元测试 ParDo 类 with_outputs

问题描述

我有一个 ParDo 类,它检查 pubsub 消息是否包含某些属性并返回一个有效和无效的 TaggedOutput (在正常流程中工作正常,使用 yield 来返回值),我无法对这个类进行单元测试,我正在尝试提供一个虚拟消息(字典复制 pubsub 消息信息),并且我想检查该类的输出是否包含其他属性。

这就是我到目前为止所拥有的:

class TestValidateMessage(unittest.TestCase):

def test_not_valid(self):
    with TestPipeline() as p:
        pcoll = (
                p
                | beam.Create([{'attributes':{"imageUrl":000}}])
                | beam.ParDo(ValidateMessage()).with_outputs(
        'invalid', main='valid'))
    valid, _ = pcoll
    invalid = pcoll['invalid']
    print(invalid)
    assert_that(invalid, {'failure_step':'Message validation'})

有了这个,我收到一条错误消息:

TypeError: Map can be used only with callable objects. Received {'failure_step'} instead.

当我尝试打印(无效)时,我得到PCollection[ParDo(ValidateMessage)/ParDo(ValidateMessage).invalid] 如何访问(出于断言目的)PCollection 的内容?

标签: pythonunit-testingapache-beamgoogle-cloud-pubsub

解决方案


TLDR;

这段代码存在三个问题:

  • assert_that()在您运行管道后执行。
  • 您必须使用 equal_to()、is_empty() 或 is_not_empty() 作为 assert_that() 中的条件
  • 在管道结束时,您将获得一个数组。始终针对 [ expected_result ] 进行测试。

更正的代码:

class TestValidateMessage(unittest.TestCase):

def test_not_valid(self):
    with TestPipeline() as p:
        pcoll = (
                p
                | beam.Create([{'attributes':{"imageUrl":000}}])
                | beam.ParDo(ValidateMessage()).with_outputs(
        'invalid', main='valid'))

        assert_that(pcoll.invalid, 
                    equal_to([ {'failure_step':'Message validation'} ] )

长解释

在 Apache Beam 中,您必须测试管道内的代码。

Beam 为您从 PCollection 构建管道进行断言。看起来很复杂,但它看起来更简单。

如果您有这样的管道:

with TestPipeline() as p:
   pcoll = p | Beam.Create( testdata ) | Beam.DoFn(I_Want_To_Test_This())

您必须在 with 子句中添加 assert_that 因为 assert_that() 将添加代码以在管道内执行断言。

with TestPipeline() as p:
   pcoll = p | Beam.Create( testdata ) | Beam.ParDo(I_Want_To_Test_This_DoFn())
   assert_that(pcoll, equal_to(expected_data) )

这与执行此操作相同:

P = TestPipeline()
pcoll = p | Beam.Create( testdata ) | Beam.ParDo(I_Want_To_Test_This_DoFn())
assert_that(pcoll, equal_to(expected_data) )
p.run()  # Test must be run _inside_ the pipeline

当您有多个输出时,它是相似的:

with TestPipeline() as p:
   pcoll = (
       p 
       | Beam.Create( testdata ) 
       | Beam.ParDo(I_Want_To_Test_This_DoFn()).with_outputs('valid','invalid')
   )

   # You can test inside the pipeline with assert_that

   assert_that(pcoll.invalid, equal_to( [ {'failure step':'Message validation'} ] ))

也许您想检查其他输出是否为空:

from apache_beam.testing.util import assert_that, equal_to, is_empty, is_not_empty

...

with TestPipeline() as p:
   pcoll = (
       p 
       | Beam.Create( testdata ) 
       | Beam.ParDo(I_Want_To_Test_This_DoFn()).with_outputs('valid','invalid')
   )

   # You can test inside the pipeline with assert_that

   assert_that(pcoll.valid, is_empty() ), label='valid')

   assert_that(pcoll.invalid, equal_to( [ {'failure step':'Message validation'} ] ), label='invalid')

在这种情况下,您需要向 assert_that() 添加一个标签,以确保它能够生成正确的管道。


推荐阅读