scala - 我如何减少卡夫卡消费者/生产者的滞后
问题描述
我正在寻找 scala kafka 代码的改进。为了减少延迟,我应该在消费者和生产者中做什么。这是我从某人那里得到的代码。我知道这段代码并不难。但是我之前从未见过scala代码,我才刚刚开始学习kafka。所以我很难找到问题所在。
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import scala.util.Try
class KafkaMessenger(val servers: String, val sender: String) {
val props = new Properties()
props.put("bootstrap.servers", servers)
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("producer.type", "async")
val producer = new KafkaProducer[String, String](props)
def send(topic: String, message: Any): Try[Unit] = Try {
producer.send(new ProducerRecord(topic, message.toString))
}
def close(): Unit = producer.close()
}
object KafkaMessenger {
def apply(host: String, topic: String, sender: String, message: String): Unit = {
val messenger = new KafkaMessenger(host, sender)
messenger.send(topic, message)
messenger.close()
}
}
这是消费者代码。
import java.util.Properties
import java.util.concurrent.Executors
import com.satreci.g2gs.common.impl.utils.KafkaMessageTypes._
import kafka.admin.AdminUtils
import kafka.consumer._
import kafka.utils.ZkUtils
import org.I0Itec.zkclient.{ZkClient, ZkConnection}
import org.slf4j.LoggerFactory
import scala.language.postfixOps
class KafkaListener(val zookeeper: String,
val groupId: String,
val topic: String,
val handleMessage: ByteArrayMessage => Unit,
val workJson: String = ""
) extends AutoCloseable {
private lazy val logger = LoggerFactory.getLogger(this.getClass)
val config: ConsumerConfig = createConsumerConfig(zookeeper, groupId)
val consumer: ConsumerConnector = Consumer.create(config)
val sessionTimeoutMs: Int = 10 * 1000
val connectionTimeoutMs: Int = 8 * 1000
val zkClient: ZkClient = ZkUtils.createZkClient(zookeeper, sessionTimeoutMs, connectionTimeoutMs)
val zkUtils = new ZkUtils(zkClient, new ZkConnection(zookeeper), false)
def createConsumerConfig(zookeeper: String, groupId: String): ConsumerConfig = {
val props = new Properties()
props.put("zookeeper.connect", zookeeper)
props.put("group.id", groupId)
props.put("auto.offset.reset", "smallest")
props.put("zookeeper.session.timeout.ms", "5000")
props.put("zookeeper.sync.time.ms", "200")
props.put("auto.commit.interval.ms", "1000")
props.put("partition.assignment.strategy", "roundrobin")
new ConsumerConfig(props)
}
def run(threadCount: Int = 1): Unit = {
val streams = consumer.createMessageStreamsByFilter(Whitelist(topic), threadCount)
if (!AdminUtils.topicExists(zkUtils, topic)) {
AdminUtils.createTopic(zkUtils, topic, 1, 1)
}
val executor = Executors.newFixedThreadPool(threadCount)
for (stream <- streams) {
executor.submit(new MessageConsumer(stream))
}
logger.debug(s"KafkaListener start with ${threadCount}thread (topic=$topic)")
}
override def close(): Unit = {
consumer.shutdown()
logger.debug(s"$topic Listener close")
}
class MessageConsumer(val stream: MessageStream) extends Runnable {
override def run(): Unit = {
val it = stream.iterator()
while (it.hasNext()) {
val message = it.next().message()
if (workJson == "") {
handleMessage(message)
}
else {
val strMessage = new String(message)
val newMessage = s"$strMessage/#/$workJson"
val outMessage = newMessage.toCharArray.map(c => c.toByte)
handleMessage(outMessage)
}
}
}
}
}
具体来说,我想在发送消息时修改创建 KafkaProduce 对象的结构。似乎还有许多其他改进可以减少延迟。
解决方案
增加具有相同组 ID 的消费者(KafkaListener)实例的数量。会提高消费率。最终,生产者写入和消费者之间的延迟将最小化。
推荐阅读
- php - Wordpress 中的表单提交计数器
- javascript - OpenLayers 4 - 同时显示来自多个 GeoJSON 文件的特征
- css - 键盘将模式弹出窗口内的文本字段推到顶部
- android - 打开日历意图时出现“SecurityException:Permission Denial”
- java - Linux下java war文件解压异常
- apache-spark - Spark中迭代计算的推荐缓存策略是什么
- xamarin.forms - Listview 分组错误:
在“System.Object []”上找不到 - sql - 如何比较两个不同表中的值并返回 SQL Server 中一行中的任何值是否不同
- url - Yii2 网站,例如 http://www.website.org.in/site/post/tree-plantation-ballab-haryana 在启用重写规则时给出 404 错误
- javascript - Javascript无法读取Null的属性'getContext',但是