首页 > 解决方案 > 如何使用 mockito 使用回调测试来自 Kafka Producer 的发送方法?

问题描述

我正在尝试测试一种方法,为此我正在使用 mockito。然而,我的模拟并没有进入 if 的条件,而是直接进入了同花顺的行。

我想测试以下情况:

public class SomeClass{
     @Autowired
     private Producer<String, SpecificRecord> producer;

     private static final Logger LOGGER = Logger.Factory.getLogger(SomeClass.class);

     public void sendMessage(String topic, SpecificRecord message, Map<String,String> headers){
         ProducerRecord<String, SpecificRecord> avroMessage = new ProducerRecord<>(topic, null, null, "key",message, headers);
         producer.send(avroMessage, (metadata, exception) -> {
             if(exception == null ) {
                 LOGGER.info("OK");
             }else{ 
                 LOGGER.info("NOK");
             }
         });
         producer.flush();
    }

我正在做的是:

@RunWith(MockitoJUnitRunner.class)
public class SomeClassTest{
    @InjectMocks
    private SomeClass someclass;
    
    @Mock
    private Producer<String,SpecificRecord> producer;

    @Mock
    private SpecificRecord message;

    @Mock
    private Logger logger;

    @Test
    public void sendMessageTest(){
        when(producer.send(any(ProducerRecord.class),any(Callback.class))).thenReturn(null);
        doNothing().when(producer).flush();
        
        someclass.sendMessage("topic", message, new HashMap<String,String>());
        verify(logger).info("OK");
    }
}
    

我该怎么做才能正确模拟这种情况?

标签: javaapache-kafkamockito

解决方案


通过使用下面的 lambda 表达式,您正在实现Callback接口 ( Interface Callback )的onCompletion方法

(metadata, exception) -> {
         if(exception == null ) {
             LOGGER.info("OK");
         }else{ 
             LOGGER.info("NOK");
         }
     }

而且由于您正在模拟 KafkaProducer send 方法调用,因此不会调用该回调。因此,为了能够进入回调调用,您必须捕获 Callback 参数并调用onCompletion方法,如下所示:

    ArgumentCaptor<Callback> callBackCaptor = ArgumentCaptor.forClass(Callback.class);
    Mockito.verify(producer).send(any(ProducerRecord.class), callBackCaptor.capture());
    Callback kafkaCallback = callBackCaptor.getValue();
    kafkaCallback.onCompletion(new RecordMetadata(null, 0, 0, 0, 0, 0, 0), null);

推荐阅读