spring - 将 KTable 或 Kstream 与 Spring Cloud Stream 一起使用并使用 Avro
问题描述
我想对我的 kafka 流主题进行交互式查询。
目前我可以将 avro 序列化的 json 对象发送到我的主题,并使用 avro 反序列化器再次读取它们。我在这种情况下使用普通的 MessageChannel Binder,这可以按预期工作。
现在我想使用 kafka 流绑定器,但我无法让它工作。也许有人可以帮助我。
我的配置:
spring:
cloud:
bus:
enabled: true
stream:
schemaRegistryClient.endpoint: http://192.168.99.100:8081
bindings:
segments-in:
destination: segments
contentType: application/vnd.segments-value.v1+avro
segments-all:
destination: segments
group: segments-all
consumer:
headerMode: raw
useNativeDecoding: true
kafka:
binder:
zkNodes: 192.168.99.100:2181
brokers: 192.168.99.100:32768
streams:
bindings:
segments-all:
consumer:
keySerde: org.apache.kafka.common.serialization.Serdes$StringSerde
valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
卡夫卡配置类:
@Configuration
public class KafkaConfiguration {
@Bean
public MessageConverter classificationMessageConverter() {
AvroSchemaMessageConverter converter = new AvroSchemaMessageConverter();
converter.setSchema(Segment.SCHEMA$);
return converter;
}
}
架构配置
@Configuration
public class SchemaRegistryConfiguration {
@Bean
public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") final String endpoint) {
ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
client.setEndpoint(endpoint);
return client;
}
}
现在我的界面
public interface Channels {
String EVENTS = "segments-in";
String ALLSEGMENTS = "segments-all";
@Input(Channels.EVENTS)
SubscribableChannel events();
@Input(Channels.ALLSEGMENTS)
KTable<?, ?> segmentsIn();
}
我总是收到以下错误(警告消息),但只有当我打开第二个称为segmentsIn()的通道时。
org.apache.kafka.clients.NetworkClient : [AdminClient clientId=adminclient-3] Connection to node -1 could not be established. Broker may not be available.
使用 SubscribableChannel (segments-in) 一切正常,我在这里做错了什么?我怎样才能让频道段全部与 kafka 流 api 一起使用?
解决方案
我得到了使用以下配置的连接:
spring:
cloud:
bus:
enabled: true
stream:
schemaRegistryClient.endpoint: http://192.168.99.100:8081
bindings:
segments-in:
destination: segments
contentType: application/vnd.segments-value.v1+avro
segments-all:
destination: segments
group: segments-all
consumer:
useNativeDecoding: false
events-out:
destination: incidents
group: events-out
producer:
useNativeDecoding: false
kafka:
binder:
zkNodes: 192.168.99.100:2181
brokers: 192.168.99.100:32768
streams:
binder:
zkNodes: 192.168.99.100:2181
brokers: 192.168.99.100:32768
configuration:
schema.registry.url: http://192.168.99.100:8081
default.key.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
default.value.serde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
请参阅为 kafka 流添加的配置,但我无法使用我的代码查询任何内容。
我使用以下代码段:
@StreamListener(Channels.ALLSEGMENTS)
@SendTo(Channels.EVENTS_OUT)
public KStream<Utf8, Long> process(KStream<String, Segment> input) {
log.info("Read new information");
return input
.filter((key, segment) -> segment.getStart().time > 10)
.map((key, value) -> new KeyValue<>(value.id, value))
.groupByKey()
.count(Materialized.as(STORE_NAME))
.toStream();
}
这个调度程序:
@Scheduled(fixedRate = 30000, initialDelay = 5000)
public void printProductCounts() {
if (keyValueStore == null) {
keyValueStore = queryService.getQueryableStoreType(STORE_NAME, QueryableStoreTypes.keyValueStore());
}
String id = "21523XDEf";
System.out.println(keyValueStore.approximateNumEntries());
System.out.println("Product ID: " + id + " Count: " + keyValueStore.get(id));
}
输出总是:
0
Product ID: 21523XDEf Count: null
有人可以指出我正确的方向吗?我究竟做错了什么?
推荐阅读
- variables - 从 watson 中提取变量并存储为单独的文件
- java - Neo4j - Infinispan 组合
- apl - APL - 导入 dll
- c# - sql语法错误在哪里?
- postgresql - 如何在hstore中重现订单的偶然性?
- sql - 创建嵌套表列存储表时出错
- laravel - Laravel 5.4 (1/1) FatalErrorException - 具有类类型提示的参数的默认值在 CssSelectot\rConverter.php 第 34 行中只能为 NULL
- c# - PostMessage 发送的来自 c# 中的 c++ 的免费 HBITMAP
- spring - Spring @Scheduled fixedDelay 未按预期工作
- parsing - 为什么 time.Parse 不使用时区?