apache-kafka - kafka-verifiable-producer and consumer question
Problem description
I am trying out Kafka. I have started
kafka-console-producer and
kafka-console-consumer.
I send messages with kafka-console-producer and receive them successfully in kafka-console-consumer. Now I want to produce and consume about 5000 messages at once. Looking at the documentation, I learned there are two commands:
kafka-verifiable-producer.sh
kafka-verifiable-consumer.sh
I tried these commands:
kafka-verifiable-producer.sh --broker-list localhost:9092 --max-messages 5000 --topic data-sending
kafka-verifiable-consumer.sh --group-instance-id 1 --group-id data-world --topic data-sending --broker-list localhost:9092
The result was as follows:
{"timestamp":1581268289761,"name":"producer_send_success","key":null,"value":"4996","offset":44630,"topic":"try_1","partition":0}
{"timestamp":1581268289761,"name":"producer_send_success","key":null,"value":"4997","offset":44631,"topic":"try_1","partition":0}
{"timestamp":1581268289761,"name":"producer_send_success","key":null,"value":"4998","offset":44632,"topic":"try_1","partition":0}
{"timestamp":1581268289761,"name":"producer_send_success","key":null,"value":"4999","offset":44633,"topic":"try_1","partition":0}
{"timestamp":1581268289769,"name":"shutdown_complete"}
{"timestamp":1581268289771,"name":"tool_data","sent":5000,"acked":5000,"target_throughput":-1,"avg_throughput":5285.412262156448}
On the consumer console, the output was as follows:
{"timestamp":1581268089357,"name":"records_consumed","count":352,"partitions":[{"topic":"try_1","partition":0,"count":352,"minOffset":32777,"maxOffset":33128}]}
{"timestamp":1581268089359,"name":"offsets_committed","offsets":[{"topic":"try_1","partition":0,"offset":33129}],"success":true}
{"timestamp":1581268089384,"name":"records_consumed","count":500,"partitions":[{"topic":"try_1","partition":0,"count":500,"minOffset":33129,"maxOffset":33628}]}
{"timestamp":1581268089391,"name":"offsets_committed","offsets":[{"topic":"try_1","partition":0,"offset":33629}],"success":true}
{"timestamp":1581268089392,"name":"records_consumed","count":270,"partitions":[{"topic":"try_1","partition":0,"count":270,"minOffset":33629,"maxOffset":33898}]}
{"timestamp":1581268089394,"name":"offsets_committed","offsets":[{"topic":"try_1","partition":0,"offset":33899}],"success":true}
{"timestamp":1581268089415,"name":"records_consumed","count":500,"partitions":[{"topic":"try_1","partition":0,"count":500,"minOffset":33899,"maxOffset":34398}]}
{"timestamp":1581268089416,"name":"offsets_committed","offsets":[{"topic":"try_1","partition":0,"offset":34399}],"success":true}
{"timestamp":1581268089417,"name":"records_consumed","count":235,"partitions":[{"topic":"try_1","partition":0,"count":235,"minOffset":34399,"maxOffset":34633}]}
{"timestamp":1581268089419,"name":"offsets_committed","offsets":[{"topic":"try_1","partition":0,"offset":34634}],"success":true}
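As a side note on reading this output: each records_consumed batch is internally consistent (count equals maxOffset - minOffset + 1), and the committed offset is always one past the batch's maxOffset, i.e. the next offset to read. A quick check of the first two events above, sketched in Python:

```python
import json

# The first two consumer events from the output above (timestamps trimmed).
events = [
    json.loads(s) for s in [
        '{"name":"records_consumed","count":352,"partitions":'
        '[{"topic":"try_1","partition":0,"count":352,"minOffset":32777,"maxOffset":33128}]}',
        '{"name":"offsets_committed","offsets":'
        '[{"topic":"try_1","partition":0,"offset":33129}],"success":true}',
    ]
]

consumed = events[0]["partitions"][0]
committed = events[1]["offsets"][0]

# count covers the inclusive offset range [minOffset, maxOffset]
assert consumed["count"] == consumed["maxOffset"] - consumed["minOffset"] + 1
# the commit records the next offset to read, one past maxOffset
assert committed["offset"] == consumed["maxOffset"] + 1
```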
In the above results, the key is null. How can I send a large number of messages with this command? It produces integer-like values, but where can I insert my own message content? Is there any way to produce messages in bulk with this command? Also, can these commands run on Windows, or are they Linux-only? Any links to examples would be appreciated.
Solution
The script kafka-verifiable-producer.sh executes the class org.apache.kafka.tools.VerifiableProducer (https://github.com/apache/kafka/blob/trunk/tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java). Its program arguments --throughput, --repeating-keys, and --value-prefix may satisfy your needs.
For example, the following produces messages whose values carry the prefix 100, with an incrementing key that rotates every 5 messages. You can also configure the message throughput with the --throughput option; in this example it produces on average 5 messages per second.
./kafka-verifiable-producer.sh --broker-list localhost:9092 --max-messages 10 --repeating-keys 5 --value-prefix 100 --throughput 5 --topic test
{"timestamp":1581271492652,"name":"startup_complete"}
{"timestamp":1581271492860,"name":"producer_send_success","key":"0","value":"100.0","offset":45,"topic":"test","partition":0}
{"timestamp":1581271492862,"name":"producer_send_success","key":"1","value":"100.1","offset":46,"topic":"test","partition":0}
{"timestamp":1581271493048,"name":"producer_send_success","key":"2","value":"100.2","offset":47,"topic":"test","partition":0}
{"timestamp":1581271493254,"name":"producer_send_success","key":"3","value":"100.3","offset":48,"topic":"test","partition":0}
{"timestamp":1581271493256,"name":"producer_send_success","key":"4","value":"100.4","offset":49,"topic":"test","partition":0}
{"timestamp":1581271493457,"name":"producer_send_success","key":"0","value":"100.5","offset":50,"topic":"test","partition":0}
{"timestamp":1581271493659,"name":"producer_send_success","key":"1","value":"100.6","offset":51,"topic":"test","partition":0}
{"timestamp":1581271493860,"name":"producer_send_success","key":"2","value":"100.7","offset":52,"topic":"test","partition":0}
{"timestamp":1581271494063,"name":"producer_send_success","key":"3","value":"100.8","offset":53,"topic":"test","partition":0}
{"timestamp":1581271494268,"name":"producer_send_success","key":"4","value":"100.9","offset":54,"topic":"test","partition":0}
{"timestamp":1581271494483,"name":"shutdown_complete"}
{"timestamp":1581271494484,"name":"tool_data","sent":10,"acked":10,"target_throughput":5,"avg_throughput":5.452562704471101}
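The key/value scheme visible in this output can be sketched as follows (a minimal reconstruction inferred from the sample output, not the tool's actual source): the key cycles through 0..repeating_keys-1, and the value is the prefix joined to the message index with a dot.

```python
def verifiable_record(index, repeating_keys=None, value_prefix=None):
    """Reproduce the key/value pattern seen in kafka-verifiable-producer output.

    With --repeating-keys N, the key cycles 0..N-1; with --value-prefix P,
    the value is "P.<index>"; otherwise the value is just the index.
    """
    key = str(index % repeating_keys) if repeating_keys else None
    value = f"{value_prefix}.{index}" if value_prefix is not None else str(index)
    return key, value

# Matches the sample run with --repeating-keys 5 --value-prefix 100:
# the 8th message (index 7) has key "2" and value "100.7".
assert verifiable_record(7, repeating_keys=5, value_prefix=100) == ("2", "100.7")
assert verifiable_record(0) == (None, "0")
```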
If you are looking for more customized message keys and values, the simplest approach is to modify or extend the above class.