apache-kafka - 使用 Spring-Kafka 和 Confluent 模式注册表将带有 JSON 模式的记录发送到 Kafka
问题描述
我在互联网上找不到任何信息,如何使用 spring kafka 将带有 json 模式的记录发送到 kafka。我怎样才能做到这一点?
解决方案
花了几个小时后,我发现有 3 种不同的方式可以使用 json 模式发送记录。相关部分在 io.confluent.kafka.schemaregistry.json.JsonSchemaUtils 中实现
这里的摘录:
if (isEnvelope(object)) {
return getSchemaFromEnvelope((JsonNode) object);
}
Class<?> cls = object.getClass();
if (cls.isAnnotationPresent(Schema.class)) {
Schema schema = (Schema) cls.getAnnotation(Schema.class);
List<SchemaReference> references = Arrays.stream(schema.refs())
.map(ref -> new SchemaReference(ref.name(), ref.subject(), ref.version()))
.collect(Collectors.toList());
if (client == null) {
if (!references.isEmpty()) {
throw new IllegalArgumentException("Cannot resolve schema " + schema.value()
+ " with refs " + references);
}
return new JsonSchema(schema.value());
} else {
return (JsonSchema) client.parseSchema(JsonSchema.TYPE, schema.value(), references)
.orElseThrow(() -> new IOException("Invalid schema " + schema.value()
+ " with refs " + references));
}
}
JsonSchemaConfig config = getConfig(useOneofForNullables, failUnknownProperties);
JsonSchemaDraft draft;
switch (specVersion) {
case DRAFT_4:
draft = JsonSchemaDraft.DRAFT_04;
break;
case DRAFT_6:
draft = JsonSchemaDraft.DRAFT_06;
break;
case DRAFT_7:
draft = JsonSchemaDraft.DRAFT_07;
break;
case DRAFT_2019_09:
draft = JsonSchemaDraft.DRAFT_2019_09;
break;
default:
draft = JsonSchemaDraft.DRAFT_07;
break;
}
config = config.withJsonSchemaDraft(draft);
JsonSchemaGenerator jsonSchemaGenerator = new JsonSchemaGenerator(objectMapper, config);
JsonNode jsonSchema = jsonSchemaGenerator.generateJsonSchema(cls);
return new JsonSchema(jsonSchema);
所以你有 3 种可能性:
- 创建一个包含模式和有效负载字段的 JsonNode
- 用 @Schema 注释你的类
- 不提供模式,让它由模式生成器生成
我选择了 1) 使用以下代码:
public class MyKafkaTemplate {
private static final String SCHEMA_POSTFIX_KEY = "-key.json";
private static final String SCHEMA_POSTFIX_VALUE = "-value.json";
private final KafkaTemplate<JsonNode, JsonNode> kafkaTemplate;
private final ObjectMapper objectMapper;
private final Map<String, JsonNode> topicSchemaCache = new ConcurrentHashMap<>();
public <K, V> void send(final String topic, final K key, final V value) {
final JsonNode keyNode = getEnvelope(topic + SCHEMA_POSTFIX_KEY, key);
final JsonNode valueNode = getEnvelope(topic + SCHEMA_POSTFIX_VALUE, value);
kafkaTemplate.send(topic, keyNode, valueNode);
}
private JsonNode getEnvelope(final String schemaFilePath, final Object key) {
final JsonNode schemaNode = getOrLoadSchema(schemaFilePath);
final JsonNode payload = objectMapper.valueToTree(key);
return JsonSchemaUtils.envelope(schemaNode, payload);
}
private JsonNode getOrLoadSchema(final String schemaFilePath) {
return topicSchemaCache.computeIfAbsent(schemaFilePath, key ->
readFileToJsonNode(schemaFilePath));
}
@SneakyThrows
private JsonNode readFileToJsonNode(final String schemaFilePath) {
return objectMapper.readTree(new ClassPathResource(schemaFilePath).getFile());
}
}
推荐阅读
- java - Dagger 中是否有 Guice install(new ModuleA) 的等价物?
- css - 使用组件创建 React 中的 Bootstrap 全屏英雄部分
- apache - 处理 IHS 中的请求标头(ibm http 服务器)
- php - Woocommerce 电子邮件模板中的 Echo ACF 值
- python - 使用 pip install pyserial 后导入序列不起作用
- swift - SwiftUI 选择器不显示选项
- r - 在 R/Shiny 中,如何使用 Action Button 来触发 Observe Event 功能?
- python - 使用 cURL 的 HTTP 请求有效,但在 Python 请求中无效
- google-sheets - 在 Google SPreadsheets (UTF8_decode) 中显示 XLSXWriter 文件特殊字符的问题
- rust - 如何在 Rust 中检测 USB 插入和移除