首页 > 解决方案 > 在网络工作者或服务工作者中运行 websocket - javascript

问题描述

我有来自不同站点的 9 个 websocket 连接,它们正在使用数据更新 DOM。目前我正在连接所有并监听所有 websocket 并使用函数调用更新数据。

我面临的问题是有很多 websocket 连接,并且存在内存和 CPU 使用问题。如何使用服务工作者和网络工作者来优化这么多的 websocket 连接?

async function appendGatePublicTickersData(e) {
  if (e.event == "update" && e.result[0].contract == "BTC_USD") {
    if ('total_size' in e.result[0]) {
      $(".gate-btc-open-interest").html(commaNumber(e.result[0].total_size))
      if ('last' in e.result[0]) {
        $(".gate-btc-open-value").html(commaNumber(customFixedRounding((e.result[0].total_size / e.result[0].last), 4)))
      }
    }

    if ('volume_24h_usd' in e.result[0]) {
      $(".gate-btc-24-volume").html(commaNumber(e.result[0].volume_24h_usd))
    }

    if ('volume_24h_btc' in e.result[0]) {
      $(".gate-btc-24-turnover").html(commaNumber(e.result[0].volume_24h_btc))
    }

    if ('funding_rate' in e.result[0]) {
      var fundingRateBtcGate = customFixedRounding(e.result[0].funding_rate * 100, 4)
      $(".public-gate-btc-funding").html(fundingRateBtcGate)
    }

    if ('funding_rate_indicative' in e.result[0]) {
      var predictedRateBtcGate = customFixedRounding(e.result[0].funding_rate_indicative * 100, 4)
      $(".public-gate-btc-predicted").html(predictedRateBtcGate)
    }
  }
}

var pubGateWs = new WebSocket("wss://fx-ws.gateio.ws/v4/ws/btc");

pubGateWs.addEventListener("open", function() {
  pubGateWs.send(JSON.stringify({
    "time": 123456,
    "channel": "futures.tickers",
    "event": "subscribe",
    "payload": ["BTC_USD", "ETH_USD"]
  }))
});

pubGateWs.addEventListener("message", function(e) {
  e = JSON.parse(e.data)
  appendGatePublicTickersData(e)
});

pubGateWs.addEventListener("close", function() {});

标签: javascriptjquerywebsocketservice-workerweb-worker

解决方案


由于您使用的是 Web Sockets,因此最好使用 aSharedWorker为您的 Web Sockets 创建一个新线程。WebWorkernormal和 a的区别在于SharedWorkerweb worker 在加载页面时会在每个 tab 或浏览器中创建一个新的 session,而 shared worker 会在每个 tab 中使用相同的 session。因此,您的所有选项卡或窗口都将使用相同的工作程序和相同的 Web Socket 连接。

如果数据更新非常频繁(每秒超过 60 次)并且每次都必须更新 DOM,则使用该requestAnimationFrame方法来限制 DOM 的更新量。在使用新内容更新 DOM 之前,它将等待下一个重绘周期,大约每秒 60 次,或 60FPS。

这个实现就像下面的例子:

主线程。

// Create shared worker.
const webSocketWorker = new SharedWorker('web-sockets-worker.js');

/**
 * Sends a message to the worker and passes that to the Web Socket.
 * @param {any} message 
 */
const sendMessageToSocket = message => {
  webSocketWorker.port.postMessage({ 
    action: 'send', 
    value: message,
  });
};

// Event to listen for incoming data from the worker and update the DOM.
webSocketWorker.port.addEventListener('message', ({ data }) => {
  requestAnimationFrame(() => {
    appendGatePublicTickersData(data);
  });
});
  
// Initialize the port connection.
webSocketWorker.port.start();

// Remove the current worker port from the connected ports list.
// This way your connectedPorts list stays true to the actual connected ports, 
// as they array won't get automatically updated when a port is disconnected.
window.addEventListener('beforeunload', () => {
  webSocketWorker.port.postMessage({ 
    action: 'unload', 
    value: null,
  });

  webSocketWorker.port.close();
});

共享工作者。

/**
 * Array to store all the connected ports in.
 */
const connectedPorts = [];

// Create socket instance.
const socket = new WebSocket("wss://fx-ws.gateio.ws/v4/ws/btc");

// Send initial package on open.
socket.addEventListener('open', () => {
  const package = JSON.stringify({
    "time": 123456,
    "channel": "futures.tickers",
    "event": "subscribe",
    "payload": ["BTC_USD", "ETH_USD"]
  });

  socket.send(package);
});

// Send data from socket to all open tabs.
socket.addEventListener('message', ({ data }) => {
  const package = JSON.parse(data);
  connectedPorts.forEach(port => port.postMessage(package));
});

/**
 * When a new thread is connected to the shared worker,
 * start listening for messages from the new thread.
 */
self.addEventListener('connect', ({ ports }) => {
  const port = ports[0];

  // Add this new port to the list of connected ports.
  connectedPorts.push(port);

  /**
   * Receive data from main thread and determine which
   * actions it should take based on the received data.
   */
  port.addEventListener('message', ({ data }) => {
    const { action, value } = data;

    // Send message to socket.
    if (action === 'send') {
      socket.send(JSON.stringify(value));

    // Remove port from connected ports list.
    } else if (action === 'unload') {
      const index = connectedPorts.indexOf(port);
      connectedPorts.splice(index, 1);
    }
  });

  // Start the port broadcasting.
  port.start();
});

旁注:您appendGatePublicTickersData不使用await关键字,因此它不必是async函数。


推荐阅读