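spring-cloud-stream - Spring Cloud Stream with Confluent Schema Registry is not working
Problem description
The code below does not work with the Confluent Schema Registry. It fails with a connection timeout error for the schema registry URL.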
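A connection timeout and a refused connection usually have different causes: a timeout means nothing answered the connection attempt within the time limit (for example a wrong host, or a firewall silently dropping packets), whereas a refusal typically means the host answered but nothing is listening on that port. As a rough diagnostic sketch that is not part of the original application, the following plain-Java probe opens a TCP connection to the host and port of the endpoint and reports which case applies. The class name `EndpointProbe`, the `Result` enum, and the 3-second timeout are hypothetical choices made for illustration; `http://localhost:8081` is the endpoint used in the configuration of this question.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
import java.net.URI;

/** Hypothetical diagnostic helper; not part of Spring Cloud Stream. */
class EndpointProbe {

    enum Result { REACHABLE, REFUSED, TIMED_OUT, OTHER_ERROR }

    /**
     * Opens a plain TCP connection to the host and port of the given URL.
     * Assumes a well-formed absolute URL such as http://localhost:8081.
     */
    static Result probe(String endpoint, int timeoutMillis) {
        URI uri = URI.create(endpoint);
        // Fall back to the default HTTP/HTTPS port when the URL names none.
        int port = uri.getPort() != -1 ? uri.getPort()
                : "https".equalsIgnoreCase(uri.getScheme()) ? 443 : 80;
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(uri.getHost(), port), timeoutMillis);
            return Result.REACHABLE;
        } catch (SocketTimeoutException e) {
            // No answer within timeoutMillis.
            return Result.TIMED_OUT;
        } catch (ConnectException e) {
            // Most commonly "connection refused": nothing listens on the port.
            return Result.REFUSED;
        } catch (IOException e) {
            // For example an unresolvable host name.
            return Result.OTHER_ERROR;
        }
    }

    public static void main(String[] args) {
        String endpoint = args.length > 0 ? args[0] : "http://localhost:8081";
        System.out.println(endpoint + " -> " + probe(endpoint, 3000));
    }
}
```

The probe is only meaningful when run from the same machine or container as the Spring application, because `localhost` resolves relative to wherever the process runs.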
Consumer configuration
@SpringBootApplication
@EnableBinding(Sink.class)
@EnableSchemaRegistryClient
public class ConsumerApplication {

    private final Log logger = LogFactory.getLog(getClass());

    public static void main(String[] args) {
        SpringApplication.run(ConsumerApplication.class, args);
    }

    @StreamListener(Sink.INPUT)
    public void process(Sensor data) {
        logger.info(data);
    }

    @Configuration
    static class ConfluentSchemaRegistryConfiguration {

        @Bean
        public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
            ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
            // Note: the injected endpoint parameter is unused here; the URL is hard-coded.
            client.setEndpoint("http://localhost:8081");
            return client;
        }
    }
}
The configuration looks like this:
spring:
  cloud:
    stream:
      bindings:
        input:
          destination: sensor-topic
      schemaRegistryClient:
        endpoint: http://localhost:8081
      schema:
        avro:
          schema-locations: classpath:avro/sensor.avsc
server.port: 9999
Producer configuration
Below is the configuration of the producer that publishes the messages; it cannot connect to the schema registry.
spring:
  cloud:
    stream:
      bindings:
        output:
          contentType: application/*+avro
          destination: sensor-topic
      schemaRegistryClient:
        endpoint: http://localhost:8081
      schema:
        avro:
          schema-locations: classpath:avro/sensor.avsc
server.port: 9009
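Both applications point at the same endpoint, `http://localhost:8081`. One way to confirm that a Confluent Schema Registry is really answering there, independently of Spring, is to call its REST API directly: `GET /subjects` returns a JSON array of the registered subject names. The sketch below does that with the JDK's `HttpURLConnection`. The class name `RegistryCheck`, the method name `listSubjects`, and the timeout value are assumptions made for illustration and are not part of Spring Cloud Stream.

```java
import java.io.IOException;
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;

/** Hypothetical diagnostic helper; not part of Spring Cloud Stream. */
class RegistryCheck {

    /** Returns the raw JSON body of GET {endpoint}/subjects, or throws on failure. */
    static String listSubjects(String endpoint, int timeoutMillis) throws IOException {
        // Avoid a double slash when the endpoint already ends with "/".
        String base = endpoint.endsWith("/")
                ? endpoint.substring(0, endpoint.length() - 1)
                : endpoint;
        HttpURLConnection conn =
                (HttpURLConnection) URI.create(base + "/subjects").toURL().openConnection();
        conn.setRequestMethod("GET");
        conn.setConnectTimeout(timeoutMillis); // limit for establishing the connection
        conn.setReadTimeout(timeoutMillis);    // limit for waiting on the response
        try {
            int status = conn.getResponseCode();
            if (status != 200) {
                throw new IOException("Unexpected HTTP status " + status);
            }
            try (InputStream in = conn.getInputStream()) {
                return new String(in.readAllBytes(), StandardCharsets.UTF_8);
            }
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) {
        String endpoint = args.length > 0 ? args[0] : "http://localhost:8081";
        try {
            System.out.println("Subjects: " + listSubjects(endpoint, 3000));
        } catch (IOException e) {
            System.out.println("Schema registry check failed: " + e);
        }
    }
}
```

If this call also times out, the problem most likely lies with the network or the registry process rather than with the Spring configuration; if it succeeds, the endpoint value actually seen by the application is worth checking next.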
Below is the producer code:
@SpringBootApplication
@EnableBinding(Source.class)
@EnableSchemaRegistryClient
@RestController
public class Producer1Application {

    @Autowired
    private Source source;

    private Random random = new Random();

    public static void main(String[] args) {
        SpringApplication.run(Producer1Application.class, args);
    }

    @RequestMapping(value = "/messages", method = RequestMethod.POST)
    public String sendMessage() {
        source.output().send(MessageBuilder.withPayload(randomSensor()).build());
        return "ok, have fun with v1 payload!";
    }

    private Sensor randomSensor() {
        Sensor sensor = new Sensor();
        sensor.setId(UUID.randomUUID().toString() + "-v1");
        sensor.setAcceleration(random.nextFloat() * 10);
        sensor.setVelocity(random.nextFloat() * 100);
        sensor.setTemperature(random.nextFloat() * 50);
        return sensor;
    }

    // Another convenience POST method for testing with deterministic values
    @RequestMapping(value = "/messagesX", method = RequestMethod.POST)
    public String sendMessageX(@RequestParam(value="id") String id, @RequestParam(value="acceleration") float acceleration,
            @RequestParam(value="velocity") float velocity, @RequestParam(value="temperature") float temperature) {
        Sensor sensor = new Sensor();
        sensor.setId(id + "-v1");
        sensor.setAcceleration(acceleration);
        sensor.setVelocity(velocity);
        sensor.setTemperature(temperature);
        source.output().send(MessageBuilder.withPayload(sensor).build());
        return "ok, have fun with v1 payload!";
    }

    @Configuration
    static class ConfluentSchemaRegistryConfiguration {

        @Bean
        public SchemaRegistryClient schemaRegistryClient(@Value("${spring.cloud.stream.schemaRegistryClient.endpoint}") String endpoint){
            ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
            client.setEndpoint(endpoint);
            return client;
        }
    }
}
Solution