首页 > 解决方案 > Spring AOP 与 Spring Kafka 没有按预期工作

问题描述

我有一个 Spring Boot 1.5.x 项目,正在尝试使用 AOP 拦截 Kafka 的 Producer 和 Consumer(代码如下),但这些切面似乎都没有被执行。

/**
 * Configuration class that contributes the Kafka tracing aspect as a bean.
 *
 * <p>As declared by its annotations, it is ordered after {@code TraceAutoConfiguration} and is
 * conditional on {@code ProducerFactory} and {@code ConsumerFactory} being available as classes
 * and on a {@code Tracer} bean being present.
 */
@Configuration
@AutoConfigureAfter(TraceAutoConfiguration.class)
@ConditionalOnClass({ProducerFactory.class, ConsumerFactory.class})
@ConditionalOnBean(Tracer.class)
public class KafkaTraceAutoConfiguration {

    /**
     * Creates the tracing aspect unless a bean of the same type has already been defined.
     *
     * @param tracer the tracer passed to the aspect's constructor
     * @return a newly constructed {@code KafkaTraceAdvice}
     */
    @Bean
    @ConditionalOnMissingBean
    KafkaTraceAdvice sleuthKafkaAspect(Tracer tracer) {
        final KafkaTraceAdvice advice = new KafkaTraceAdvice(tracer);
        return advice;
    }
}
/**
 * Aspect that replaces the Kafka clients returned by Spring Kafka's {@code ProducerFactory} and
 * {@code ConsumerFactory} with the wrappers produced by {@link KafkaClientTracing}.
 */
@Component
@Aspect
@Slf4j
public class KafkaTraceAdvice {

    /** Wrapper factory built from the tracer; the original snippet used it without declaring it. */
    private final KafkaClientTracing kafkaClientTracing;

    /**
     * @param tracer the tracer from which the {@link KafkaClientTracing} helper is created
     */
    public KafkaTraceAdvice(final Tracer tracer) {
        // Wraps Sleuth tracer
        this.kafkaClientTracing = KafkaClientTracing.create(tracer);
    }

    /** Matches every public {@code createProducer} overload on {@code ProducerFactory}. */
    @Pointcut("execution(public * org.springframework.kafka.core.ProducerFactory.createProducer(..))")
    private void producerFactory() {
    }

    /**
     * Matches every public {@code createConsumer} overload on {@code ConsumerFactory}.
     *
     * <p>The parameter pattern is {@code (..)} rather than {@code ()}: an empty parameter list
     * only matches the zero-argument method, so calls to overloads taking arguments would
     * bypass the advice.
     */
    @Pointcut("execution(public * org.springframework.kafka.core.ConsumerFactory.createConsumer(..))")
    private void consumerFactory() {
    }

    /**
     * Lets the factory create its producer, then returns the tracing wrapper in its place.
     *
     * @param pjp the intercepted {@code createProducer} call
     * @return the wrapped producer
     * @throws Throwable whatever the intercepted call throws
     */
    @Around("producerFactory()")
    public Object wrapProducerFactory(ProceedingJoinPoint pjp) throws Throwable {
        Producer<?, ?> producer = (Producer<?, ?>) pjp.proceed();
        return this.kafkaClientTracing.producer(producer);
    }

    /**
     * Lets the factory create its consumer, then returns the tracing wrapper in its place.
     *
     * @param pjp the intercepted {@code createConsumer} call
     * @return the wrapped consumer
     * @throws Throwable whatever the intercepted call throws
     */
    @Around("consumerFactory()")
    public Object wrapConsumerFactory(ProceedingJoinPoint pjp) throws Throwable {
        Consumer<?, ?> consumer = (Consumer<?, ?>) pjp.proceed();
        return this.kafkaClientTracing.consumer(consumer);
    }
}
/**
 * Helper that is meant to decorate Kafka consumers and producers.
 *
 * <p>NOTE(review): this is an abbreviated snippet. The wrapping logic and the return statements
 * are elided behind placeholder comments, and the {@code create(Tracer)} factory that
 * {@code KafkaTraceAdvice} calls is not shown here.
 */
