首页 > 解决方案 > 如何在 NestJS 中发布到 Kafka 客户端?

问题描述

我试图在 NestJs 中发布到 kafka

 async publish<T extends IEvent>(event: T) {
    await this.client.connect();
    await this.client.send('topic', event);
  }

但是还没有找到正确的方法,dispatchEvent 是受保护的。

编辑:使用 cqrs 所以这是在订阅事件总线的事件发布者中。

标签: nestjs

解决方案


我不相信你需要这个connect方法。你确定你订阅了消息响应吗?下面是一个带有客户端控制器和服务器控制器的工作示例:

客户端控制器

import {
  Controller,
  Get,
  Inject,
  OnModuleDestroy,
  OnModuleInit,
  UseFilters,
} from '@nestjs/common';
import { ClientKafka } from '@nestjs/microservices';
import { ExceptionFilter } from './exception.filter';

@Controller()
export class KafkaClientController implements OnModuleInit, OnModuleDestroy {
  constructor(@Inject('KAFKA_SERVICE') private readonly kafka: ClientKafka) {}

  async onModuleInit() {
    ['hello', 'error', 'skip'].forEach((key) =>
      this.kafka.subscribeToResponseOf(`say.${key}`),
    );
  }

  onModuleDestroy() {
    this.kafka.close();
  }

  @Get()
  sayHello() {
    return this.kafka.send('say.hello', { ip: '127.0.0.1' });
  }

  @Get('error')
  @UseFilters(ExceptionFilter)
  sayError() {
    return this.kafka.send('say.error', { ip: '127.0.0.1' });
  }

  @Get('skip')
  saySkip() {
    return this.kafka.send('say.skip', { ip: '127.0.0.1' });
  }
}

服务器控制器

import { BadRequestException, Controller, UseFilters } from '@nestjs/common';
import { MessagePattern } from '@nestjs/microservices';
import { OgmaSkip } from '@ogma/nestjs-module';
import { AppService } from '../../app.service';
import { ExceptionFilter } from './exception.filter';

@Controller()
export class KafkaServerController {
  constructor(private readonly service: AppService) {}

  @MessagePattern('say.hello')
  sayHello() {
    return this.service.getHello();
  }

  @UseFilters(ExceptionFilter)
  @MessagePattern('say.error')
  sayError() {
    throw new BadRequestException('Borked');
  }

  @OgmaSkip()
  @MessagePattern('say.skip')
  saySkip() {
    return this.service.getHello();
  }
}

以上用于对我正在制作的库进行集成测试。您可以在此处查看完整的模块设置


推荐阅读