首页 > 解决方案 > 生成消息后从 Kafka 代理获得响应

问题描述

我们正在使用以下代码成功向 Kafka 主题生产(发送)消息:

简而言之,我们使用 Deno.connect 创建 TCP 套接字,利用 KafkaJS 对消息进行编码,并成功将其发送到运行在端口 9093 上的代理(broker)。随后我们可以在消费者端看到消息已成功写入。

 /** @format */
import { Encoder } from 'file:///C:/Users/wesge/Desktop/Coding/kafkaEx/protocol/encoder.js';

import { Decoder } from 'file:///C:/Users/wesge/Desktop/Coding/kafkaEx/protocol/decoder.js';

import { Buffer } from 'https://deno.land/std@0.76.0/node/buffer.ts';

import { readAll, writeAll } from 'https://deno.land/std@0.105.0/io/util.ts';

import request from 'file:///C:/Users/wesge/Desktop/Coding/kafkaEx/protocol/request.js';

import { readerFromStreamReader } from 'https://deno.land/std/io/mod.ts';

import {
  ProducerBatch,
  Message,
  IHeaders,
  TopicMessages,
  Broker,
  TopicOffsets,
  PartitionOffset,
} from './index.d.ts';

// import { Produce: apiKey } from '../../apiKeys'

// import { Produce: apiKey } from './protocol/requests/apiKeys'

import MessageSet from './protocol/messageSet/index.js';

import {
  Client,
  Packet,
  Event,
} from 'https://deno.land/x/tcp_socket@0.0.1/mods.ts';

// Default message payload: the module-load time as a UTC string.
// `toUTCString()` is synchronous, so no `await` is needed, and the binding is never reassigned.
const date = new Date().toUTCString();

/**
 * Sends one Produce request (apiKey 0, apiVersion 0) carrying a single message
 * for topic `quickstart-events`, partition 0, to the broker at localhost:9093.
 *
 * @param string - Value of the message to publish; defaults to the module-level `date`.
 * @returns A promise that resolves once the full request has been written and
 *          the connection has been closed.
 */
export default async function func(string: string = date): Promise<void> {
  console.log(typeof string);
  const conn = await Deno.connect({
    hostname: 'localhost',
    port: 9093,
    transport: 'tcp',
  });

  // Everything after the connect runs inside try/finally so the socket is
  // released even when encoding or writing throws.
  try {
    // Encodes one partition entry: partition id, message-set size, message set.
    const encodePartitions = ({ partition, messages }: any) => {
      const messageSet = MessageSet({
        messageVersion: 0,
        compression: 0,
        entries: messages,
      });
      return new Encoder()
        .writeInt32(partition)
        .writeInt32(messageSet.size())
        .writeEncoder(messageSet);
    };

    // Encodes one topic entry: topic name followed by its partitions.
    const encodeTopic = ({ topic, partitions }: any) => {
      return new Encoder()
        .writeString(topic)
        .writeArray(partitions.map(encodePartitions));
    };

    // Builds the request descriptor handed to `request`; `encode` is invoked lazily.
    const producedMessage = ({ acks, timeout, topicData }: ProducerBatch) => ({
      apiKey: 0,
      apiVersion: 0,
      apiName: 'Produce',
      encode: async () => {
        return new Encoder()
          .writeInt16(acks)
          .writeInt32(timeout)
          .writeArray(topicData.map(encodeTopic));
      },
    });

    // topicData structure
    const td = [
      {
        topic: 'quickstart-events',
        partitions: [
          {
            partition: 0,
            firstSequence: undefined,
            messages: [
              {
                key: 'new-key6',
                value: string,
                partition: 0,
                headers: undefined,
                timestamp: Date.now(),
              },
            ],
          },
        ],
      },
    ];

    // NOTE(review): with acks = 0 no reply is expected for this request (the
    // original commented-out `expectResponse: () => acks !== 0` hook says the
    // same). Presumably, to read a Produce response, acks must be 1 or -1 and
    // the reply must be read from `conn` before it is closed -- confirm
    // against the Kafka protocol guide.
    const message = producedMessage({
      acks: 0,
      timeout: 1000,
      topicData: td,
    });

    const pleaseWork = await request({
      correlationId: 1,
      clientId: 'my-app',
      request: message,
    });

    console.log('GOT TO HERE SAM', pleaseWork);

    // A single write() may accept only part of the buffer, so keep writing
    // the remaining bytes until the whole request has been sent.
    const payload = pleaseWork.buf;
    let denoVersion = 0;
    while (denoVersion < payload.length) {
      denoVersion += await conn.write(payload.subarray(denoVersion));
    }

    console.log('Hello is ', denoVersion);
  } finally {
    conn.close();
  }
}