@Slf4j
public final class KafkaClientTracing {
    /**
     * Decorates the given consumer (the actual decoration is elided in this snippet).
     *
     * @param consumer the consumer to decorate; must not be {@code null}
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public <K, V> Consumer<K, V> consumer(Consumer<K, V> consumer) {
        if (consumer == null) {
            throw new NullPointerException("consumer == null");
        }
        // Do something with consumer
    }

    /**
     * Decorates the given producer (the actual decoration is elided in this snippet).
     *
     * @param producer the producer to decorate; must not be {@code null}
     * @throws NullPointerException if {@code producer} is {@code null}
     */
    public <K, V> Producer<K, V> producer(Producer<K, V> producer) {
        if (producer == null) {
            throw new NullPointerException("producer == null");
        }
        // Do something with producer
    }
}

我可以看到事件正在被正常生产和消费,但这些切面(aspect)始终没有被触发。知道是什么原因造成的吗?

本质上,我是想在 Spring Boot 1.5.x 上实现与 TraceMessagingAutoConfiguration.java 相同的功能。我是否遗漏了什么明显的东西?

标签: spring-boot、spring-kafka、spring-aop

解决方案


首先感谢大家对此的帮助。

我最终在 Spring Boot 1.5.x 上通过以下方式解决了这个问题:

/**
 * Configuration class that exposes the woven {@code KafkaTraceInterceptor} aspect as a bean.
 */
public class KafkaTraceAutoConfiguration {

    /**
     * Looks up the aspect instance through {@code Aspects.aspectOf}, hands it a
     * {@code KafkaClientTracing} built from the tracer, and returns that same instance.
     *
     * @param tracer the tracer used to build the {@code KafkaClientTracing} helper
     * @return the aspect instance, now configured with its tracing helper
     */
    @Bean
    @ConditionalOnMissingBean
    public KafkaTraceInterceptor sleuthKafkaAspect(Tracer tracer) {
        // Resolve the aspect first, then build the helper, keeping the original call order.
        final KafkaTraceInterceptor aspect = Aspects.aspectOf(KafkaTraceInterceptor.class);
        final KafkaClientTracing tracing = KafkaClientTracing.create(tracer);
        aspect.setKafkaTracing(tracing);
        return aspect;
    }
}

以及拦截器:

/**
 * Aspect that swaps the Kafka clients created by {@code ProducerFactory} and
 * {@code ConsumerFactory} for tracing wrappers once a {@link KafkaClientTracing} has been set.
 * Until then the clients are returned untouched.
 */
@Component
@Aspect
public class KafkaTraceInterceptor {

    /** Tracing helper; stays {@code null} until {@link #setKafkaTracing} is called. */
    private KafkaClientTracing kafkaTracing;

    /**
     * Supplies the helper used to wrap producers and consumers.
     *
     * @param kafkaTracing the tracing helper to use from now on
     */
    public void setKafkaTracing(KafkaClientTracing kafkaTracing) {
        this.kafkaTracing = kafkaTracing;
    }

    /** Pointcut: any public {@code createProducer} overload on {@code ProducerFactory}. */
    @Pointcut("execution(public * org.springframework.kafka.core.ProducerFactory.createProducer(..))")
    public void anyProducerFactory() {
    }

    /** Pointcut: any public {@code createConsumer} overload on {@code ConsumerFactory}. */
    @Pointcut("execution(public * org.springframework.kafka.core.ConsumerFactory.createConsumer(..))")
    public void anyConsumerFactory() {
    }

