java - 生成“假”流数据。卡夫卡-Flink
问题描述
我正在尝试生成流数据,以模拟我在不同的时间范围内接收到两个值(整数类型)、时间戳和 Kafka 作为连接器的情况。
我正在使用 Flink 环境作为消费者,但我不知道哪个是生产者的最佳解决方案。(如果可能,Java 语法比 Scala 更好)
我应该直接从 Kafka 生成数据吗?如果是,最好的方法是什么?或者,如果我作为生产者从 Flink 生成数据,将其发送到 Kafka 并在最后由 Flink 再次使用它,也许会更好?我怎么能从 flink 做到这一点?或者也许还有另一种简单的方法来生成流数据并将其传递给 Kafka。
如果是,请让我走上实现它的轨道。
解决方案
正如 David 还提到的,您可以使用 KafkaProducer API 在简单的 Java 中创建一个虚拟生产者,以根据需要安排消息并将消息发送到 Kafka。同样,如果您想要多个同时生产者,您可以使用 Flink 来做到这一点。使用 Flink,您需要为生产者和消费者编写单独的作业。Kafka 基本上启用了异步处理架构,因此它没有队列机制。所以最好把生产者和消费者的工作分开。
但请多考虑一下这个测试的意图:
您是否正在尝试测试 Kafka 流式传输的持久性、复制、偏移管理功能
在这种情况下,您需要同一主题的同时生产者,消息中的键为空或非空。
或者您是否正在尝试测试 Flink-Kafka 连接器功能。
在这种情况下,你只需要一个生产者,很少有内部场景可以通过让生产者推送超过消费者处理能力的消息来进行背压测试。
或者您是否正在尝试测试主题分区和 Flink 流并行性。
在这种情况下,单个或多个生产者但消息的 key 应该是非空的,您可以测试 Flink 执行器如何与各个分区连接并观察它们的行为。
您可能想要测试更多想法,并且每个想法都需要在生产者中完成或不完成某些特定的事情。
如果需要,您可以查看https://github.com/abhisheknegi/twitStream
使用 Java API 提取推文。
推荐阅读
- php - 为什么我的 symfony 验证码总是返回有效?
- python - 由于 QtDesigner 中提升的小部件,检测 PyQt5 脚本的重新加载?
- c# - 如何将 JSON 保存序列化为变量并在控制台中显示 json 字符串
- python - 在 python 3.8 上获取 urllib.error.URLError
- typescript - 更新 Deno 后的 Typescript 导入问题
- r - 如何使用正则表达式提取模式的第 n 次出现
- regex - 两个数字之间的正则表达式和空格
- .net - IQueryable - 如何从数据库集中获取所有独特的年份?
- php - 带有 Livewire 的 Laravel 待办事项列表
- google-apps-script - 使用谷歌表格选中复选框时向单个收件人发送合并邮件