1 前言
最近发现之前写的redis
客户端有一个bug,代码示例如下:
Result result;
std::string next_id{"$"};
while(true)
{
redis->xread(topic_name, next_id, std::chrono::miliseconds(1000), 1, std::inserter(result, result.end));
if(result.empty()) continue;
// 以下为解析
// ...
next_id = result.id; // 示例,表示从result里的取的消息id
}
代码的含义是一开始期望获取队列中的最新的元素(使用$
表示next_id),然后如果接收到新的消息时更新next_id
,然后在下次循环的时候获取指定next_id
后的消息。
看起来没有什么问题,但当这个主题刚刚建立,且有多个消费者时,就会出现第一次生产只有一个消费者消费,第二次生产只有两个消费者消费,第三次生产只有三个消费者消费…以此类推。
为了解决这个bug
,不得不开始重新梳理Stream
的原理,并确定它是如何实现广播的。
在Redis Stream_jedis stream的block参数-CSDN博客一文中提到了Stream 允许多个客户端监听,默认情况下新添加的元素,会被每一个客户端消费
,再结合文中的示例:
# 从ID为0开始读取 mystream 流中的数据
XREAD COUNT 2 STREAMS mystream 0 (nil)
大致清楚问题原因所在。
2 原理与原因分析
我们需要先了解一下xread
的原理。
在XREAD | Docs (redis.io)文档中提到,xread
可以阻塞或非阻塞地从流中读取特定id
之后的消息。
这个id
有几种:
0
:从队列开头开始读取消息$
:从队列结尾开始读取消息(也就是取最新的消息),历史的记录都不关心。官方要求仅用于第一次xread
的时候使用,否则可能会无法收到后续消息。+
:从队尾读一个消息(Redis 7.4 RC1+支持){next_id}
:从指定消息id的下一个消息开始读取
在我前面的示例里,需要注意:
我使用的是
xread
的非阻塞实现。
这一点很关键,直接导致了现象的异常。
我认为每个消费者xread
的时候要从队尾取消息,于是我是用了$
(使用的当前版本不支持+
),假设此时有3个消费者c1,c2,c3,1个生产者p,队列里此时没有消息。那么场景是:
- 3个消费者,1个生产者,队列是空的。三个消费者都在等待
msg_id
为$
的下一个消息。 - 生产者生产了一个消息
msg1
c1
根据$
最后一个消息为msg1
,于是消费。- 消费完成后
c1
将自己的next_id
设置为msg1_id
,等待msg1
的下一个消息。
- 消费完成后
c2
和c3
可能恰好错过了xread
的block
,重新进行了xread
,根据$
确定还没有最后一个未被消费的消息,于是继续等待。(相当于把msg1
当成历史消息忽略了)
- 生产者生产了下一个消息
msg2
c1
根据msg1_id
确定下一个消息是msg2
,于是消费。- 消费完成后
c1
将自己的next_id
设置为msg2_id
,等待msg2
的下一个消息。
- 消费完成后
c2
恰好这次还在xread
的block
期间,于是它根据$
最后一个消息为msg2
,于是消费.- 消费完成后
c2
将自己的next_id
设置为msg2_id
,等待msg2
的下一个消息。 也就是说,只有当c1
、c2
、c3
三者同时在xread
的block
阶段,否则可能无法同时收到消息,做不到广播的效果。
- 消费完成后
3 解决方法
既然确定了问题,那么就可以修改next_id
的获取机制:
- 启动时判断队列里的最新的
msg_id
,把它作为next_id
。 - 如果队列里没有数据,则将
next_id
设置为0
,这样要求从头获取。 - 每次消费完新消息后,将
next_id
设置为消费的消息的id
。 最终的代码效果是(下面以Python
为例,主要涉及xreverange
和xread
用法,其他语言自行转换即可):
def getMaxMessageId()->str:
try:
ret = redis_client.xrevrange(topic_name, "+", "-", 1)
if len(ret) > 0:
return ret[0][0] # 返回最新的msg_id
except Exception as e:
pass
return "0" # 返回队列头
next_id=getMaxMessageId()
while True:
try:
ret = redis_client.xread({topic_name: next_id}, count=1, block=1000)
if len(ret) == 0:
continue
except Exception as e:
continue
message = ret[0][1][0]
next_id = message[0]
print(dict(message[1])) # 输出消息体