javascript - 从 Kafka 消息中解析结果并将其发送到 Web 套接字
问题描述
我有这个 Nodejs 代码,它正在监听 Kafka 主题并通过 Web 套接字返回消息:
const kafkaConsumer = new Kafka({
clientId: 'Consumer',
brokers: localhost:543
});
const consumer = kafkaConsumer.consumer({groupId: KAFKA_GROUP_ID});
await consumer.connect();
await consumer.subscribe({topic: KAFKA_CONSUMER})
await consumer.run({
eachMessage: async ({topic, partition, message}) => {
let parsedResponseData = JSON.parse(message.value.toString());
Object.keys(parsedResponseData).forEach(function (key, value) {
let topicByteBuffer = new Buffer.from(key);
let buffers = [];
buffers.push(topicByteBuffer);
buffers.push(Buffer.from("|"));
buffers.push(Buffer.from(JSON.stringify(value)));
let bytes = Buffer.concat(buffers);
futuresClients.forEach(function (client) {
client.send(bytes);
});
});
},
})
……
let clients = [];
app.ws('/ws_endpoint', (ws, req) => {
clients.push(ws);
console.log("Client subscribed for response status notifications");
ws.on('close', () => {
let wsId = clients.indexOf(ws);
clients.splice(wsId);
});
ws.on('error', (error) => {
console.error(err);
let wsId = clients.indexOf(ws);
clients.splice(wsId);
});
})
重新应用程序:
useEffect(() => {
let feedAddress = "ws://localhost/ws_endpoint";
const feedClient = new W3CWebSocket(feedAddress);
props.onUpdateWebsocket(feedClient);
feedClient.onopen = () => {
console.log("WebSocket Client Connected on " + feedAddress);
};
feedClient.onmessage = (message) => {
let payload = JSON.parse(message.data);
// parse here the response
};
return function cleanup() {
feedClient.close();
};
}, []);
我可以有两种类型的消息:
成功:
{
"requestId" : <string>,
"method" : <string>,
"response" : {
"exchange": <string>,
"messageId" : <string>
}
}
错误:
response: {
"errorMessage" : <string>,
"errorCode" : <int>
}
如何处理这两种类型的消息并将两种类型的响应显示为对话框消息的警告消息?
解决方案
推荐阅读
- sitecore - 具有不同项目解决方案的单个 Sitecore 实例
- php - Ubuntu和php7.4上的Laravel我只能看到带有文件和文件夹的页面
- javascript - React Router 推送到状态警告
- node.js - 为什么 fs.watch 仅在我打开正在监视的文件时触发?
- neo4j - 编写一个查询返回neo4j中两个节点之间的补充关系
- virtualbox - oracle virtual box中虚拟机的命令行安装
- machine-learning - 我在哪里可以为我的机器学习项目获取大量图像?
- java - 只是我想在构建失败时制定目标 Maven 项目
- python - 设置为在 Mac 上启动时运行的 Python 脚本将不会继续运行
- sql - 如何只选择每天至少有一条记录的ID?