首页 > 解决方案 > 从 Kafka 消息中解析结果并将其发送到 Web 套接字

问题描述

我有这个 Nodejs 代码,它正在监听 Kafka 主题并通过 Web 套接字返回消息:

const kafkaConsumer = new Kafka({
    clientId: 'Consumer',
    brokers: localhost:543
});

    const consumer = kafkaConsumer.consumer({groupId: KAFKA_GROUP_ID});
    await consumer.connect();
    await consumer.subscribe({topic: KAFKA_CONSUMER})

    await consumer.run({
        eachMessage: async ({topic, partition, message}) => {
            let parsedResponseData = JSON.parse(message.value.toString());
            Object.keys(parsedResponseData).forEach(function (key, value) {
                let topicByteBuffer = new Buffer.from(key);
                let buffers = [];
                buffers.push(topicByteBuffer);
                buffers.push(Buffer.from("|"));
                buffers.push(Buffer.from(JSON.stringify(value)));
                let bytes = Buffer.concat(buffers);
                futuresClients.forEach(function (client) {
                    client.send(bytes);
                });
            });
        },
    })

……

let clients = [];

    app.ws('/ws_endpoint', (ws, req) => {
        clients.push(ws);
        console.log("Client subscribed for response status notifications");
        ws.on('close', () => {
            let wsId = clients.indexOf(ws);
            clients.splice(wsId);
        });
        ws.on('error', (error) => {
            console.error(err);
            let wsId = clients.indexOf(ws);
            clients.splice(wsId);
        });
    })

重新应用程序:

useEffect(() => {
        let feedAddress = "ws://localhost/ws_endpoint";
        
        const feedClient = new W3CWebSocket(feedAddress);
        props.onUpdateWebsocket(feedClient);
        feedClient.onopen = () => {
            console.log("WebSocket Client Connected on " + feedAddress);
        };
        feedClient.onmessage = (message) => {
            let payload = JSON.parse(message.data);
            
            // parse here the response

        };
        
        return function cleanup() {
            feedClient.close();
        };
    }, []);

我可以有两种类型的消息:

成功:

{
     "requestId" : <string>,
     "method" : <string>,
     "response" : {
         "exchange": <string>,
         "messageId" : <string>
     }
}

错误:

response: {
   "errorMessage" : <string>,
   "errorCode" : <int>
}

如何处理这两种类型的消息并将两种类型的响应显示为对话框消息的警告消息?

标签: javascriptnode.jsreactjs

解决方案


推荐阅读