因为在多线程服务器中使用了一个消息队列,该消息队列使用了两把锁控制读写,因此效率损失较大,故探索多生产者多消费者的无锁队列以提升性能 本文引用了CSDN xin_hen的观点和代码并做出了一定调整:https://blog.csdn.net/xin_hen/article/details/108145222

1 前言

1.1 什么是无锁队列

无锁队列一般指的是通过CAS操作来保证队列的线程安全性问题,而不会使得线程陷入到内核,以避免用户态与内核态的切换开销

无锁队列无需用户在外围控制锁来保证队列的线程安全问题,减少因为锁带来的性能开销。

1.2 什么是CAS

CAS(Compare and swap, 对比后交换),这是一种所有CPU都支持的原子操作,由于其原子性,可以被用来实现各类无锁数据结构。

其在C++11中的atomic中被支持,因此可以实现跨平台的开发。

template< typename T >
class atomic{
bool compare_exchange_weak(T* expected, T desired);
}

compare_exchange_weak的作用是如果当前变量的值==expected的值,则将当前变量的值改为desired,返回true,否则则返回false。

1.3 什么是自旋锁

自旋锁是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。

2 实现

2.1 实现思路

在了解基础概念后,就可以尝试这实现无锁队列了:

对于任意一个线程,在头或尾节点不为空后使用compare_exchange_weak将该节点置空,便可以抢占该内存地址(对于其他线程来说,该节点为空,因此无法抢占而自旋),因此可以利用CAS实现另类的加锁。

2.1.1 代码说明

2.1.1.1 完整代码

/**
 * @file JgLockFreeQueue.hpp
 * @author Wangzhengqiao (me@zhengqiao.wang)
 * @brief 无锁队列
 * @version 0.1
 * @date 2021-09-24
 * 
 * 该无锁队列线程安全,有ABA风险。
 * 讲解时:
 * 介绍什么是无锁队列
 * 什么是CAS
 * 什么是自旋锁
 * 无锁队列实质是硬件锁实现的队列
 * 代码解释
 */
 
#ifndef JGLOCK_FREE_QUEUE_HPP
#define JGLOCK_FREE_QUEUE_HPP
 
#include <atomic>
#include <assert.h>
#include <thread>
 
template <typename T>
class JgLockFreeQueue
{
public:
    JgLockFreeQueue()
    {
        Node *node = new Node(Empty);
        m_head.store(node);
        m_tail.store(node); // 初始化时,head=tail=空
        m_isLockFree = node->val.is_lock_free();
    }
    ~JgLockFreeQueue()
    {
        T val = Empty;
        while (tryPop(val))
        {
        }
        Node *node = m_head.load();
        if (node != nullptr)
            delete node;
    }
 
public:
    bool isLockFree()
    {
        return m_isLockFree.load();
    }
 
    int getCount()
    {
        return m_count.load();
    }
 
    /**
     * @brief 队列添加数据
     * 对tail成功CAS为nullptr 表示当前线程获取tail自旋锁成功,并设置tail的next节点为push的元素,解锁tail,即将tail进行CAS为tail->next;
     * 
     * @param val 
     * @return true 
     * @return false 
     */
    bool tryPush(T val)
    {
        Node *t = nullptr;
        Node *node = new Node(val);
 
        while (1)
        {
            // t==NULL,表示tail锁被抢
            if (nullptr == t)
            {
                t = m_tail.load();
                continue;
            }
            //尝试加tail锁
            if (!m_tail.compare_exchange_weak(t, nullptr))
            {
                continue;
            }
            break;
        }
 
        // 到此处表明已抢到
        t->next.store(node);
        ++m_count;
        Node *expected = nullptr;
        // 释放tail锁
        bool flag = m_tail.compare_exchange_weak(expected, t->next);
        assert(flag);
        return flag;
    }
 
