首页 > 解决方案 > 选择性地向特定客户端发送事件

问题描述

我正在构建一个 gin 应用程序,在该应用程序中,我利用 SSE 根据本教程通知我的用户对数据库的任何相关更改。目前,我已经实现了这样的功能,即在 POST 处理程序期间通过通道向 GET 处理程序发送消息以触发所有客户端的事件发送。终点是/events/:uid/uid 指代参数变量的地方,该变量将是我设置的客户端 UID。我的问题是,我对如何将 SSE 广播给特定的用户组感到困惑,他们每个人都在自己的唯一端点上收听。现在所有客户端都添加到地图中,但是我不确定如何根据它进行过滤。

GET 端点处理程序

var eventsStream = broker.NewServer()

func GetRelevantMarkers(c *gin.Context) {
    c.Writer.Header().Set("Content-Type", "text/event-stream")
    c.Writer.Header().Set("Cache-Control", "no-cache")
    c.Writer.Header().Set("Connection", "keep-alive")
    c.Writer.Header().Set("Transfer-Encoding", "chunked")

    clientChan := make(broker.ClientChan)

    // Send new connection to event server
    eventsStream.NewClients <- clientChan

    defer func() {
        // Send closed connection to event server
        eventsStream.ClosedClients <- clientChan
    }()

    go func() {
        // Send connection that is closed by client to event server
        <-c.Done()
        eventsStream.ClosedClients <- clientChan
    }()


    c.Stream(func(w io.Writer) bool {
        // Stream message to client from message channel
        if msg, ok := <-eventsStream.Message; ok {
            c.SSEvent("message", msg)
            return true
        }
        return false
    })
}

事件代理

type Event struct {
    // Events are pushed to this channel by the main events-gathering routine
    Message chan string

    // New client connections
    NewClients chan chan string

    // Closed client connections
    ClosedClients chan chan string

    // Total client connections
    TotalClients map[chan string]bool
}

type ClientChan chan string

func NewServer() (event *Event) {

    event = &Event{
        Message:       make(chan string),
        NewClients:    make(chan chan string),
        ClosedClients: make(chan chan string),
        TotalClients:  make(map[chan string]bool),
    }

    go event.listen()

    return
}

func (stream *Event) listen() {
    for {
        select {
        // Add new available client
        case client := <-stream.NewClients:
            stream.TotalClients[client] = true
            log.Printf("Client added. %d registered clients", len(stream.TotalClients))

        // Remove closed client
        case client := <-stream.ClosedClients:
            delete(stream.TotalClients, client)
            log.Printf("Removed client. %d registered clients", len(stream.TotalClients))

        // Broadcast message to client
        case eventMsg := <-stream.Message:
            for clientMessageChan := range stream.TotalClients {
                clientMessageChan <- eventMsg
            }
        }
    }
}

POST 端点处理程序

func DeleteEvent(c *gin.Context) {
    eventsStream.Message <- "event-deleted"
}

标签: goclientserver-sent-eventsgo-gin

解决方案


推荐阅读