spring - Kafka Listener 中的钩子
问题描述
在 kafka 收听消息之前/之后是否有任何可用的钩子?
用例:必须设置 MDC 关联 ID 以执行日志可追溯性
我在找什么?一个 before/after 回调方法,以便可以在进入时设置 MDC 关联 ID,并最终在退出时清除 MDC。
编辑的场景: 我正在获取作为 Kafka Headers 的一部分的共同关系 ID,并且我想在 Kafka Listener 中收到消息后立即在 MDC 中设置相同的 ID
感谢帮助
解决方案
您可以向您的侦听器 bean 添加环绕建议...
@SpringBootApplication
public class So59854374Application {
public static void main(String[] args) {
SpringApplication.run(So59854374Application.class, args);
}
@Bean
public static BeanPostProcessor bpp() { // static is important
return new BeanPostProcessor() {
@Override
public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {
if (bean instanceof MyListener) {
ProxyFactoryBean pfb = new ProxyFactoryBean();
pfb.setTarget(bean);
pfb.addAdvice(new MethodInterceptor() {
@Override
public Object invoke(MethodInvocation invocation) throws Throwable {
try {
System.out.println("Before");
return invocation.proceed();
}
finally {
System.out.println("After");
}
}
});
return pfb.getObject();
}
return bean;
}
};
}
@Bean
public NewTopic topic() {
return TopicBuilder.name("so59854374").partitions(1).replicas(1).build();
}
@Bean
public ApplicationRunner runner(KafkaTemplate<String, String> template) {
return args -> template.send("so59854374", "foo");
}
}
@Component
class MyListener {
@KafkaListener(id = "so59854374", topics = "so59854374")
public void listen(String in) {
System.out.println(in);
}
}
和
Before
foo
After
编辑
如果您将@Header("myMdcHeader") byte[] mdc
作为附加参数添加到您的 kafka 侦听器方法,则可以getArguments()[1]
在调用中使用。
另一种解决方案是向RecordInterceptor
侦听器容器工厂添加一个,它允许您在将原始ConsumerRecord
数据传递给侦听器适配器之前访问它。
/**
* An interceptor for {@link ConsumerRecord} invoked by the listener
* container before invoking the listener.
*
* @param <K> the key type.
* @param <V> the value type.
*
* @author Gary Russell
* @since 2.2.7
*
*/
@FunctionalInterface
public interface RecordInterceptor<K, V> {
/**
* Perform some action on the record or return a different one.
* If null is returned the record will be skipped.
* @param record the record.
* @return the record or null.
*/
@Nullable
ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record);
}
/**
* Set an interceptor to be called before calling the listener.
* Does not apply to batch listeners.
* @param recordInterceptor the interceptor.
* @since 2.2.7
*/
public void setRecordInterceptor(RecordInterceptor<K, V> recordInterceptor) {
this.recordInterceptor = recordInterceptor;
}
如果您使用的是批处理侦听器,Kafka 提供了一个ConsumerInterceptor
.
推荐阅读
- push-notification - 收到远程通知时如何将导航推送到新的小部件?
- javascript - Recharts.js ResponsiveContainer 显示类似填充的空白
- salesforce - 我们是否需要编写一个可批处理的课程来安排课程
- powershell - 如何根据 Powershell 中的项目计数过滤子目录
- bootstrap-4 - 如何修复这种复杂的布局?
- postgresql - 解释分析扫描的实际行与表中的总行之间的差异
- android - 找不到 androidx.lifecycle.DefaultLifecycleObserver
- extjs - 如何在 extjs 中的另一个组件中使用一个组件
- python - 试图用python从pdf中提取某一行文本
- vuejs2 - 循环时需要访问 Bootstrap Vue 表中的项目索引