    /**
     * Proceeds with producer creation and wraps the result when a tracing helper is available.
     *
     * @param pjp the intercepted {@code createProducer} call
     * @return the wrapped producer, or the original one if no helper has been set
     * @throws Throwable whatever the intercepted call throws
     */
    @Around("anyProducerFactory()")
    public Object wrapProducerFactory(ProceedingJoinPoint pjp) throws Throwable {
        Producer created = (Producer) pjp.proceed();
        if (this.kafkaTracing == null) {
            // No helper configured yet: hand back the factory's own producer.
            return created;
        }
        return this.kafkaTracing.producer(created);
    }

    /**
     * Proceeds with consumer creation and wraps the result when a tracing helper is available.
     *
     * @param pjp the intercepted {@code createConsumer} call
     * @return the wrapped consumer, or the original one if no helper has been set
     * @throws Throwable whatever the intercepted call throws
     */
    @Around("anyConsumerFactory()")
    public Object wrapConsumerFactory(ProceedingJoinPoint pjp) throws Throwable {
        Consumer created = (Consumer) pjp.proceed();
        if (this.kafkaTracing == null) {
            // No helper configured yet: hand back the factory's own consumer.
            return created;
        }
        return this.kafkaTracing.consumer(created);
    }
}

Sleuth Tracer 的包装类:

/**
 * Holds a {@code Tracer} and uses it to decorate Kafka clients: consumers become
 * {@code TracingConsumer} instances and producers become {@code TracingProducer} instances.
 */
@Component
public final class KafkaClientTracing {

    private final Tracer tracer;

    /**
     * Copies the tracer out of the given builder.
     *
     * @param builder the builder carrying the tracer
     */
    public KafkaClientTracing(KafkaClientTracing.Builder builder) {
        this.tracer = builder.tracer;
    }

    /**
     * Convenience factory: builds an instance for the given tracer through {@link Builder}.
     *
     * @param tracer the tracer to hold; must not be {@code null}
     * @return a new {@code KafkaClientTracing}
     * @throws NullPointerException if {@code tracer} is {@code null}
     */
    public static KafkaClientTracing create(Tracer tracer) {
        final Builder builder = new Builder(tracer);
        return builder.build();
    }

    /**
     * @return the tracer this instance was built with
     */
    public Tracer getTracer() {
        return this.tracer;
    }

    /**
     * Wraps the given consumer in a {@code TracingConsumer} bound to this instance.
     *
     * @param consumer the consumer to wrap; must not be {@code null}
     * @return the wrapping consumer
     * @throws NullPointerException if {@code consumer} is {@code null}
     */
    public <K, V> Consumer<K, V> consumer(Consumer<K, V> consumer) {
        if (consumer != null) {
            return new TracingConsumer<>(consumer, this);
        }
        throw new NullPointerException("consumer == null");
    }

    /**
     * Wraps the given producer in a {@code TracingProducer} bound to this instance.
     *
     * @param producer the producer to wrap; must not be {@code null}
     * @return the wrapping producer
     * @throws NullPointerException if {@code producer} is {@code null}
     */
    public <K, V> Producer<K, V> producer(Producer<K, V> producer) {
        if (producer != null) {
            return new TracingProducer<>(producer, this);
        }
        throw new NullPointerException("producer == null");
    }

    /** Builder that validates and carries the tracer for a {@link KafkaClientTracing}. */
    public static final class Builder {

        final Tracer tracer;

        /**
         * @param tracer the tracer to carry; must not be {@code null}
         * @throws NullPointerException if {@code tracer} is {@code null}
         */
        Builder(Tracer tracer) {
            if (tracer != null) {
                this.tracer = tracer;
            } else {
                throw new NullPointerException("tracer == null");
            }
        }

        /**
         * @return a new {@link KafkaClientTracing} holding this builder's tracer
         */
        public KafkaClientTracing build() {
            return new KafkaClientTracing(this);
        }
    }
}

然后我在启动应用时加上 JVM 参数 -javaagent:aspectjweaver.jar 来运行。


推荐阅读