因为在多线程服务器中使用了一个消息队列,该消息队列使用了两把锁控制读写,因此效率损失较大,故探索多生产者多消费者的无锁队列以提升性能 本文引用了CSDN xin_hen的观点和代码并做出了一定调整:https://blog.csdn.net/xin_hen/article/details/108145222
1 前言
1.1 什么是无锁队列
无锁队列一般指的是通过CAS操作来保证队列的线程安全性问题,而不会使得线程陷入到内核,以避免用户态与内核态的切换开销
无锁队列无需用户在外围控制锁来保证队列的线程安全问题,减少因为锁带来的性能开销。
1.2 什么是CAS
CAS(Compare and swap, 对比后交换),这是一种所有CPU都支持的原子操作,由于其原子性,可以被用来实现各类无锁数据结构。
其在C++11中的atomic中被支持,因此可以实现跨平台的开发。
template< typename T >
class atomic{
bool compare_exchange_weak(T* expected, T desired);
}
compare_exchange_weak
的作用是如果当前变量的值==expected的值,则将当前变量的值改为desired,返回true,否则则返回false。
1.3 什么是自旋锁
自旋锁是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。
2 实现
2.1 实现思路
在了解基础概念后,就可以尝试这实现无锁队列了:
对于任意一个线程,在头或尾节点不为空后使用compare_exchange_weak将该节点置空,便可以抢占该内存地址(对于其他线程来说,该节点为空,因此无法抢占而自旋),因此可以利用CAS实现另类的加锁。
2.1.1 代码说明
2.1.1.1 完整代码
/**
* @file JgLockFreeQueue.hpp
* @author Wangzhengqiao (me@zhengqiao.wang)
* @brief 无锁队列
* @version 0.1
* @date 2021-09-24
*
* 该无锁队列线程安全,有ABA风险。
* 讲解时:
* 介绍什么是无锁队列
* 什么是CAS
* 什么是自旋锁
* 无锁队列实质是硬件锁实现的队列
* 代码解释
*/
#ifndef JGLOCK_FREE_QUEUE_HPP
#define JGLOCK_FREE_QUEUE_HPP
#include <atomic>
#include <assert.h>
#include <thread>
template <typename T>
class JgLockFreeQueue
{
public:
JgLockFreeQueue()
{
Node *node = new Node(Empty);
m_head.store(node);
m_tail.store(node); // 初始化时,head=tail=空
m_isLockFree = node->val.is_lock_free();
}
~JgLockFreeQueue()
{
T val = Empty;
while (tryPop(val))
{
}
Node *node = m_head.load();
if (node != nullptr)
delete node;
}
public:
bool isLockFree()
{
return m_isLockFree.load();
}
int getCount()
{
return m_count.load();
}
/**
* @brief 队列添加数据
* 对tail成功CAS为nullptr 表示当前线程获取tail自旋锁成功,并设置tail的next节点为push的元素,解锁tail,即将tail进行CAS为tail->next;
*
* @param val
* @return true
* @return false
*/
bool tryPush(T val)
{
Node *t = nullptr;
Node *node = new Node(val);
while (1)
{
// t==NULL,表示tail锁被抢
if (nullptr == t)
{
t = m_tail.load();
continue;
}
//尝试加tail锁
if (!m_tail.compare_exchange_weak(t, nullptr))
{
continue;
}
break;
}
// 到此处表明已抢到
t->next.store(node);
++m_count;
Node *expected = nullptr;
// 释放tail锁
bool flag = m_tail.compare_exchange_weak(expected, t->next);
assert(flag);
return flag;
}
/**
* @brief
* 对head成功CAS为NULL 表示当前线程获取head自旋锁成功,并需要判断当前数组是否为空,如果为空,则解锁并返回为false;否则成功,则pop出数据head->next->val,最后解锁,即将head进行CAS为head->next;
*
* @param val
* @return true
* @return false
*/
bool tryPop(T &val)
{
Node *h = nullptr;
Node *h_next = nullptr;
while (1)
{
//h==NULL,表示head锁被抢
if (nullptr == h)
{
h = m_head.load();
continue;
}
//尝试加head锁
if (!m_head.compare_exchange_weak(h, nullptr))
{
continue;
}
h_next = h->next.load();
// h->next != NULL 且 count == 0
// 此时在push函数中数据以及count计数器没有来得及更新,因此进行自旋
if (nullptr != h_next)
{
while (m_count.load() == 0)
{
std::this_thread::yield(); // 让渡时间片
}
}
break;
}
Node *expected = nullptr;
Node *desired = h;
// 当h_next==NULL时
// 表示当前链表为空
if (nullptr != h_next)
{
val = h_next->val;
delete h;
desired = h_next;
--m_count;
}
//CAS head,释放head锁
bool flag = m_head.compare_exchange_weak(expected, desired);
assert(flag);
return (h_next != nullptr);
}
private:
struct Node
{
std::atomic<T> val; ///< 保存的值
std::atomic<Node *> next{nullptr}; ///< 链表向下指针
Node(T val) : val(val) {} ///< 初始化
};
const T Empty = 0;
std::atomic<int> m_count{0}; ///< 队列计数器
std::atomic<Node *> m_head; ///< 头节点
std::atomic<Node *> m_tail; ///< 尾节点
std::atomic_bool m_isLockFree; ///< lockfree标志位
};
#endif
2.1.1.2 重点说明
2.1.1.2.1 Push操作
声明两个临时节点t
和node
,其中t
是用来保存队尾指针的临时指针,node
是保存新值的节点。
Node *t = nullptr;
Node *node = new Node(val);
对队尾指针使用CAS设置为nullptr,形成自旋锁。
while (1)
{
// t==NULL,表示tail锁被抢
if (nullptr == t)
{
t = m_tail.load();
continue;
}
//尝试加tail锁
if (!m_tail.compare_exchange_weak(t, nullptr))
{
continue;
}
break;
}
如果获取自旋锁成功,则设置尾指针的next节点为输入的元素,
// 到此处表明已抢到
t->next.store(node);
++m_count;
然后解锁尾指针,将tail进行CAS设置为tail→next。
Node *expected = nullptr;
// 释放tail锁
bool flag = m_tail.compare_exchange_weak(expected, t->next);
2.1.1.2.2 Pop操作
声明两个临时节点h
和h_next
,其中h
是用于保存队首的临时变量,h_next
则是用于保存队首下一个元素的临时变量。
Node *h = nullptr;
Node *h_next = nullptr;
对队首使用CAS设置为nullptr,形成自旋锁,需要注意的是,当队列有数据(h→next != nullptr)且count==0时,则说明push还没完,因此还继续自旋等待(该等待是占用着资源,只是让开时间片)。
while (1)
{
//h==NULL,表示head锁被抢
if (nullptr == h)
{
h = m_head.load();
continue;
}
//尝试加head锁
if (!m_head.compare_exchange_weak(h, nullptr))
{
continue;
}
h_next = h->next.load();
// h->next != NULL 且 count == 0
// 此时在push函数中数据以及count计数器没有来得及更新,因此进行自旋
if (nullptr != h_next)
{
while (m_count.load() == 0)
{
std::this_thread::yield(); // 让渡时间片
}
}
break;
}
如果获取自旋锁成功,则需要判断当前队列是否是空的,如果不为空,则将队首通过CAS设置为h_next,并返回True,否则返回False
Node *expected = nullptr;
Node *desired = h;
// 当h_next==NULL时
// 表示当前链表为空
if (nullptr != h_next)
{
val = h_next->val;
delete h;
desired = h_next;
--m_count;
}
//CAS head,释放head锁
bool flag = m_head.compare_exchange_weak(expected, desired);
assert(flag);
return (h_next != nullptr);
3 验证
验证代码可见https://gitee.com/wangjinchaoXY/Realization/blob/master/Realization/LockFreeLinkedQueueTest.cpp 请自行编译下载。
也可以看