首页 > 解决方案 > 生成消息时,alpakka-kafka 连接器中使用的直通是什么?

问题描述

这是文档https://doc.akka.io/docs/alpakka-kafka/current/producer.html中给出的用于向 Kafka 生成单个消息的代码

val single: ProducerMessage.Envelope[KeyType, ValueType, PassThroughType] =
  ProducerMessage.single(
    new ProducerRecord("topicName", key, value),
    passThrough
  )

您能否解释一下 passThrough 的用途?

标签: scalaapache-kafkaakka-streamreactivealpakka

解决方案


passThrough is an additional value which comes to be available for use in ProducerMessage.Results’s passThrough().

As the official documentation states, the best and the most widely used practice is to use this value as a transport to passing the CommittableOffset to a downstream Committer.Sink.

This seems convenient in cases when you are using a CommittableSource along with the enable.auto.commit Kafka consumer option set to false. In such cases you have to commit Kafka offsets manually as you processing them. I in particular use such approach when reading records from one Kafka topic, processing them and then sending them to another Kafka topic.

Here is a simple use case:

Consumer
      .committableSource(consumerSettings, Subscriptions.topics(topicIn))
      .mapAsync { msg ⇒
        process(msg.record.value).map {
          response ⇒
            ProducerMessage.single(
              new ProducerRecord(topicOut, msg.record.key, response),
              passThrough = msg.committableOffset
            )
        }
      }
      .via(Producer.flexiFlow(producerSettings))
      .map(_.passThrough) // extract the passThrough
      .toMat(Committer.sink(committerSettings))(Keep.both) // commiting offsets
      .mapMaterializedValue(DrainingControl.apply)
      .run()

推荐阅读