首页 > 解决方案 > kafka zipkin拦截器的使用示例

问题描述

因此,我们在内部使用 kafka 队列进行一些微服务的通信,也使用 zipkin 进行分布式跟踪。您能否建议如何在 zipkin 服务器中引入 kafka 跟踪以进行调试。

我遇到了brave-kafka-interceptor,但无法从提供的最小示例中使用 kafka 理解它。周围是否还有其他示例,或者使用了完全不同的库。

标签: spring-bootapache-kafkainterceptorzipkindistributed-tracing

解决方案


将https://github.com/openzipkin-contrib/brave-kafka-interceptor#configuration中提到的配置添加到生产者和消费者中以配置跟踪。

一旦我们有了痕迹,我们需要将它们刷新到 zipkin UI。我们需要在 AsyncZipkinSpanHandler 对象上调用 flush 方法来将跟踪刷新到 zipkin。但是使用勇敢的 kafka 拦截器,我们无法访问该对象。

因此,我们需要在应用程序中提供一些空闲时间来刷新跟踪。基本上,如果我们的程序中有任何空闲时间,那么即使没有显式调用 flush() 方法,zipkin 也会帮助刷新跟踪。(我不确定这是否正确(关于空闲时间的刷新痕迹)。这完全来自我的观察。)

ProducerTracing.java

import brave.kafka.interceptor.TracingProducerInterceptor;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class ProducerTracing {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        properties.setProperty(ProducerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingProducerInterceptor.class.getName());
        properties.setProperty("zipkin.http.endpoint", "http://127.0.0.1:9411/api/v2/spans");
        properties.setProperty("zipkin.sender.type", "HTTP");
        properties.setProperty("zipkin.encoding", "JSON");
        properties.setProperty("zipkin.remote.service.name", "kafka");
        properties.setProperty("zipkin.local.service.name", "producer");
        properties.setProperty("zipkin.trace.id.128bit.enabled", "true");
        properties.setProperty("zipkin.sampler.rate", "1.0F");

        KafkaProducer<String, String> producer = new KafkaProducer<>(properties);
        ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
        producer.send(record);

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

而在消费者端,我们不需要调用 sleep 方法来创建空闲时间。当消费者调用 poll() 方法时,我们会得到一些空闲时间,因为 poll() 方法会创建另一个线程并从 kafka 代理获取记录。因此,消费者可以将痕迹刷新到 zipkin。

消费者追踪.java

import brave.kafka.interceptor.TracingConsumerInterceptor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerTracing {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "group");

        properties.setProperty(ConsumerConfig.INTERCEPTOR_CLASSES_CONFIG, TracingConsumerInterceptor.class.getName());
        properties.setProperty("zipkin.http.endpoint", "http://127.0.0.1:9411/api/v2/spans");
        properties.setProperty("zipkin.sender.type", "HTTP");
        properties.setProperty("zipkin.encoding", "JSON");
        properties.setProperty("zipkin.remote.service.name", "kafka");
        properties.setProperty("zipkin.local.service.name", "consumer");
        properties.setProperty("zipkin.trace.id.128bit.enabled", "true");
        properties.setProperty("zipkin.sampler.rate", "1.0F");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);

        consumer.subscribe(Collections.singleton("topic"));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(10));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.key() + " " + record.value());
            }
        }
    }
}

现在,我们可以在 Zipkin 中观察痕迹。 观察到的痕迹


推荐阅读