apache-kafka - 有没有办法在 Apache Kafka 2.0 中确定消息的优先级?
问题描述
编辑
万一其他人处于这种特殊情况,我在调整消费者配置后得到了类似于我正在寻找的东西。我创建了一个生产者,将优先级消息发送到三个单独的主题(用于高/中/低优先级),然后我创建了 3 个单独的消费者来消费每个主题。然后我经常轮询优先级较高的主题,除非高为空,否则不轮询较低优先级的主题:
while(true) {
final KafkaConsumer<String,String> highPriConsumer = createConsumer(TOPIC1);
final KafkaConsumer<String,String> medPriConsumer = createConsumer(TOPIC2);
final ConsumerRecords<String, String> consumerRecordsHigh = highPriConsumer.poll(100);
if (!consumerRecordsHigh.isEmpty()) {
//process high pri records
} else {
final ConsumerRecords<String, String> consumerRecordsMed = medPriConsumer.poll(100);
if (!consumerRecordsMed.isEmpty()) {
//process med pri records
轮询超时(该.poll()
方法的参数)确定如果没有要轮询的记录,则等待多长时间。我为每个主题将其设置为非常短的时间,但您可以将其设置为较低的优先级,以确保当存在高优先级消息时它不会消耗宝贵的周期等待
max.poll.records
配置显然决定了一次轮询中要抓取的最大记录数。对于更高的优先级,这也可以设置得更高。
配置确定轮询之间的时间 - 处理消息max.poll.interval.ms
需要多长时间。在这里max.poll.records
澄清。
另外,我相信暂停/恢复整个消费者/主题可以这样实现:
kafkaConsumer.pause(kafkaConsumer.assignment())
if(kafkaConsumer.paused().containsAll(kafkaConsumer.assignment())) {
kafkaConsumer.resume(kafkaConsumer.assignment());
}
我不确定这是否是最好的方法,但我在其他地方找不到一个很好的例子
我同意下面的 senseiwu 的观点,这并不是 Kafka 的真正正确用途。这是单线程处理,每个主题都有一个专门的消费者,但我将从这里开始改进这个过程。
背景
我们正在尝试改进我们的应用程序,并希望使用 Apache Kafka 在解耦组件之间进行消息传递。我们的系统经常是低带宽的(尽管在某些情况下带宽可能会很高),并且有小的、高优先级的消息必须在较大的文件等待时处理,或者处理缓慢以消耗更少的带宽。我们希望有不同优先级的主题。
我是 Kafka 的新手,但尝试研究处理器 API 和 Kafka Streams 都没有成功,尽管论坛上的某些帖子似乎说这是可行的。
处理器 API
当我尝试 时,我尝试通过检查是否为空Processor API
来确定高优先级当前是否正在处理任何内容,然后希望与 Med Priority Consumer 一起处理,但第二个主题民意调查返回为空。似乎也没有一种简单的方法可以让所有人都了解一个主题以便打电话。KafkaConsumer
poll()
poll()
TopicPartition
kafkaConsumer.pause(partitions)
卡夫卡流
当我尝试时KafkaStreams
,我设置了一个流来从我的每个“优先级”主题中消费,但是没有办法检查连接到更高优先级主题的KStream
或实例当前是否处于空闲或正在处理中。KafkaStreams
我的代码基于这个文件
其他
我也在这里尝试了代码:priority-kafka-client,但它没有按预期工作,因为运行下载的测试文件具有混合优先级。
我找到了这个线程,其中一位开发人员说(解决为主题添加优先级的问题):“......用户可以通过暂停和恢复来实现此行为”。但我无法弄清楚他的意思是如何做到这一点。
我找到了这篇StackOverflow 文章,但他们似乎使用的是一个非常旧的版本,我不清楚他们的映射功能应该如何工作。
结论
如果有人能告诉我他们是否认为这是值得追求的事情,我将不胜感激。如果这不是 Apache Kafka 应该如何工作的,因为它破坏了从自动主题/分区处理中获得的好处,那很好,我会在别处寻找。然而,有很多情况下人们似乎在这方面取得了成功,我想尝试一下。谢谢你。
解决方案
这听起来像是您的应用程序中的一个设计问题 - kafka 最初被设计为一个提交日志,其中每条消息都以偏移量写入代理,并且各种消费者按照它们提交的顺序以非常低的延迟和高吞吐量来使用它们。鉴于分区而不是主题是 Kafka 中工作分配的基本单元,因此很难在本地实现主题级别的优先级。
我建议您调整您的设计以使用 Kafka 以外的其他架构组件,而不是试图剪掉您的脚以适应鞋子。您已经可以做的一件事是让您的生产者将文件上传到适当的文件存储并通过 Kafka 发送链接,包括元数据。然后根据带宽状态,您的消费者可以根据大文件的元数据来决定是否可以下载。这样,您可能更有可能拥有健壮的设计,而不是错误地使用 Kafka。
如果您确实只想坚持使用 Kafka,一种解决方案是将大文件发送到一些固定数量的硬编码分区,并且消费者仅在带宽良好时才从这些分区消费。
推荐阅读
- windows - 在仍然连接互联网的情况下工作 20 分钟后,Chrome/IE 上的 ERR_CONNECTION_FAILED
- assembly - 如何将字符串的地址更改为字符串的第n个字符
- c# - 为什么具有 1 和 2 秩的非零基数组具有不同的类型?
- python - Sqlalchemy 双关联表?
- tomcat - $CATALINA_HOME/shared/lib 是 Tomcat 的真正功能吗?
- java - 如何将我从 api 收到的响应保存在 arraylist 上?
- java - 如何创建 RecyclerView 拖放(交换 2 个项目位置版本)
- opencv3.0 - Google Colab 上的 opencv 矩形
- javascript - 我不断收到“CodeMirror.foldCode 不是函数”。代码折叠插件是否不适用于自定义/简单模式?
- angular-reactive-forms - 具有来自 Input 的数据并更改值格式的反应式表单