java - Kafka Streams consumer of JSON objects: how to map them?
Problem description
I am new to Kafka / Kafka Streams. I am using the latest kafka-streams and kafka-clients with OpenJDK 11. My producer generates JSON objects that look like this (the record key is the name):
{"name":"John", "amount":123, "time":"2019-10-03T05:24:52Z"}
The producer code, for better understanding:
public static ProducerRecord<String, String> newRandomTransaction(String name) {
    // creates an empty json {}
    ObjectNode transaction = JsonNodeFactory.instance.objectNode();
    Integer amount = ThreadLocalRandom.current().nextInt(0, 100);
    // Instant.now() is to get the current time
    Instant now = Instant.now();
    // we write the data to the json document
    transaction.put("name", name);
    transaction.put("amount", amount);
    transaction.put("time", now.toString());
    return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}
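For reference, the value string this producer emits can be sketched without Jackson or Kafka on the classpath; a stdlib-only stand-in (the method name transactionJson is illustrative, not part of the original code):

```java
import java.time.Instant;
import java.util.concurrent.ThreadLocalRandom;

public class TransactionSketch {
    // Stdlib-only stand-in for newRandomTransaction's value:
    // assembles the same JSON shape by hand instead of via ObjectNode.
    static String transactionJson(String name) {
        int amount = ThreadLocalRandom.current().nextInt(0, 100); // 0..99 inclusive
        Instant now = Instant.now();                              // ISO-8601 UTC, e.g. 2019-10-03T05:24:52.123Z
        return String.format("{\"name\":\"%s\",\"amount\":%d,\"time\":\"%s\"}",
                name, amount, now);
    }

    public static void main(String[] args) {
        System.out.println(transactionJson("john"));
    }
}
```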
Now I am trying to write my application, which consumes the transactions and computes the running total in each person's balance.
(FYI: I am working from old example code and trying to make it compile against the current API.)
I use groupByKey since the topic already has the correct key, then aggregate to compute the total balance, which is where I am stuck.
The application at this point (the commented-out part is the old code I am trying to make work in the lines below it):
public class BankBalanceExactlyOnceApp {

    private static ObjectMapper mapper = new ObjectMapper();

    public static void main(String[] args) {
        Properties config = new Properties();
        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-balance-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");
        // Exactly once processing!!
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // json Serde
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, JsonNode> bankTransactions =
                builder.stream("bank-transactions", Materialized.with(Serdes.String(), jsonSerde));

        // create the initial json object for balances
        ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
        initialBalance.put("count", 0);
        initialBalance.put("balance", 0);
        initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

        /*KTable<String, JsonNode> bankBalance = bankTransactions
            .groupByKey(Serdes.String(), jsonSerde)
            .aggregate(
                    () -> initialBalance,
                    (key, transaction, balance) -> newBalance(transaction, balance),
                    jsonSerde,
                    "bank-balance-agg"
            );*/
        KTable<String, JsonNode> bankBalance = bankTransactions
                .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                .aggregate(
                        () -> initialBalance,
                        (key, transaction, balance) -> {
                            //String t = transaction.toString();
                            newBalance(transaction, balance);
                        },
                        Materialized.with(Serdes.String(), jsonSerde),
                        "bank-balance-agg"
                );

        bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.cleanUp();
        streams.start();

        // print the topology
        System.out.println(streams.toString());

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
        // create a new balance json object
        ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
        newBalance.put("count", balance.get("count").asInt() + 1);
        newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());

        Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
        Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
        Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
        newBalance.put("time", newBalanceInstant.toString());
        return newBalance;
    }
}
The problem: when I try to call newBalance(transaction, balance) in

aggregate(
        () -> initialBalance,
        (key, transaction, balance) -> newBalance(transaction, balance),
        jsonSerde,
        "bank-balance-agg"
)

the compiler reports the error:

newBalance(JsonNode, JsonNode) can not be applied to (<lambda parameter>,<lambda parameter>)

I tried reading the value as a String and changing the parameter types from JsonNode to Object, but could not fix it.
Any advice on how to fix this?
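Independently of which aggregate overload is chosen, note that the block-bodied lambda in the code above executes newBalance but never returns its result. In Java, only an expression lambda returns its value implicitly; a block-bodied lambda needs an explicit return. A minimal sketch of the distinction, using BinaryOperator as a stand-in for the Streams Aggregator:

```java
import java.util.function.BinaryOperator;

public class LambdaReturn {
    public static void main(String[] args) {
        // Expression lambda: the value of the expression is returned implicitly.
        BinaryOperator<Integer> implicit = (a, b) -> a + b;

        // Block-bodied lambda: a statement list, so 'return' is required --
        // a body like "{ newBalance(transaction, balance); }" would not compile.
        BinaryOperator<Integer> explicit = (a, b) -> { return a + b; };

        System.out.println(implicit.apply(2, 3)); // prints 5
        System.out.println(explicit.apply(2, 3)); // prints 5
    }
}
```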
Solution
KGroupedStream in Kafka Streams 2.3 has no method with the following signature:

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                             String aggregateName);

There are two overloaded aggregate methods:
<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator);

<VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                             final Aggregator<? super K, ? super V, VR> aggregator,
                             final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
You should use the second one, and your code should look like this:

KTable<String, JsonNode> bankBalance = input
        .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
        .aggregate(
                () -> initialBalance,
                (key, transaction, balance) -> newBalance(transaction, balance),
                Materialized.with(Serdes.String(), jsonSerde)
        );
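The aggregator logic itself can be exercised without a broker or Jackson; a plain-Java sketch of the newBalance fold (the class and field names here are illustrative stand-ins for the JSON documents, not part of the Streams API):

```java
import java.time.Instant;

public class BalanceFold {
    // Plain-Java stand-in for the JSON balance document.
    static final class Balance {
        final int count;
        final int balance;
        final Instant time;
        Balance(int count, int balance, Instant time) {
            this.count = count; this.balance = balance; this.time = time;
        }
    }

    // Mirrors newBalance(transaction, balance): bump the count,
    // add the amount, and keep the most recent timestamp.
    static Balance newBalance(int amount, Instant txTime, Balance b) {
        Instant latest = txTime.isAfter(b.time) ? txTime : b.time;
        return new Balance(b.count + 1, b.balance + amount, latest);
    }

    public static void main(String[] args) {
        Balance acc = new Balance(0, 0, Instant.EPOCH); // same shape as initialBalance
        acc = newBalance(10, Instant.parse("2019-10-03T05:24:52Z"), acc);
        acc = newBalance(15, Instant.parse("2019-10-03T05:25:10Z"), acc);
        System.out.println(acc.count + " " + acc.balance + " " + acc.time);
        // prints: 2 25 2019-10-03T05:25:10Z
    }
}
```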