首页 > 解决方案 > 使用 RedisGears 对接收 pubsub 消息进行计算

问题描述

在我的场景中,我将消息发布到 Redis,这些消息包含 GPS 坐标(纬度/经度对)。

例子:

redis-cli -p 16379 PUBLISH gps_positions "{'lat': 50.5243584, 'lon': 12.3616320}"
redis-cli -p 16379 PUBLISH gps_positions "{'lat': 50.5063360, 'lon': 12.3377472}"

客户端订阅这些消息:

redis-cli -p 16379 PSUBSCRIBE gps_positions

现在我想在它们发布到 Redis pubsub 频道的那一刻计算当前以前的 GPS 坐标之间的方位角。计算应该直接在 Redis 中进行,对客户端透明。由于有大量已发布的消息,我想避免在 Redis 中保存任何数据——我只需要每个新发布的坐标的前一个 GPS 坐标。

最后,客户端应该收到包括方位角在内的 GPS 位置,而无需自己进行任何计算:

"{'lat': 50.5243584, 'lon': 12.3616320, 'bearing': $BEARING}"

计算方位的算法对于这个问题无关紧要,但它看起来像这样:

import pyproj

def get_bearing(lat1, lon1, lat2, lon2):
    geodesic = pyproj.Geod(ellps='WGS84')
    fwd_azimuth, back_azimuth, distance = geodesic.inv(lat1, lon1, lat2, lon2)
    return fwd_azimuth, back_azimuth, distance

lat1, lon1 = 50.5243584, 12.3616320
lat2, lon2 = 50.5063360, 12.3377472

bearing = get_bearing(lat1, lon1, lat2, lon2)[0]

RedisGears 是否能够在客户端收到这些消息之前侦听和更改已发布的消息?或者我可以使用 RedisGears 将消息发布到包含计算的方位角的不同通道吗?

标签: redisredisgears

解决方案


我不确定您是否可以收听 PubSub 事件。但是,您可以写入仅由 RedisGears 使用的流,然后发布修改后的事件或使用 Streams,这样您也可以从消费者那里获得对这些消息的确认。

我写了一个具有这种方法的演示: https ://github.com/hnsk/redis-streams-log-demo/blob/master/gears_functions.py

我的测试生成器写入名为“test”的流,Gears 使用 trimStream=True(默认)使用该流,因此消息在使用时从流中修剪。

然后它从事件中获取字段“log_level”并将其用作新流的键,并在其中写入(XADD)相同的数据,然后由实际消费者使用。

在示例中,它还将消息写入 RediSearch 的哈希和排序集以计算每个 log_level 的事件。


推荐阅读