Comparing objects with PAssert containsInAnyOrder in Apache Beam

Question

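While writing unit tests for my Beam pipeline with PAssert, the pipeline outputs the object correctly, but the test fails during the comparison with the following assertion error: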

java.lang.AssertionError: Decode pubsub message/ParMultiDo(DecodePubSubMessage).output: 
Expected: iterable with items [<PubsubMessage{message=[123, 34, 104...], attributes={messageId=2be485e4-3e53-4468-a482-a49842b87ed5, dataPipelineId=bc957aa3-17e7-46d6-bc73-0924fa5674fa, region=us-west1, ingestionTimestamp=2020-02-02T12:34:56.789Z}, messageId=null}>] in any order
     but: not matched: <PubsubMessage{message=[123, 34, 104...], attributes={messageId=2be485e4-3e53-4468-a482-a49842b87ed5, dataPipelineId=bc957aa3-17e7-46d6-bc73-0924fa5674fa, region=us-west1, ingestionTimestamp=2020-02-02T12:34:56.789Z}, messageId=null}>

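I also tried wrapping expectedOutputPubSubMessage in a list (the raw output is apparently in an array), but to no avail. All the PAssert examples given in the documentation do simple string or key-value comparisons.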

@RunWith(PowerMockRunner.class)
public class DataDecodePipelineTest implements Serializable {

  @Rule
  public TestPipeline p = TestPipeline.create();

  @Test
  public void testPipeline(){
      PubsubMessage inputPubSubMessage =
              new PubsubMessage(
                      TEST_ENCODED_PAYLOAD.getBytes(),
                      new HashMap<String, String>() {
                          {
                              put(MESSAGE_ID_NAME, TEST_MESSAGE_ID);
                              put(DATA_PIPELINE_ID_NAME, TEST_DATA_PIPELINE_ID);
                              put(INGESTION_TIMESTAMP_NAME, TEST_INGESTION_TIMESTAMP);
                              put(REGION_NAME, TEST_REGION);
                          }
                      });

      PubsubMessage expectedOutputPubSubMessage =
              new PubsubMessage(
                      TEST_DECODED_PAYLOAD.getBytes(),
                      new HashMap<String, String>() {
                          {
                              put(MESSAGE_ID_NAME, TEST_MESSAGE_ID);
                              put(DATA_PIPELINE_ID_NAME, TEST_DATA_PIPELINE_ID);
                              put(INGESTION_TIMESTAMP_NAME, TEST_INGESTION_TIMESTAMP);
                              put(REGION_NAME, TEST_REGION);
                          }
                      });

      PCollection<PubsubMessage> input =
              p.apply(Create.of(Collections.singletonList(inputPubSubMessage)));

      PCollection<PubsubMessage> output =
              input.apply("Decode pubsub message",
                      ParDo.of(new DataDecodePipeline.DecodePubSubMessage()));

      PAssert.that(output).containsInAnyOrder(expectedOutputPubSubMessage);
      
      p.run().waitUntilFinish();
  }
}

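Apparently someone ran into exactly the same problem a few years ago, and it is still unresolved: "Test pipeline comparing objects using PAssert containsInAnyOrder()".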

Tags: apache-beam-io, apache-beam

Answer


The problem is that you are comparing different objects.

Your pipeline returns a PCollection, and you are comparing it with a PubsubMessage.

You have to create a PCollection from expectedOutputPubSubMessage.

Try this:

      PAssert.that(output).containsInAnyOrder(Create.of(Collections.singletonList(expectedOutputPubSubMessage)));

Example: https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/master/src/test/java/com/google/cloud/teleport/templates/PubsubToPubsubTest.java
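
One more thing that may be worth checking: containsInAnyOrder takes the expected elements directly (as varargs or as an Iterable) and matches them with equals(). The identical "Expected" and "not matched" printouts in the error are what you would see if the element class does not override equals(), because two separately constructed messages then never match. Whether that applies to the PubsubMessage in your Beam version is an assumption on my part. A minimal plain-Java sketch of the effect, where Msg is a hypothetical stand-in for PubsubMessage and sameContent is a helper made up for illustration:

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

class FieldwiseCompare {

    // Hypothetical stand-in for a message class that does NOT override equals().
    static final class Msg {
        final byte[] payload;
        final Map<String, String> attributes;

        Msg(byte[] payload, Map<String, String> attributes) {
            this.payload = payload;
            this.attributes = attributes;
        }
    }

    // Compares two messages by content instead of by reference.
    static boolean sameContent(Msg a, Msg b) {
        // byte[] needs Arrays.equals; Map.equals already compares entries.
        return Arrays.equals(a.payload, b.payload)
                && a.attributes.equals(b.attributes);
    }

    public static void main(String[] args) {
        Map<String, String> attrs = new HashMap<>();
        attrs.put("region", "us-west1");

        Msg expected = new Msg("{\"h\":1}".getBytes(StandardCharsets.UTF_8), attrs);
        Msg actual = new Msg("{\"h\":1}".getBytes(StandardCharsets.UTF_8), new HashMap<>(attrs));

        // Object.equals falls back to reference equality.
        System.out.println(expected.equals(actual));       // false
        // Field-by-field comparison sees the same bytes and attributes.
        System.out.println(sameContent(expected, actual)); // true
    }
}
```

If this turns out to be the cause, the same field-by-field check could probably be run inside PAssert.that(output).satisfies(...), comparing getPayload() and getAttributeMap() of each output message against the expected values.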