// Runs the producer once with its default message whenever this module is
// evaluated (including when it is imported); the returned promise is not awaited here.
func();

现在,我们正在尝试做同样的事情,并以与上述类似的方式使用 Kafka 协议创建消费者。

我们的第一步是在生成消息后尝试接收响应。我们尝试了各种使用 Deno.listen、Deno.connect 等创建 TCP 侦听服务器的方法,但都没有成功。以下是我们尝试过的大多数不同方法的集合:

    /** @format */

// /** @format */

// import { Application, Router, send } from 'https://deno.land/x/oak/mod.ts';

import producer from './shitProducer.ts';

import { copy } from 'https://deno.land/std@0.103.0/io/util.ts';

// const router = new Router();

// router
//   .get('/test', (ctx) => {
//     ctx.response.body = 'Hello World!';
//     producer();
//   })
//   .post('/test', (ctx) => {
//     ctx.response.body = "You've posted!";
//   })
//   .delete('/test', (ctx) => {
//     console.log('Request body ', ctx.request.body);
//     ctx.response.body = 'You deleted!';
//   });

// const app = new Application();

// app.use(router.routes());

// app.use(router.allowedMethods());

// app.use(async (ctx) => {
//   await send(ctx, ctx.request.url.pathname, {
//     root: `${Deno.cwd()}`,
//     index: 'index.html',
//   });
// });

// await app.listen({ port: 8000 });

// const server = Deno.listen({ port: 8000 });
// console.log('tcp server listening on port 8000');

// const newConn = await Deno.connect({
//   port: 9093,
//   hostname: 'localhost',
//   transport: 'tcp',
// });

// const connections: Deno.Conn[] = [newConn];

// // for await (const connection of server) {
// //   // new connection
// //   //connections.push(connection);
// //   handle_connection(connection);
// // }

// console.log('line 50');
// //producer();

// async function handle_connection(connection: Deno.Conn) {
//   console.log('in handle connection');
//   let buffer = new Uint8Array(512);
//   while (true) {
//     const count = await connection.read(buffer);
//     console.log('count', count);
//     if (!count) {
//       // connection closed
//       const index = connections.indexOf(connection);
//       connections.splice(index, 1);
//       console.log('about to break');
//       break;
//     } else {
//       // message received
//       let message = buffer.subarray(0, count);
//       for (const current_connection of connections) {
//         if (current_connection !== connection) {
//           console.log('in else');
//           await current_connection.write(message);
//         }
//       }
//     }
//   }
// }
// await handle_connection(newConn);

// Echo server: accepts TCP connections on `listenPort` and copies whatever each
// connection sends straight back to it.
// NOTE(review): this only sees clients that connect to this process. Replies
// from the Kafka broker presumably arrive on the outbound connection opened by
// the producer (Deno.connect), so they would have to be read from that
// connection rather than from a listener -- confirm against the Kafka protocol guide.
const listenPort = 9093;
const listener = Deno.listen({ port: listenPort });
setInterval(() => console.log('listener', listener), 1000);
// Report the port that is actually bound (the message previously said 8080).
console.log(`listening on 0.0.0.0:${listenPort}`);
for await (const conn of listener) {
  console.log('Hello');
  // Handle each connection in the background; log failures instead of leaving
  // an unhandled rejection, and always close the connection afterwards.
  copy(conn, conn)
    .catch((err: unknown) => console.error('echo failed', err))
    .finally(() => conn.close());
}

有人知道我们需要怎么做,才能读取到应从 Kafka 代理(broker)返回的响应吗?谢谢!!

标签: apache-kafka, kafka-consumer-api, confluent-platform, kafka-producer-api, deno

解决方案


推荐阅读