首页 > 解决方案 > websocket 客户端:node.js 真的比 go(gorilla 和 gobwas)快吗?

问题描述

我正在将我的 websocket 代码从 node.js 迁移到 golang,并在其中对数据进行大量处理。对我来说,关键问题是尽可能快地读取和处理数据。在仅读取数据的情况下,将 node.js 与两个 golang 方案进行比较时,我无法让 golang 与 node.js 一样快。在下面的基准测试中,按 20 万条消息取平均,golang-gobwas 方案比 node.js 慢 2.2 毫秒(仅在 22% 的情况下更快),而 gorilla 方案慢 1.8 毫秒(仅在 23% 的情况下更快)。

基准 node.js 代码是:

"use strict"
const WebSocket   = require('ws')

// Benchmark client: opens one socket, subscribes to the order-book and trade
// feeds of every market, and prints one line per timestamped message showing
// how far the local clock is past the timestamp carried in the message.
const ws = new WebSocket("wss://api.hitbtc.com/api/2/ws")

// Once connected, send two subscription requests per market.
ws.onopen = function(evt) {
    hitbtc_marketnames().forEach( function (marketname) {
        // 'ETH/BTC' -> 'ETHBTC'
        const marketid = marketname.replace('/', '')
        send_args({ method: "subscribeOrderbook", params: {symbol: marketid}, id: 123}, ws)
        send_args({ method: "subscribeTrades", params: {symbol: marketid}, id: 124}, ws)
    })
}
// A socket error or a closed connection aborts the run. Real Error objects are
// thrown (instead of bare strings) so the crash report carries a stack trace.
ws.onerror = function(evt) {
    throw new Error('error')
}
ws.onclose = function(evt) {
    throw new Error('connection closed')
}
// Parse each incoming message and, when it carries params.timestamp, print
// symbol ; sequence ; timestamp ; delay in ms ; "js" ; raw message length.
ws.onmessage = function(evt) {

    const data = JSON.parse(evt.data)

    // `!= undefined` is deliberately loose: it rejects both null and undefined.
    if (data != undefined && data.params != undefined && data.params.timestamp != undefined) {
        // The clock is read after parsing, so the delay includes JSON.parse time.
        const delay = Date.now() - new Date(data.params.timestamp).getTime()
        console.log(data.params.symbol, ";", data.params.sequence, ";", data.params.timestamp, ";", delay, '; js ;', evt.data.length)
    }

}

/**
 * Serialises a request to JSON, logs it together with the current
 * epoch-millisecond time, and sends it over the given socket.
 *
 * An exception thrown by `ws.send` is logged and not rethrown, so the caller
 * always continues.
 *
 * @param {*} args - JSON-serialisable request payload.
 * @param {{send: function(string): *}} ws - Object exposing `send`.
 */
function send_args (args, ws ) {
    const payload = JSON.stringify(args)
    console.log(Date.now(), ` send: ${payload}`)
    try {
        ws.send(payload)
    } catch (sendFailure) {
        console.log(sendFailure)
    }
}

/**
 * Lists the markets the benchmark subscribes to, as 'BASE/QUOTE' names.
 *
 * For a quick single-market run, return a one-element list such as
 * ['ETH/PAX'] instead.
 *
 * @returns {string[]} A newly built array of 105 names in a fixed order.
 */
