首页 > 解决方案 > 如何使用 lagom 框架实现 kafka 消费者

问题描述

尝试了不同的文档,但无法使用 lagom 实现 kafka 消费者 api。跟随消息代理无法创建对象,因为 com.lightbend.lagom.scaladsl.server.LagomServerComponents 中的成员 lagomServer: LagomServer 未在加载程序类中定义。下面是我的加载器类的代码片段。

class ConsumerLoader extends LagomApplicationLoader {

  override def load(context: LagomApplicationContext): LagomApplication =
    new ConsumerApplication(context) with ConfigurationServiceLocatorComponents

  override def describeService = Some(readDescriptor[ConsumerService])
}

abstract class ConsumerApplication(context: LagomApplicationContext)
  extends LagomApplication(context)
    with AhcWSComponents {

  lazy val kafkaService = serviceClient.implement[ConsumerService]
}

请向我提供有关如何实现 kafka 消息使用者的有用文档链接。

标签: scalaapache-kafkamessage-queuekafka-consumer-apilagom

解决方案


我按照以下方式做到了:

当读取和写入主题的服务是不同的服务或您只需要实现阅读器时:

  1. 将主题方法添加def topic: Topic[Envelope]到服务阅读器中
trait ReaderKafkaService extends Service {
  def topic1: Topic[Envelope]

  override final def descriptor: Descriptor = {

    named("kafka-reader")
      .withTopics(
        topic("topic-name", topic1)
      )
      .withAutoAcl(true)
  }
}
  1. 服务,您可以从 kafka 读取什么:
ConsumerService extends Service {
  override final def descriptor: Descriptor = {

    named("consumer-service")
      .withAutoAcl(true)
  }
}
  1. 在 Loader 中添加这个服务:
lazy val kafkaService: ReaderKafkaService = serviceClient.implement[ReaderKafkaService]
  1. 在您的 impl 中注入创建的服务:

class ServiceImpl(
    kafkaService: ReaderKafkaService,
) extends ConsumerService
  1. 订阅主题
class ServiceImpl(
    kafkaService: ReaderKafkaService,
) extends ConsumerService {

  kafkaService.topic1.subscribe
    .withGroupId("group-1")
    .atLeastOnce(
      Flow[Envelope]
        .mapAsync(1) {
          case envelope: Envelope =>
              println(s" Message from topic: $envelope") 
              Future.successful(Done)
        }
        .recover {
          case e =>
            log.error(s"Invalid message $e")
            Done
        }
    )
}
  1. 如果需要,为 kafka 添加配置配置
kafka {
  bootstrap.servers = "localhost:9092"
}

当您想从同一个服务中写入和读取时:

  1. 添加服务方法:
trait ReaderWriterService extends Service {
  def topic1: Topic[Envelope]

  override final def descriptor: Descriptor = {

    named("kafka-reader-writer")
      .withTopics(
        topic("topic-name", topic1)
      )
      .withAutoAcl(true)
  }
}
  1. 服务内涵:
class ServiceImpl(
    kafkaService: ReaderWriterService,
) extends ReaderWriterService {

  kafkaService.topic1.subscribe
    .withGroupId("group-1")
    .atLeastOnce(
      Flow[Envelope]
        .mapAsync(1) {
          case envelope: String =>
              println(s" Message from topic: $envelope") 
              Future.successful(Done)
        }
        .recover {
          case e =>
            log.error(s"Invalid message $e")
            Done
        }
    )

  override def topic1(): Topic[String] =
    TopicProducer.singleStreamWithOffset { fromOffset =>
      persistentEntityRegistry
        .eventStream(Event.Tag, fromOffset)
        .map(ev => ("Hi world", ev.offset))
    }
}
  1. 在加载器中,您需要扩展 with LagomKafkaComponents并将其添加为服务 lazy val kafka: ProfileService = serviceClient.implement[ProfileService]
abstract class Application(context: LagomApplicationContext)
  extends LagomApplication(context)
    with CassandraPersistenceComponents
    with LagomKafkaComponents
    with AhcWSComponents {

  override lazy val lagomServer = serverFor[ReaderWriterService](wire[ServiceImpl])

  lazy val kafka: ReaderWriterService = serviceClient.implement[ReaderWriterService]

  persistentEntityRegistry.register(wire[PersistentEntity])
}

推荐阅读