C++11 无锁队列实现与解析说明

因为在多线程服务器中使用了一个消息队列,该消息队列使用了两把锁控制读写,因此效率损失较大,故探索多生产者多消费者的无锁队列以提升性能 本文引用了CSDN xin_hen的观点和代码并做出了一定调整:https://blog.csdn.net/xin_hen/article/details/108145222

前言

什么是无锁队列

无锁队列一般指的是通过CAS操作来保证队列的线程安全性问题,而不会使得线程陷入到内核,以避免用户态与内核态的切换开销

无锁队列无需用户在外围控制锁来保证队列的线程安全问题,减少因为锁带来的性能开销。

什么是CAS

CAS(Compare and swap, 对比后交换),这是一种所有CPU都支持的原子操作,由于其原子性,可以被用来实现各类无锁数据结构。 其在C++11中的atomic中被支持,因此可以实现跨平台的开发。

1
2
3
4
template< typename T >
class atomic{
bool compare_exchange_weak(T* expected, T desired);
}

compare_exchange_weak的作用是如果当前变量的值==expected的值,则将当前变量的值改为desired,返回true,否则则返回false。

什么是自旋锁

自旋锁是指当一个线程在获取锁的时候,如果锁已经被其它线程获取,那么该线程将循环等待,然后不断的判断锁是否能够被成功获取,直到获取到锁才会退出循环。

实现

实现思路

在了解基础概念后,就可以尝试这实现无锁队列了: 对于任意一个线程,在头或尾节点不为空后使用compare_exchange_weak将该节点置空,便可以抢占该内存地址(对于其他线程来说,该节点为空,因此无法抢占而自旋),因此可以利用CAS实现另类的加锁。

代码说明

完整代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
/**
 * @file JgLockFreeQueue.hpp
 * @author Wangzhengqiao (me@zhengqiao.wang)
 * @brief 无锁队列
 * @version 0.1
 * @date 2021-09-24
 * 
 * 该无锁队列线程安全,有ABA风险。
 * 讲解时:
 * 介绍什么是无锁队列
 * 什么是CAS
 * 什么是自旋锁
 * 无锁队列实质是硬件锁实现的队列
 * 代码解释
 */

#ifndef JGLOCK_FREE_QUEUE_HPP
#define JGLOCK_FREE_QUEUE_HPP

#include <atomic>
#include <assert.h>
#include <thread>

template <typename T>
class JgLockFreeQueue
{
public:
    JgLockFreeQueue()
    {
        Node *node = new Node(Empty);
        m_head.store(node);
        m_tail.store(node); // 初始化时,head=tail=空
        m_isLockFree = node->val.is_lock_free();
    }
    ~JgLockFreeQueue()
    {
        T val = Empty;
        while (tryPop(val))
        {
        }
        Node *node = m_head.load();
        if (node != nullptr)
            delete node;
    }

public:
    bool isLockFree()
    {
        return m_isLockFree.load();
    }

    int getCount()
    {
        return m_count.load();
    }

    /**
     * @brief 队列添加数据
     * 对tail成功CAS为nullptr 表示当前线程获取tail自旋锁成功,并设置tail的next节点为push的元素,解锁tail,即将tail进行CAS为tail->next;
     * 
     * @param val 
     * @return true 
     * @return false 
     */
    bool tryPush(T val)
    {
        Node *t = nullptr;
        Node *node = new Node(val);

        while (1)
        {
            // t==NULL,表示tail锁被抢
            if (nullptr == t)
            {
                t = m_tail.load();
                continue;
            }
            //尝试加tail锁
            if (!m_tail.compare_exchange_weak(t, nullptr))
            {
                continue;
            }
            break;
        }

        // 到此处表明已抢到
        t->next.store(node);
        ++m_count;
        Node *expected = nullptr;
        // 释放tail锁
        bool flag = m_tail.compare_exchange_weak(expected, t->next);
        assert(flag);
        return flag;
    }