function hitbtc_marketnames() {
    // Each row is [base, quotes]; rows expand in order into 'BASE/QUOTE'.
    // USD has two rows so USDT/USD stays between USD/PAX and USD/USDC.
    const quotesByBase = [
        ['ADA', ['BCH', 'BTC', 'ETH', 'USD']],
        ['BCH', ['EURS']],
        ['BNB', ['BTC', 'ETH', 'USD']],
        ['BSV', ['BTC', 'USD']],
        ['BTC', ['EURS', 'PAX', 'USD', 'USDC']],
        ['BTG', ['BTC', 'ETH', 'USD']],
        ['DASH', ['BCH', 'BTC', 'EOS', 'ETH', 'EURS', 'USD']],
        ['DOGE', ['BTC', 'ETH', 'USD']],
        ['EOS', ['BCH', 'BTC', 'ETH', 'EURS', 'PAX', 'USD']],
        ['ETC', ['BCH', 'BTC', 'ETH', 'USD']],
        ['ETH', ['BTC', 'EURS', 'PAX', 'USD', 'USDC']],
        ['EURS', ['USD']],
        ['HT', ['BTC', 'USD']],
        ['IOTA', ['BTC', 'ETH', 'USD']],
        ['LEO', ['USD']],
        ['LINK', ['BCH', 'BTC', 'ETH', 'USD']],
        ['LTC', ['BCH', 'BTC', 'EOS', 'ETH', 'EURS', 'USD']],
        ['NEO', ['BTC', 'EOS', 'ETH', 'EURS', 'USD']],
        ['OMG', ['BCH', 'BTC', 'ETH', 'USD']],
        ['QTUM', ['BTC', 'ETH', 'USD']],
        ['TRX', ['BCH', 'BTC', 'EOS', 'ETH', 'USD']],
        ['USD', ['PAX']],
        ['USDT', ['USD']],
        ['USD', ['USDC']],
        ['XEM', ['BTC', 'ETH']],
        ['XLM', ['BCH', 'BTC', 'ETH', 'USD']],
        ['XMR', ['BCH', 'BTC', 'EOS', 'ETH', 'EURS', 'USD']],
        ['XRP', ['BCH', 'BTC', 'EOS', 'ETH', 'EURS', 'USDT']],
        ['XTZ', ['BTC', 'ETH', 'USD']],
        ['ZEC', ['BCH', 'BTC', 'EOS', 'ETH', 'EURS', 'USD']],
    ]

    const names = []
    for (const [base, quotes] of quotesByBase) {
        for (const quote of quotes) {
            names.push(`${base}/${quote}`)
        }
    }
    return names
}

golang-gobwas 解决方案是

package main

import (
    "context"
    "encoding/json"
    "fmt"
    "github.com/gobwas/ws"
    "github.com/gobwas/ws/wsutil"
    "log"
    "net/url"
    "os"
    "os/signal"
    "strings"
    "time"
)

// hitbtc_marketname returns the markets the benchmark subscribes to, written
// as "BASE/QUOTE". A new slice of 105 names is built on every call, always in
// the same order.
func hitbtc_marketname() []string {
    markets := []string{
        "ADA/BCH", "ADA/BTC", "ADA/ETH", "ADA/USD",
        "BCH/EURS",
        "BNB/BTC", "BNB/ETH", "BNB/USD",
        "BSV/BTC", "BSV/USD",
        "BTC/EURS", "BTC/PAX", "BTC/USD", "BTC/USDC",
        "BTG/BTC", "BTG/ETH", "BTG/USD",
        "DASH/BCH", "DASH/BTC", "DASH/EOS", "DASH/ETH", "DASH/EURS", "DASH/USD",
        "DOGE/BTC", "DOGE/ETH", "DOGE/USD",
        "EOS/BCH", "EOS/BTC", "EOS/ETH", "EOS/EURS", "EOS/PAX", "EOS/USD",
        "ETC/BCH", "ETC/BTC", "ETC/ETH", "ETC/USD",
        "ETH/BTC", "ETH/EURS", "ETH/PAX", "ETH/USD", "ETH/USDC",
        "EURS/USD",
        "HT/BTC", "HT/USD",
        "IOTA/BTC", "IOTA/ETH", "IOTA/USD",
        "LEO/USD",
        "LINK/BCH", "LINK/BTC", "LINK/ETH", "LINK/USD",
        "LTC/BCH", "LTC/BTC", "LTC/EOS", "LTC/ETH", "LTC/EURS", "LTC/USD",
        "NEO/BTC", "NEO/EOS", "NEO/ETH", "NEO/EURS", "NEO/USD",
        "OMG/BCH", "OMG/BTC", "OMG/ETH", "OMG/USD",
        "QTUM/BTC", "QTUM/ETH", "QTUM/USD",
        "TRX/BCH", "TRX/BTC", "TRX/EOS", "TRX/ETH", "TRX/USD",
        "USD/PAX", "USDT/USD", "USD/USDC",
        "XEM/BTC", "XEM/ETH",
        "XLM/BCH", "XLM/BTC", "XLM/ETH", "XLM/USD",
        "XMR/BCH", "XMR/BTC", "XMR/EOS", "XMR/ETH", "XMR/EURS", "XMR/USD",
        "XRP/BCH", "XRP/BTC", "XRP/EOS", "XRP/ETH", "XRP/EURS", "XRP/USDT",
        "XTZ/BTC", "XTZ/ETH", "XTZ/USD",
        "ZEC/BCH", "ZEC/BTC", "ZEC/EOS", "ZEC/ETH", "ZEC/EURS", "ZEC/USD",
    }
    return markets
}

