首页 > 解决方案 > Golang 中的 TCP 连接池

问题描述

所以我的后端架构的流程是 Browser -> HTTP Server -> TCP Server -> HTTP Server -> Browser。为了减少延迟并增加请求/秒或 QPS,我实现了连接池。我只使用标准库,如果我的实现:

连接.go

type ConnectionPool struct {
    mutex sync.RWMutex
    connections chan net.Conn // Buffered channel with maximum size by user input
    maxSize uint64
}

type Counter struct {
    numConnections int
    mutex sync.RWMutex
}

var TotalConnections uint64

func CreateConnection() (net.Conn, error){
    //log.Println("Creating a new connection to TCP Server from HTTP Server.")

    atomic.AddUint64(&TotalConnections, 1)

    log.Println("New connection created!: ", TotalConnections)
    return net.Dial("tcp", "localhost:8081")
}

func CreateConnectionPool(initialSize int, maximumSize uint64) (*ConnectionPool, error) {
    log.Println("Connection pool is being created!")
    pool := &ConnectionPool{
        connections: make(chan net.Conn, maximumSize),
        maxSize: maximumSize,
    }
    TotalConnections = 0
    // Creating the number of initial connections
    for iterator := 0; iterator < initialSize; iterator++ {
        atomic.AddUint64(&TotalConnections, 1)
        log.Println("New connection created!: ", TotalConnections)
        singleConnection, er := net.Dial("tcp", "localhost:8081")

        if er != nil {
            log.Fatal("error in creating initial connections: ", er.Error())
        }
        pool.connections <- singleConnection // WOW
    }
    return pool, nil
}

func (pool *ConnectionPool) Length() int {
    allConnections := pool.GetPool()
    return len(allConnections)
}

func (pool *ConnectionPool) ClosePool() {
    pool.mutex.Lock()
    //allConnections := pool.connections
    pool.connections = nil
    pool.mutex.Unlock()

    TotalConnections = 0
    /*
    if allConnections != nil {
        for connections := range allConnections {
            connections.Close()
        }
    }
     */
}

func (pool *ConnectionPool) GetPool() chan net.Conn {
    // RLock(): multiple go routine can read(not write) at a time by acquiring the lock.
    return pool.connections
}

func (pool *ConnectionPool) GetOneConnection() (net.Conn, error) {
    //log.Println("GetOneConnection called.")

    allConnections := pool.GetPool() // 0 value of a channel is nil

    if allConnections == nil {
        return nil, errors.New("channel is empty")
    }

    // Select waits on multiple channel operations


    if atomic.LoadUint64(&TotalConnections) >= pool.maxSize { // Cannot create new connections
        //log.Println("Reached max number of active connections!")
        singleConnection := <- allConnections
        return singleConnection, nil
    } else{ // Still have not opened maxSize number of connections
        select {
        case singleConnection := <- allConnections:
            //log.Println("Connection from the channel is given")
            if singleConnection == nil {
                return nil, errors.New("returned a nil connection.")
            }
            return singleConnection, nil
        default:
            atomic.AddUint64(&TotalConnections, 1)
            log.Println("New connection created!: ", TotalConnections)
            return net.Dial("tcp", "localhost:8081")
        }
    }
}

func (pool *ConnectionPool) PutOneConnection(singleConnection net.Conn) error {
    // If pool is full, Close the connection and carry on.
    // Else, just add it to the pool.
    //log.Println("PutOneConnection called.")
    if singleConnection == nil {
        log.Println("Connection was already Closed, did not add to the pool.")
        return nil
    }


    if pool.connections == nil {
        singleConnection.Close()
        return errors.New("Pool was already closed.")
    }


    select {
    case pool.connections <- singleConnection:
        //log.Println("Connection returned to the pool, length: ", pool.Length())
    default:
        log.Println("POOL FULL! CON CLOSED!")
        singleConnection.Close()
    }

    return nil
}

HTTP 服务器 (main.go)

...
pool, er = connectionpool.CreateConnectionPool(5, 10)
conn, err := pool.GetOneConnection()
errr := pool.PutOneConnection(o.connection)
...

但是,它的性能比正常情况差。我试图找出导致性能下降的原因。已尝试使用 pprof 查找瓶颈,但仍然无法弄清楚。

正常/conn 池化版本的 QPS:每秒 5000,每秒 2000 ish

提前致谢!

编辑 1:我正在使用可以与 TCP 服务器建立的最大连接数。对于同步,我使用了原子计数器。对于 net.Conns 的同步 Gets(),我使用了通道。

编辑 2:我没有从 TCP 端关闭单个连接,简单的 Read() 和 Write() 操作是通过连接执行的。

标签: goserverconnection-pooling

解决方案


推荐阅读