scala - 如何使用 lagom 框架实现 kafka 消费者
问题描述
尝试了不同的文档,但无法使用 lagom 实现 kafka 消费者 api。跟随消息代理并无法创建对象,因为 com.lightbend.lagom.scaladsl.server.LagomServerComponents 中的成员 lagomServer: LagomServer 未在加载程序类中定义。下面是我的加载器类的代码片段。
class ConsumerLoader extends LagomApplicationLoader {
override def load(context: LagomApplicationContext): LagomApplication =
new ConsumerApplication(context) with ConfigurationServiceLocatorComponents
override def describeService = Some(readDescriptor[ConsumerService])
}
abstract class ConsumerApplication(context: LagomApplicationContext)
extends LagomApplication(context)
with AhcWSComponents {
lazy val kafkaService = serviceClient.implement[ConsumerService]
}
请向我提供有关如何实现 kafka 消息使用者的有用文档链接。
解决方案
我按照以下方式做到了:
当读取和写入主题的服务是不同的服务或您只需要实现阅读器时:
- 将主题方法添加
def topic: Topic[Envelope]
到服务阅读器中
trait ReaderKafkaService extends Service {
def topic1: Topic[Envelope]
override final def descriptor: Descriptor = {
named("kafka-reader")
.withTopics(
topic("topic-name", topic1)
)
.withAutoAcl(true)
}
}
- 服务,您可以从 kafka 读取什么:
ConsumerService extends Service {
override final def descriptor: Descriptor = {
named("consumer-service")
.withAutoAcl(true)
}
}
- 在 Loader 中添加这个服务:
lazy val kafkaService: ReaderKafkaService = serviceClient.implement[ReaderKafkaService]
- 在您的 impl 中注入创建的服务:
class ServiceImpl(
kafkaService: ReaderKafkaService,
) extends ConsumerService
- 订阅主题
class ServiceImpl(
kafkaService: ReaderKafkaService,
) extends ConsumerService {
kafkaService.topic1.subscribe
.withGroupId("group-1")
.atLeastOnce(
Flow[Envelope]
.mapAsync(1) {
case envelope: Envelope =>
println(s" Message from topic: $envelope")
Future.successful(Done)
}
.recover {
case e =>
log.error(s"Invalid message $e")
Done
}
)
}
- 如果需要,为 kafka 添加配置配置
kafka {
bootstrap.servers = "localhost:9092"
}
当您想从同一个服务中写入和读取时:
- 添加服务方法:
trait ReaderWriterService extends Service {
def topic1: Topic[Envelope]
override final def descriptor: Descriptor = {
named("kafka-reader-writer")
.withTopics(
topic("topic-name", topic1)
)
.withAutoAcl(true)
}
}
- 服务内涵:
class ServiceImpl(
kafkaService: ReaderWriterService,
) extends ReaderWriterService {
kafkaService.topic1.subscribe
.withGroupId("group-1")
.atLeastOnce(
Flow[Envelope]
.mapAsync(1) {
case envelope: String =>
println(s" Message from topic: $envelope")
Future.successful(Done)
}
.recover {
case e =>
log.error(s"Invalid message $e")
Done
}
)
override def topic1(): Topic[String] =
TopicProducer.singleStreamWithOffset { fromOffset =>
persistentEntityRegistry
.eventStream(Event.Tag, fromOffset)
.map(ev => ("Hi world", ev.offset))
}
}
- 在加载器中,您需要扩展
with LagomKafkaComponents
并将其添加为服务lazy val kafka: ProfileService = serviceClient.implement[ProfileService]
abstract class Application(context: LagomApplicationContext)
extends LagomApplication(context)
with CassandraPersistenceComponents
with LagomKafkaComponents
with AhcWSComponents {
override lazy val lagomServer = serverFor[ReaderWriterService](wire[ServiceImpl])
lazy val kafka: ReaderWriterService = serviceClient.implement[ReaderWriterService]
persistentEntityRegistry.register(wire[PersistentEntity])
}
推荐阅读
- jmeter - 如何在 Sampler Header 中使用正则表达式提取器提取 api 令牌
- amazon-web-services - 如何允许 AWS 从批处理作业访问另一个账户上的 S3
- leaflet - 我可以向 Leaflet.PM 工具栏添加其他自定义图标吗
- html - vue.js / v-for:如何根据列表索引设置 html 样式
- spring-boot - Spring应用启动失败,多个ConversionService、webFluxConversionService和intergationConversionService
- mongodb - MongoDB - 获取服务器连接的等待队列已满
- python - 如何将绘制的图形放置在 tkinter 窗口的特定位置
- css - 如何“自动化”div宽度适应屏幕尺寸?
- hyperledger-fabric - Hyperledger Fabric 和 Blockchain Platform V2.0 有什么区别?
- react-native - 从 react-native-web 应用程序使用参数访问 URL