java - 从值创建 Kafka Stream 以计算数量
问题描述
我正在生成如下数据:
Key: "Mike", value: {"amount":46,"time":"2021-11-05T07:53:32.005751Z"}
Key: "John", value: {"amount":46,"time":"2021-11-05T07:53:32.005751Z"}
Key: "Mike", value: {"amount":50,"time":"2021-11-05T07:53:32.005751Z"}
键是字符串(名字如 Alice、John...)。例如我需要结果:
{"Mike": 2}
{"John": 1}
或者
{"key":"Mike", "count": 2}
{"key":"John", "count": 1}
我接下来尝试了:
public Topology createTopology(){
StreamsBuilder builder = new StreamsBuilder();
// json Serde
final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);
KStream<String, JsonNode> textLines = builder.stream("bank-transactions", Consumed.with(Serdes.String(), jsonSerde));
KTable<String, Long> wordCounts = textLines
.map((k, v) -> new KeyValue<>(k, v.get("amount").asInt()))
.groupByKey(Serialized.with(Serdes.String(), Serdes.Integer()))
.count();
wordCounts.toStream().to("person-transaction-frequency", Produced.with(Serdes.String(), Serdes.Long()));
return builder.build();
}
public static void main(String[] args) {
Properties config = new Properties();
config.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-favorite-amount-application");
config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:29092");
config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
Mc4CalculateFavoriteAmount wordCountApp = new Mc4CalculateFavoriteAmount();
KafkaStreams streams = new KafkaStreams(wordCountApp.createTopology(), config);
streams.start();
// shutdown hook to correctly close the streams application
Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
}
我正在尝试按名称计算消息。但是我在主题中有工件:
解决方案
如果您只是想计算键,那么您可以丢弃整个值并将其替换1
为每个看到的键。
KStream<String, Bytes> textLines = builder.stream("bank-transactions", Consumed.with(Serdes.String(), Serdes.Bytes()));
KTable<String, Long> wordCounts = textLines
.mapValues(v -> 1L)
.groupByKey(Serialized.with(Serdes.String(), Serdes.Long()))
.count();
wordCounts.toStream().to("person-transaction-frequency", Produced.with(Serdes.String(), Serdes.Long()));
推荐阅读
- python - 如何在不删除标题栏的情况下禁用 Tkinter 窗口的移动
- c++11 - 为什么 nlohmann json 对象的 decltype 不同?
- javascript - 如何使用 jQuery 淡化输入的 ::placeholder?
- token - 错误:内部 JSON-RPC 错误。{ "message": "VM Exception while processing transaction: revert" soldity 0.8.0
- json - 在 Provider 方法中抛出错误 NoSuchMethod:The getter ' ' was called on null
- android - 无法从警报对话框中的 EditText 视图中获取数据
- uiscrollview - UIScrollView 中心内容
- java - 如何解决:发现 15 个 USB 设备未被识别为 Android 设备?
- react-native - 如何从外部函数触发 useEffect?
- reactjs - 在 reactjs 中使用带有 componentDidMount 的 localstorage