1 前言

最近发现之前写的redis客户端有一个bug,代码示例如下:

Result result;
std::string next_id{"$"};
while(true)
{
	redis->xread(topic_name, next_id, std::chrono::miliseconds(1000), 1, std::inserter(result, result.end));
 
	if(result.empty()) continue;
 
	// 以下为解析
	// ...
	next_id = result.id; // 示例,表示从result里的取的消息id
}

代码的含义是一开始期望获取队列中的最新的元素(使用$表示next_id),然后如果接收到新的消息时更新next_id,然后在下次循环的时候获取指定next_id后的消息。

看起来没有什么问题,但当这个主题刚刚建立,且有多个消费者时,就会出现第一次生产只有一个消费者消费,第二次生产只有两个消费者消费,第三次生产只有三个消费者消费…以此类推。

为了解决这个bug,不得不开始重新梳理Stream的原理,并确定它是如何实现广播的。

Redis Stream_jedis stream的block参数-CSDN博客一文中提到了Stream 允许多个客户端监听,默认情况下新添加的元素,会被每一个客户端消费,再结合文中的示例:

# 从ID为0开始读取 mystream 流中的数据
XREAD COUNT 2 STREAMS mystream 0 (nil)

大致清楚问题原因所在。

2 原理与原因分析

我们需要先了解一下xread的原理。

XREAD | Docs (redis.io)文档中提到,xread可以阻塞或非阻塞地从流中读取特定id之后的消息。

这个id有几种:

  • 0:从队列开头开始读取消息
  • $:从队列结尾开始读取消息(也就是取最新的消息),历史的记录都不关心。官方要求仅用于第一次xread的时候使用,否则可能会无法收到后续消息。
  • +:从队尾读一个消息(Redis 7.4 RC1+支持)
  • {next_id}:从指定消息id的下一个消息开始读取

在我前面的示例里,需要注意:

我使用的是xread的非阻塞实现。

这一点很关键,直接导致了现象的异常。

我认为每个消费者xread的时候要从队尾取消息,于是我是用了$(使用的当前版本不支持+),假设此时有3个消费者c1,c2,c3,1个生产者p,队列里此时没有消息。那么场景是:

  1. 3个消费者,1个生产者,队列是空的。三个消费者都在等待msg_id$的下一个消息。
  2. 生产者生产了一个消息msg1
    1. c1根据$最后一个消息为msg1,于是消费。
      1. 消费完成后c1将自己的next_id设置为msg1_id,等待msg1的下一个消息。
    2. c2c3可能恰好错过了xreadblock,重新进行了xread,根据$确定还没有最后一个未被消费的消息,于是继续等待。(相当于把msg1当成历史消息忽略了)
  3. 生产者生产了下一个消息msg2
    1. c1根据msg1_id确定下一个消息是msg2,于是消费。
      1. 消费完成后c1将自己的next_id设置为msg2_id,等待msg2的下一个消息。
    2. c2恰好这次还在xreadblock期间,于是它根据$最后一个消息为msg2,于是消费.
      1. 消费完成后c2将自己的next_id设置为msg2_id,等待msg2的下一个消息。 也就是说,只有当c1c2c3三者同时在xreadblock阶段,否则可能无法同时收到消息,做不到广播的效果。

3 解决方法

既然确定了问题,那么就可以修改next_id的获取机制:

  1. 启动时判断队列里的最新的msg_id,把它作为next_id
  2. 如果队列里没有数据,则将next_id设置为0,这样要求从头获取。
  3. 每次消费完新消息后,将next_id设置为消费的消息的id。 最终的代码效果是(下面以Python为例,主要涉及xreverangexread用法,其他语言自行转换即可):
def getMaxMessageId()->str:
	try:
		ret = redis_client.xrevrange(topic_name, "+", "-", 1)
		if len(ret) > 0:
			return ret[0][0] # 返回最新的msg_id
	except Exception as e:
		pass
	return "0" # 返回队列头
 
next_id=getMaxMessageId()
while True:
	try:
		ret = redis_client.xread({topic_name: next_id}, count=1, block=1000)
		if len(ret) == 0:
			continue
	except Exception as e:
		continue
	message = ret[0][1][0]
	next_id = message[0]
	print(dict(message[1])) # 输出消息体