// messageReceived is the target that main decodes each incoming message into
// with json.Unmarshal. No struct tags are declared, so encoding/json matches
// JSON keys to these field names using its default rule (exact match
// preferred, case-insensitive match accepted).
type messageReceived struct {
    Jsonrpc string
    Method  string
    // Params holds the message payload. In this program main reads only
    // Symbol, Sequence and Timestamp from it.
    Params  struct {
        Bid       []interface{} // decoded untyped; presumably order-book bid levels — confirm against the API docs
        Ask       []interface{} // decoded untyped; presumably order-book ask levels — confirm against the API docs
        Data      []interface{} // decoded untyped; presumably trade entries — confirm against the API docs
        Sequence  int64
        Symbol    string
        Timestamp string // parsed in main with layout "2006-01-02T15:04:05.000Z"; empty means the message is skipped
    }
}

// main connects to the HitBTC streaming endpoint with gobwas/ws, subscribes to
// the order-book and trade feeds of every market, and prints one line per
// timestamped message:
//
//     symbol ; sequence ; timestamp ; delay in ms ; gobwas ; message length
//
// It runs until interrupted. Dial, write, read and timestamp-parse failures
// terminate the process through log.Fatal.
func main() {

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)

    u := url.URL{Scheme: "wss", Host: "api.hitbtc.com", Path: "api/2/ws"}
    fmt.Println("connecting to", u.String())

    conn, _, _, err := ws.DefaultDialer.Dial(context.Background(), u.String())
    if err != nil {
        log.Fatal(err)
    }

    defer conn.Close()

    // subscribe marshals one request and writes it as a text frame.
    subscribe := func(method string, id int, symbol string) {
        request, err := json.Marshal(map[string]interface{}{
            "method": method,
            "params": map[string]string{
                "symbol": symbol,
            },
            "id": id,
        })
        if err != nil {
            log.Fatal(err)
        }
        if err := wsutil.WriteClientMessage(conn, ws.OpText, request); err != nil {
            log.Fatal(err)
        }
    }

    for _, marketname := range hitbtc_marketname() {
        // "ETH/BTC" -> "ETHBTC"
        symbol := strings.ReplaceAll(marketname, "/", "")
        subscribe("subscribeOrderbook", 123, symbol)
        subscribe("subscribeTrades", 124, symbol)
    }

    // Reader: decode every message and report the delay of timestamped ones.
    go func() {

        for {
            msg, _, err := wsutil.ReadServerData(conn)
            if err != nil {
                log.Fatal(err)
            }

            var data messageReceived
            // The decode error used to be discarded; report it and skip the
            // message instead of working with partially filled data.
            if err := json.Unmarshal(msg, &data); err != nil {
                log.Println("unmarshal:", err)
                continue
            }

            // Messages without a timestamp are not measured.
            if len(data.Params.Timestamp) == 0 {
                continue
            }

            t, err := time.Parse("2006-01-02T15:04:05.000Z", data.Params.Timestamp)
            if err != nil {
                log.Fatal(err)
            }
            // The clock is read after decoding, so the delay includes it.
            fmt.Println(data.Params.Symbol, ";", data.Params.Sequence, ";", data.Params.Timestamp, ";", time.Since(t).Seconds()*1000, "; gobwas ;", len(msg))
        }
    }()

    // Block until Ctrl-C, then keep the one-second grace period before the
    // deferred Close runs.
    <-interrupt
    log.Println("interrupt")
    time.Sleep(time.Second)

}