    /**
     * @brief 
     * 对head成功CAS为NULL 表示当前线程获取head自旋锁成功,并需要判断当前数组是否为空,如果为空,则解锁并返回为false;否则成功,则pop出数据head->next->val,最后解锁,即将head进行CAS为head->next;
     * 
     * @param val 
     * @return true 
     * @return false 
     */
    bool tryPop(T &val)
    {
        Node *h = nullptr;
        Node *h_next = nullptr;

        while (1)
        {
            //h==NULL,表示head锁被抢
            if (nullptr == h)
            {
                h = m_head.load();
                continue;
            }
            //尝试加head锁
            if (!m_head.compare_exchange_weak(h, nullptr))
            {
                continue;
            }
            h_next = h->next.load();
            // h->next != NULL 且 count == 0
            // 此时在push函数中数据以及count计数器没有来得及更新,因此进行自旋
            if (nullptr != h_next)
            {
                while (m_count.load() == 0)
                {
                    std::this_thread::yield(); // 让渡时间片
                }
            }
            break;
        }
        Node *expected = nullptr;
        Node *desired = h;
        // 当h_next==NULL时
        // 表示当前链表为空
        if (nullptr != h_next)
        {
            val = h_next->val;
            delete h;
            desired = h_next;
            --m_count;
        }
        //CAS head,释放head锁
        bool flag = m_head.compare_exchange_weak(expected, desired);
        assert(flag);
        return (h_next != nullptr);
    }

private:
    struct Node
    {
        std::atomic<T> val;                ///< 保存的值
        std::atomic<Node *> next{nullptr}; ///< 链表向下指针
        Node(T val) : val(val) {}          ///< 初始化
    };
    const T Empty = 0;

    std::atomic<int> m_count{0};   ///< 队列计数器
    std::atomic<Node *> m_head;    ///< 头节点
    std::atomic<Node *> m_tail;    ///< 尾节点
    std::atomic_bool m_isLockFree; ///< lockfree标志位
};

#endif
重点说明
Push操作

声明两个临时节点tnode,其中t是用来保存队尾指针的临时指针,node是保存新值的节点。

1
2
        Node *t = nullptr;
        Node *node = new Node(val);

对队尾指针使用CAS设置为nullptr,形成自旋锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
        while (1)
        {
            // t==NULL,表示tail锁被抢
            if (nullptr == t)
            {
                t = m_tail.load();
                continue;
            }
            //尝试加tail锁
            if (!m_tail.compare_exchange_weak(t, nullptr))
            {
                continue;
            }
            break;
        }

如果获取自旋锁成功,则设置尾指针的next节点为输入的元素,

1
2
3
        // 到此处表明已抢到
        t->next.store(node);
        ++m_count;

然后解锁尾指针,将tail进行CAS设置为tail->next。

1
2
3
        Node *expected = nullptr;
        // 释放tail锁
        bool flag = m_tail.compare_exchange_weak(expected, t->next);
Pop操作

声明两个临时节点hh_next,其中h是用于保存队首的临时变量,h_next则是用于保存队首下一个元素的临时变量。

1
2
        Node *h = nullptr;
        Node *h_next = nullptr;

对队首使用CAS设置为nullptr,形成自旋锁,需要注意的是,当队列有数据(h->next != nullptr)且count==0时,则说明push还没完,因此还继续自旋等待(该等待是占用着资源,只是让开时间片)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
        while (1)
        {
            //h==NULL,表示head锁被抢
            if (nullptr == h)
            {
                h = m_head.load();
                continue;
            }
            //尝试加head锁
            if (!m_head.compare_exchange_weak(h, nullptr))
            {
                continue;
            }
            h_next = h->next.load();
            // h->next != NULL 且 count == 0
            // 此时在push函数中数据以及count计数器没有来得及更新,因此进行自旋
            if (nullptr != h_next)
            {
                while (m_count.load() == 0)
                {
                    std::this_thread::yield(); // 让渡时间片
                }
            }
            break;
        }

如果获取自旋锁成功,则需要判断当前队列是否是空的,如果不为空,则将队首通过CAS设置为h_next,并返回True,否则返回False

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
        Node *expected = nullptr;
        Node *desired = h;
        // 当h_next==NULL时
        // 表示当前链表为空
        if (nullptr != h_next)
        {
            val = h_next->val;
            delete h;
            desired = h_next;
            --m_count;
        }
        //CAS head,释放head锁
        bool flag = m_head.compare_exchange_weak(expected, desired);
        assert(flag);
        return (h_next != nullptr);

验证

验证代码可见https://gitee.com/wangjinchaoXY/Realization/blob/master/Realization/LockFreeLinkedQueueTest.cpp 请自行编译下载。 也可以看 https://gitee.com/JogerQiao/lock-free-queue