1 前言
最近发现之前写的redis客户端有一个bug,代码示例如下:
Result result;
std::string next_id{"$"};
while(true)
{
redis->xread(topic_name, next_id, std::chrono::miliseconds(1000), 1, std::inserter(result, result.end));
if(result.empty()) continue;
// 以下为解析
// ...
next_id = result.id; // 示例,表示从result里的取的消息id
}代码的含义是一开始期望获取队列中的最新的元素(使用$表示next_id),然后如果接收到新的消息时更新next_id,然后在下次循环的时候获取指定next_id后的消息。
看起来没有什么问题,但当这个主题刚刚建立,且有多个消费者时,就会出现第一次生产只有一个消费者消费,第二次生产只有两个消费者消费,第三次生产只有三个消费者消费…以此类推。
为了解决这个bug,不得不开始重新梳理Stream的原理,并确定它是如何实现广播的。
在Redis Stream_jedis stream的block参数-CSDN博客一文中提到了Stream 允许多个客户端监听,默认情况下新添加的元素,会被每一个客户端消费,再结合文中的示例:
# 从ID为0开始读取 mystream 流中的数据
XREAD COUNT 2 STREAMS mystream 0 (nil)大致清楚问题原因所在。
2 原理与原因分析
我们需要先了解一下xread的原理。
在XREAD | Docs (redis.io)文档中提到,xread可以阻塞或非阻塞地从流中读取特定id之后的消息。
这个id有几种:
0:从队列开头开始读取消息$:从队列结尾开始读取消息(也就是取最新的消息),历史的记录都不关心。官方要求仅用于第一次xread的时候使用,否则可能会无法收到后续消息。+:从队尾读一个消息(Redis 7.4 RC1+支持){next_id}:从指定消息id的下一个消息开始读取
在我前面的示例里,需要注意:
我使用的是
xread的非阻塞实现。
这一点很关键,直接导致了现象的异常。
我认为每个消费者xread的时候要从队尾取消息,于是我是用了$(使用的当前版本不支持+),假设此时有3个消费者c1,c2,c3,1个生产者p,队列里此时没有消息。那么场景是:

- 3个消费者,1个生产者,队列是空的。三个消费者都在等待
msg_id为$的下一个消息。 - 生产者生产了一个消息
msg1c1根据$最后一个消息为msg1,于是消费。- 消费完成后
c1将自己的next_id设置为msg1_id,等待msg1的下一个消息。
- 消费完成后
c2和c3可能恰好错过了xread的block,重新进行了xread,根据$确定还没有最后一个未被消费的消息,于是继续等待。(相当于把msg1当成历史消息忽略了)
- 生产者生产了下一个消息
msg2c1根据msg1_id确定下一个消息是msg2,于是消费。- 消费完成后
c1将自己的next_id设置为msg2_id,等待msg2的下一个消息。
- 消费完成后
c2恰好这次还在xread的block期间,于是它根据$最后一个消息为msg2,于是消费.- 消费完成后
c2将自己的next_id设置为msg2_id,等待msg2的下一个消息。 也就是说,只有当c1、c2、c3三者同时在xread的block阶段,否则可能无法同时收到消息,做不到广播的效果。
- 消费完成后
3 解决方法
既然确定了问题,那么就可以修改next_id的获取机制:
- 启动时判断队列里的最新的
msg_id,把它作为next_id。 - 如果队列里没有数据,则将
next_id设置为0,这样要求从头获取。 - 每次消费完新消息后,将
next_id设置为消费的消息的id。 最终的代码效果是(下面以Python为例,主要涉及xreverange和xread用法,其他语言自行转换即可):
def getMaxMessageId()->str:
try:
ret = redis_client.xrevrange(topic_name, "+", "-", 1)
if len(ret) > 0:
return ret[0][0] # 返回最新的msg_id
except Exception as e:
pass
return "0" # 返回队列头
next_id=getMaxMessageId()
while True:
try:
ret = redis_client.xread({topic_name: next_id}, count=1, block=1000)
if len(ret) == 0:
continue
except Exception as e:
continue
message = ret[0][1][0]
next_id = message[0]
print(dict(message[1])) # 输出消息体