Using KTable or KStream with Spring Cloud Stream and Avro

Problem description

I want to run interactive queries against my Kafka Streams topic.

Currently I can send Avro-serialized JSON objects to my topic and read them back again with an Avro deserializer. In this scenario I use the plain MessageChannel binder, and that works as expected.
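For reference, a consumer on that plain channel looks roughly like this (a minimal sketch; the listener class name and the log line are my own additions, while Channels.EVENTS and the generated Segment Avro class come from the code further down):

    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.annotation.StreamListener;

    @EnableBinding(Channels.class)
    public class SegmentEventListener {

        // The Avro MessageConverter registered below converts the payload into
        // the generated Segment class before this method is invoked.
        @StreamListener(Channels.EVENTS)
        public void handleSegment(Segment segment) {
            System.out.println("Received segment: " + segment);
        }
    }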

Now I want to use the Kafka Streams binder instead, but I cannot get it to work. Maybe someone can help me.

My configuration:

    spring:
      cloud:
        bus:
          enabled: true
        stream:
          schemaRegistryClient.endpoint: http://192.168.99.100:8081
          bindings:
            segments-in:
              destination: segments
              contentType: application/vnd.segments-value.v1+avro
            segments-all:
              destination: segments
              group: segments-all
              consumer:
                headerMode: raw
                useNativeDecoding: true
          kafka:
            binder:
              zkNodes: 192.168.99.100:2181
              brokers: 192.168.99.100:32768
            streams:
              bindings:
                segments-all:
                  consumer:
                    keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
                    valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Kafka configuration class:

@Configuration
public class KafkaConfiguration {
    @Bean
    public MessageConverter classificationMessageConverter() {
        AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter();
        converter.setSchema(Segment.SCHEMA$);
        return converter;
    }
}

Schema registry configuration:

@Configuration
public class SchemaRegistryConfiguration {
    @Bean
    public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") final String endpoint) {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }
}

And now my interface:

public interface Channels {
    String EVENTS = "segments-in";
    String ALLSEGMENTS = "segments-all";

    @Input(Channels.EVENTS)
    SubscribableChannel events();

    @Input(Channels.ALLSEGMENTS)
    KTable<?, ?> segmentsIn();

}
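For context, this interface is assumed to be activated on the application class in the usual Spring Cloud Stream way; the application class itself is not part of the original post:

    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;

    @SpringBootApplication
    @EnableBinding(Channels.class) // activates the segments-in and segments-all bindings
    public class Application {
        public static void main(String[] args) {
            SpringApplication.run(Application.class, args);
        }
    }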

I always get the following error (a warning message), but only when I enable the second channel, segmentsIn():

org.apache.kafka.clients.NetworkClient   : [AdminClient clientId=adminclient-3] Connection to node -1 could not be established. Broker may not be available.

With the SubscribableChannel (segments-in) everything works fine. What am I doing wrong here? How can I get the segments-all channel working with the Kafka Streams API?

Tags: spring, apache-kafka, avro, spring-cloud-stream, spring-kafka

Solution


I got the connection working with the following configuration:

spring:
  cloud:
    bus:
      enabled: true
    stream:
      schemaRegistryClient.endpoint: http://192.168.99.100:8081
      bindings:
        segments-in:
          destination: segments
          contentType: application/vnd.segments-value.v1+avro
        segments-all:
          destination: segments
          group: segments-all
          consumer:
            useNativeDecoding: false
        events-out:
          destination: incidents
          group: events-out
          producer:
            useNativeDecoding: false    
      kafka:
        binder:
          zkNodes: 192.168.99.100:2181
          brokers: 192.168.99.100:32768
        streams:
          binder:
            zkNodes: 192.168.99.100:2181
            brokers: 192.168.99.100:32768
            configuration:
              schema.registry.url: http://192.168.99.100:8081
              default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
              default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde

Note the configuration that was added for Kafka Streams; however, I still cannot query anything with my code.

I use the following snippet:

@StreamListener(Channels.ALLSEGMENTS)
@SendTo(Channels.EVENTS_OUT) 
public KStream<Utf8, Long> process(KStream<String, Segment> input) {
    log.info("Read new information");
    return input
            .filter((key, segment) -> segment.getStart().time > 10)
            .map((key, value) -> new KeyValue<>(value.id, value))
            .groupByKey()
            .count(Materialized.as(STORE_NAME))
            .toStream();
}

And this scheduler:

    @Scheduled(fixedRate = 30000, initialDelay = 5000)
    public void printProductCounts() {
        if (keyValueStore == null) {
            keyValueStore = queryService.getQueryableStoreType(STORE_NAME, QueryableStoreTypes.keyValueStore());
        }
        String id = "21523XDEf";
        System.out.println(keyValueStore.approximateNumEntries());
        System.out.println("Product ID: " + id + " Count: " + keyValueStore.get(id));
    }
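The scheduler uses two fields, queryService and keyValueStore, that are not declared in the snippet. A minimal sketch of how they would typically be wired with the Kafka Streams binder of that generation (the class name, the STORE_NAME value, and the generics are my assumptions; QueryableStoreRegistry is the binder component whose getQueryableStoreType method the snippet already calls):

    import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.cloud.stream.binder.kafka.streams.QueryableStoreRegistry;
    import org.springframework.stereotype.Component;

    @Component
    public class SegmentCountQuerier {

        static final String STORE_NAME = "segment-count-store"; // assumed store name

        // Registry exposed by the Kafka Streams binder for interactive queries
        @Autowired
        private QueryableStoreRegistry queryService;

        // Resolved lazily in the scheduler once the state store is queryable
        private ReadOnlyKeyValueStore<String, Long> keyValueStore;
    }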

The output is always:

0
Product ID: 21523XDEf Count: null

Can someone point me in the right direction? What exactly am I doing wrong?

