nestjs - 如何在 NestJS 中发布到 Kafka 客户端?
问题描述
我试图在 NestJs 中发布到 kafka
async publish<T extends IEvent>(event: T) {
await this.client.connect();
await this.client.send('topic', event);
}
但是还没有找到正确的方法,dispatchEvent 是受保护的。
编辑:使用 cqrs 所以这是在订阅事件总线的事件发布者中。
解决方案
我不相信你需要这个connect
方法。你确定你订阅了消息响应吗?下面是一个带有客户端控制器和服务器控制器的工作示例:
客户端控制器
import {
Controller,
Get,
Inject,
OnModuleDestroy,
OnModuleInit,
UseFilters,
} from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { ExceptionFilter } from './exception.filter';
@Controller()
export class KafkaClientController implements OnModuleInit, OnModuleDestroy {
constructor(@Inject('KAFKA_SERVICE') private readonly kafka: ClientKafka) {}
async onModuleInit() {
['hello', 'error', 'skip'].forEach((key) =>
this.kafka.subscribeToResponseOf(`say.${key}`),
);
}
onModuleDestroy() {
this.kafka.close();
}
@Get()
sayHello() {
return this.kafka.send('say.hello', { ip: '127.0.0.1' });
}
@Get('error')
@UseFilters(ExceptionFilter)
sayError() {
return this.kafka.send('say.error', { ip: '127.0.0.1' });
}
@Get('skip')
saySkip() {
return this.kafka.send('say.skip', { ip: '127.0.0.1' });
}
}
服务器控制器
import { BadRequestException, Controller, UseFilters } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { OgmaSkip } from '@ogma/nestjs-module';
import { AppService } from '../../app.service';
import { ExceptionFilter } from './exception.filter';
@Controller()
export class KafkaServerController {
constructor(private readonly service: AppService) {}
@MessagePattern('say.hello')
sayHello() {
return this.service.getHello();
}
@UseFilters(ExceptionFilter)
@MessagePattern('say.error')
sayError() {
throw new BadRequestException('Borked');
}
@OgmaSkip()
@MessagePattern('say.skip')
saySkip() {
return this.service.getHello();
}
}
以上用于对我正在制作的库进行集成测试。您可以在此处查看完整的模块设置
推荐阅读
- reactjs - 元素类型无效:应为字符串(用于内置组件)或类/函数(用于复合组件)但得到:未定义
- scala - Scala:在scala中检查变量是否为整数
- sql-server - SP_Execute 未在输出参数中返回值
- javascript - 有没有更好的方法在一个 html 页面上多次运行 JavaScript?
- yocto - Yocto中populate_sdk时如何排除包
- maven - 如何在 teamcity SonarQube runner 中列出子模块
- android - Lollipop vs Nougat Runtime 目录权限到外部存储文件夹
- jquery - 下拉焦点无法按预期对后代元素起作用
- html - 代码块使用 PdfMake 破坏 pdf
- unity3d - Unity - 无尽奔跑游戏中的障碍物通过检测