golang-gorilla 代码是

package main

import (
    "flag"
    "fmt"
    "github.com/gorilla/websocket"
    "log"
    "os"
    "os/signal"
    "strings"
    "time"
)

// hitbtc_marketname returns the markets the benchmark subscribes to, written
// as "BASE/QUOTE". The names are kept as whitespace-separated text and split
// on every call, so each call yields a new 105-element slice in the same order.
func hitbtc_marketname() []string {
    const markets = `
        ADA/BCH ADA/BTC ADA/ETH ADA/USD
        BCH/EURS
        BNB/BTC BNB/ETH BNB/USD
        BSV/BTC BSV/USD
        BTC/EURS BTC/PAX BTC/USD BTC/USDC
        BTG/BTC BTG/ETH BTG/USD
        DASH/BCH DASH/BTC DASH/EOS DASH/ETH DASH/EURS DASH/USD
        DOGE/BTC DOGE/ETH DOGE/USD
        EOS/BCH EOS/BTC EOS/ETH EOS/EURS EOS/PAX EOS/USD
        ETC/BCH ETC/BTC ETC/ETH ETC/USD
        ETH/BTC ETH/EURS ETH/PAX ETH/USD ETH/USDC
        EURS/USD
        HT/BTC HT/USD
        IOTA/BTC IOTA/ETH IOTA/USD
        LEO/USD
        LINK/BCH LINK/BTC LINK/ETH LINK/USD
        LTC/BCH LTC/BTC LTC/EOS LTC/ETH LTC/EURS LTC/USD
        NEO/BTC NEO/EOS NEO/ETH NEO/EURS NEO/USD
        OMG/BCH OMG/BTC OMG/ETH OMG/USD
        QTUM/BTC QTUM/ETH QTUM/USD
        TRX/BCH TRX/BTC TRX/EOS TRX/ETH TRX/USD
        USD/PAX USDT/USD USD/USDC
        XEM/BTC XEM/ETH
        XLM/BCH XLM/BTC XLM/ETH XLM/USD
        XMR/BCH XMR/BTC XMR/EOS XMR/ETH XMR/EURS XMR/USD
        XRP/BCH XRP/BTC XRP/EOS XRP/ETH XRP/EURS XRP/USDT
        XTZ/BTC XTZ/ETH XTZ/USD
        ZEC/BCH ZEC/BTC ZEC/EOS ZEC/ETH ZEC/EURS ZEC/USD
    `
    return strings.Fields(markets)
}

// messageReceived is the target that main fills from each incoming message
// through c.ReadJSON. No struct tags are declared, so JSON keys must map onto
// these field names (presumably by encoding/json's default case-insensitive
// matching — confirm against the gorilla/websocket ReadJSON documentation).
type messageReceived struct {
    Jsonrpc string
    Method  string
    // Params holds the message payload. In this program main reads only
    // Symbol, Sequence and Timestamp from it.
    Params  struct {
        Bid       []interface{} // decoded untyped; presumably order-book bid levels — confirm against the API docs
        Ask       []interface{} // decoded untyped; presumably order-book ask levels — confirm against the API docs
        Data      []interface{} // decoded untyped; presumably trade entries — confirm against the API docs
        Sequence  int64
        Symbol    string
        Timestamp string // parsed in main with layout "2006-01-02T15:04:05.000Z"; empty means the message is skipped
    }
}

