首页 > 解决方案 > 如何在没有订阅者的情况下发布

问题描述

在对 pub/sub 和 xadd/xread 进行了一些测试之后,我发现如果我的订阅者没有打开,那么每当我启动订阅者时都不会收到消息。例如情况

  1. 您通过发布发送消息
  2. 通过发布发送消息后,您打开订阅者并收听频道 10 秒
  3. 消息将丢失。

我尝试过两种不同的代码,例如

子.py

import redis
import time
from config import configuration

client: redis = redis.Redis(
    host=configuration.helheim.redis_host,
    port=configuration.helheim.redis_port,
    db=configuration.helheim.redis_database
)

while True:
    test = client.xread({"sns": '$'}, None, 0)
    print(test)
    time.sleep(1)

发布.py

import redis

from config import configuration

client: redis = redis.Redis(
    host=configuration.helheim.redis_host,
    port=configuration.helheim.redis_port,
    db=configuration.helheim.redis_database
)


test = client.xadd("sns", {"status": "kill", "link": "https://www.sneakersnstuff.com/sv/product/49769/salomon-xa-alpine-mid-advanced"})
print(test)

子.py

EVENT_LISTENER.subscribe("sns")

while True:
    message = EVENT_LISTENER.get_message()

    if message and not message['data'] == 1:
        message = json.loads(message['data'])

发布.py

import redis

from config import configuration

client: redis = redis.Redis(
    host=configuration.helheim.redis_host,
    port=configuration.helheim.redis_port,
    db=configuration.helheim.redis_database
)

channel = "sns"
client.publish(channel,
               '{"status": "kill", "store": "sns", "link": "https://www.sneakersnstuff.com/sv/product/49769/salomon-xa-alpine-mid-advanced"}')

并且似乎没有保存在 redis 中的持久历史消息。

我的问题是,当我打开我的订阅者时,我如何能够阅读我已发布并在阅读后删除的消息?

标签: python-3.xredis

解决方案


Pub/sub 永远不会保留消息。请参阅Redis Pub/Sub 和 Redis Stream 之间的主要区别是什么?

流确实会保留消息,请参阅https://redis.io/commands/xread

问题是您正在使用带有特殊 $ id 的 xread,它只会在您调用后添加消息。

当阻塞时,有时我们只想接收从我们阻塞的那一刻开始通过 XADD 添加到流中的条目。在这种情况下,我们对已添加条目的历史不感兴趣。对于这个用例,我们必须检查流顶部元素 ID,并在 XREAD 命令行中使用该 ID。这不干净,需要调用其他命令,因此可以使用特殊的 $ ID 来指示我们只需要新事物的流。

您可能想在第一次通话时尝试使用 0,然后使用最后一个消息 ID。

如果您想避免在失败的情况下从零开始,并且您无法将最后一条消息 ID 保存在客户端中,请了解https://redis.io/topics/streams-intro#consumer-groups


推荐阅读