    /**
     * @brief 
     * 对head成功CAS为NULL 表示当前线程获取head自旋锁成功,并需要判断当前数组是否为空,如果为空,则解锁并返回为false;否则成功,则pop出数据head->next->val,最后解锁,即将head进行CAS为head->next;
     * 
     * @param val 
     * @return true 
     * @return false 
     */
    bool tryPop(T &val)
    {
        Node *h = nullptr;
        Node *h_next = nullptr;
 
        while (1)
        {
            //h==NULL,表示head锁被抢
            if (nullptr == h)
            {
                h = m_head.load();
                continue;
            }
            //尝试加head锁
            if (!m_head.compare_exchange_weak(h, nullptr))
            {
                continue;
            }
            h_next = h->next.load();
            // h->next != NULL 且 count == 0
            // 此时在push函数中数据以及count计数器没有来得及更新,因此进行自旋
            if (nullptr != h_next)
            {
                while (m_count.load() == 0)
                {
                    std::this_thread::yield(); // 让渡时间片
                }
            }
            break;
        }
        Node *expected = nullptr;
        Node *desired = h;
        // 当h_next==NULL时
        // 表示当前链表为空
        if (nullptr != h_next)
        {
            val = h_next->val;
            delete h;
            desired = h_next;
            --m_count;
        }
        //CAS head,释放head锁
        bool flag = m_head.compare_exchange_weak(expected, desired);
        assert(flag);
        return (h_next != nullptr);
    }
 
private:
    struct Node
    {
        std::atomic<T> val;                ///< 保存的值
        std::atomic<Node *> next{nullptr}; ///< 链表向下指针
        Node(T val) : val(val) {}          ///< 初始化
    };
    const T Empty = 0;
 
    std::atomic<int> m_count{0};   ///< 队列计数器
    std::atomic<Node *> m_head;    ///< 头节点
    std::atomic<Node *> m_tail;    ///< 尾节点
    std::atomic_bool m_isLockFree; ///< lockfree标志位
};
 
#endif

2.1.1.2 重点说明

2.1.1.2.1 Push操作

声明两个临时节点tnode,其中t是用来保存队尾指针的临时指针,node是保存新值的节点。

        Node *t = nullptr;
        Node *node = new Node(val);

对队尾指针使用CAS设置为nullptr,形成自旋锁。

        while (1)
        {
            // t==NULL,表示tail锁被抢
            if (nullptr == t)
            {
                t = m_tail.load();
                continue;
            }
            //尝试加tail锁
            if (!m_tail.compare_exchange_weak(t, nullptr))
            {
                continue;
            }
            break;
        }

如果获取自旋锁成功,则设置尾指针的next节点为输入的元素,

        // 到此处表明已抢到
        t->next.store(node);
        ++m_count;

然后解锁尾指针,将tail进行CAS设置为tailnext。

        Node *expected = nullptr;
        // 释放tail锁
        bool flag = m_tail.compare_exchange_weak(expected, t->next);
2.1.1.2.2 Pop操作

声明两个临时节点hh_next,其中h是用于保存队首的临时变量,h_next则是用于保存队首下一个元素的临时变量。

        Node *h = nullptr;
        Node *h_next = nullptr;

对队首使用CAS设置为nullptr,形成自旋锁,需要注意的是,当队列有数据(hnext != nullptr)且count==0时,则说明push还没完,因此还继续自旋等待(该等待是占用着资源,只是让开时间片)。

        while (1)
        {
            //h==NULL,表示head锁被抢
            if (nullptr == h)
            {
                h = m_head.load();
                continue;
            }
            //尝试加head锁
            if (!m_head.compare_exchange_weak(h, nullptr))
            {
                continue;
            }
            h_next = h->next.load();
            // h->next != NULL 且 count == 0
            // 此时在push函数中数据以及count计数器没有来得及更新,因此进行自旋
            if (nullptr != h_next)
            {
                while (m_count.load() == 0)
                {
                    std::this_thread::yield(); // 让渡时间片
                }
            }
            break;
        }

如果获取自旋锁成功,则需要判断当前队列是否是空的,如果不为空,则将队首通过CAS设置为h_next,并返回True,否则返回False

        Node *expected = nullptr;
        Node *desired = h;
        // 当h_next==NULL时
        // 表示当前链表为空
        if (nullptr != h_next)
        {
            val = h_next->val;
            delete h;
            desired = h_next;
            --m_count;
        }
        //CAS head,释放head锁
        bool flag = m_head.compare_exchange_weak(expected, desired);
        assert(flag);
        return (h_next != nullptr);

3 验证

验证代码可见https://gitee.com/wangjinchaoXY/Realization/blob/master/Realization/LockFreeLinkedQueueTest.cpp 请自行编译下载。

也可以看

https://gitee.com/JogerQiao/lock-free-queue