// main connects to the HitBTC streaming endpoint with gorilla/websocket,
// subscribes to the order-book and trade feeds of every market, and prints
// one line per timestamped message:
//
//     symbol ; sequence ; timestamp ; delay in ms ; gorilla
//
// It runs until the read loop stops on an error or the process is
// interrupted; in both cases main returns normally so the deferred Close runs.
func main() {
    flag.Parse()
    log.SetFlags(0)

    interrupt := make(chan os.Signal, 1)
    signal.Notify(interrupt, os.Interrupt)

    c, _, err := websocket.DefaultDialer.Dial("wss://api.hitbtc.com/api/2/ws", nil)
    if err != nil {
        log.Fatal("dial:", err)
    }
    defer c.Close()

    // done is closed by the reader goroutine when it stops.
    done := make(chan struct{})

    // subscribe sends one request; a write failure ends the run with status 1.
    subscribe := func(method string, id int, symbol string) {
        request := map[string]interface{}{
            "method": method,
            "params": map[string]string{
                "symbol": symbol,
            },
            "id": id,
        }
        if err := c.WriteJSON(request); err != nil {
            fmt.Println(err)
            os.Exit(1)
        }
    }

    for _, channel := range hitbtc_marketname() {
        // "ETH/BTC" -> "ETHBTC"
        symbol := strings.ReplaceAll(channel, "/", "")
        subscribe("subscribeOrderbook", 123, symbol)
        subscribe("subscribeTrades", 124, symbol)
    }

    go func() {
        defer close(done)
        for {
            var data messageReceived

            // Returning (rather than calling os.Exit here) lets the deferred
            // close(done) fire, so main can return and run its own deferred
            // Close. The process still exits with status 0, as before.
            if err := c.ReadJSON(&data); err != nil {
                log.Println("error:", err)
                return
            }

            // Messages without a timestamp are not measured.
            if len(data.Params.Timestamp) == 0 {
                continue
            }

            t, err := time.Parse("2006-01-02T15:04:05.000Z", data.Params.Timestamp)
            if err != nil {
                log.Fatal(err)
            }
            // The clock is read after decoding, so the delay includes it.
            fmt.Println(data.Params.Symbol, ";", data.Params.Sequence, ";", data.Params.Timestamp, ";", time.Since(t).Seconds()*1000, "; gorilla")
        }
    }()

    select {
    case <-done:
    case <-interrupt:
        log.Println("interrupt")

        // Ask the server to close the connection so the reader can stop on
        // its own; without a close frame the wait on done could only time out.
        // All subscription writes have finished by now, so this is the only
        // writer on the connection.
        closeFrame := websocket.FormatCloseMessage(websocket.CloseNormalClosure, "")
        if err := c.WriteMessage(websocket.CloseMessage, closeFrame); err != nil {
            log.Println("write close:", err)
            return
        }
        select {
        case <-done:
        case <-time.After(time.Second):
        }
    }
}

方法:我将每个片段的字符串输出到终端。我同时运行这三种解决方案,并逐条比较延迟时间。一切都在运行最新版本 debian 的良好 linux 服务器上运行。

node.js 真的比 golang 快,还是我错过了什么?

标签: node.js, go, websocket

解决方案


您的代码在同一个 goroutine 中处理所有消息。

这基本上会阻塞新消息的接收,直到先前的消息处理完毕。

试试这个(基于 gobwas 的版本,gorilla 版本可按同样思路修改)看看是否有帮助:


    // Reader goroutine: it only reads messages. Each message is decoded and
    // printed on its own goroutine so that decoding never delays the next
    // read. Lines may therefore be printed out of arrival order.
    go func() {

        for {
            msg, _, err := wsutil.ReadServerData(conn)
            if err != nil {
                log.Fatal(err)
            }
            // dedicated goroutine for message processing, unblocking current one;
            // msg is passed as an argument and all errors are local, so nothing
            // is shared with the reader goroutine.
            go func(msg []byte) {
                var data messageReceived

                // Report decode failures instead of silently ignoring them.
                if err := json.Unmarshal(msg, &data); err != nil {
                    log.Println("unmarshal:", err)
                    return
                }

                // Messages without a timestamp are not measured.
                if len(data.Params.Timestamp) == 0 {
                    return
                }

                t, err := time.Parse("2006-01-02T15:04:05.000Z", data.Params.Timestamp)
                if err != nil {
                    log.Fatal(err)
                }
                fmt.Println(data.Params.Symbol, ";", data.Params.Sequence, ";", data.Params.Timestamp, ";", time.Since(t).Seconds()*1000, "; gobwas ;", len(msg))
            }(msg)

        }
    }()

推荐阅读