菜鸟笔记
提升您的技术认知

c   并发无锁队列的原理与实现-ag真人游戏

一般无锁队列的情况分为两种,第一种是单个消费者与单个生产者,第二种是多个消费者或者多个生产着的情况。

一.单个消费者与单个生产者的情况

这种情况下可以用环形队列ringbuffer来实现无锁队列,比如dpdk和kfifo的无锁队列就是用环形队列实现的,kfifo里面的入队和出队的处理很巧妙,大家可以去看看。
dpdk:https://www.coonote.com/cplusplus-note/lockless-circular-queue.html
kfifo:https://www.coonote.com/cplusplus-note/linux-kfifo.html

用数组模拟环形队列举个简单的例子:

#include 
#include 
#include 
#include 
using namespace std;
#define ring_queue_size 10
template 
class ringbuffer {
    public:
        ringbuffer(int size): m_size(size),m_head(0), m_tail(0) {
            m_buf = new t[size];
        }
        ~ringbuffer() {
            delete [] m_buf;
            m_buf = null;
        }
        inline bool isempty() const {
            return m_head == m_tail;
        }
        inline bool isfull() const {
            return m_tail == (m_head   1) % m_size;     //取模是为了考虑队列尾的特殊情况
        }
        bool push(const t& value) {                     //以实例的方式传值
            if(isfull()) {
                return false;
            }
            m_buf[m_head] = value;
            m_head = (m_head   1) % m_size;
            return true;
        }
        bool push(const t* value) {                      //以指针的方式传值
            if(isfull()) {
                return false;
            }
            m_buf[m_head] = *value;
            m_head = (m_head   1) % m_size;
            return true;
        }
        inline bool pop(t& value)
        {
            if(isempty()) {
                return false;
            }
            value = m_buf[m_tail];
            m_tail = (m_tail   1) % m_size;
            return true;
        }
        inline unsigned int head()const {
            return m_head;
        }
        inline unsigned int tail()const {
            return m_tail;
        }
        inline unsigned int size()const {
            return m_size;
        }
    private:
        int m_size;                                    // 队列大小
        int m_head;                                    // 队列头部索引
        int m_tail;                                    // 队列尾部索引
        t* m_buf;                                      // 队列数据缓冲区
};
typedef struct node {                                  //任务节点
    int cmd;
    void *value;                                       //可根据情况改变
}tasknode;
#if 1
void produce(ringbuffer* rqueue) {
    int i = 0;
    for(i=0;ipush(node);
    }
    
}
void consume(ringbuffer* rqueue) {
    while(!rqueue->isempty()) {
       tasknode node;
       rqueue->pop(node); 
    }
}
#endif
int main() 
{
    ringbuffer * rqueue = new ringbuffer(ring_queue_size);
    std::thread producer(produce,rqueue);
    std::thread consumer(consume,rqueue);
    producer.join();
    consumer.join();
    delete rqueue;
    return 0;
}

二.多个生产者或者多个消费者的情况

这种情况一般都是出现在多线程开发的场合,会有多个对象同时操作队列,此时为了避免这种情况,可以有两种方式,第一种就是最常见的加锁,第二种就是无锁队列,用原子操作实现,无锁数据结构依赖很重要的技术就是cas操作——compare & set,或是 compare & swap,基本的思路就是先与内存中的值进行比较,如果相同就进行set或者swap操作。

常见的gcc内置原子操作有下面这些。

  1. type __sync_fetch_and_add (type *ptr, type value, ...)
    // 将value加到*ptr上,结果更新到*ptr,并返回操作之前*ptr的值
  2. type __sync_fetch_and_sub (type *ptr, type value, ...)
    // 从*ptr减去value,结果更新到*ptr,并返回操作之前*ptr的值
  3. type __sync_fetch_and_or (type *ptr, type value, ...)
    // 将*ptr与value相或,结果更新到*ptr, 并返回操作之前*ptr的值
  4. type __sync_fetch_and_and (type *ptr, type value, ...)
    // 将*ptr与value相与,结果更新到*ptr,并返回操作之前*ptr的值
  5. type __sync_fetch_and_xor (type *ptr, type value, ...)
    // 将*ptr与value异或,结果更新到*ptr,并返回操作之前*ptr的值
  6. type __sync_fetch_and_nand (type *ptr, type value, ...)
    // 将*ptr取反后,与value相与,结果更新到*ptr,并返回操作之前*ptr的值
  7. type __sync_add_and_fetch (type *ptr, type value, ...)
    // 将value加到*ptr上,结果更新到*ptr,并返回操作之后新*ptr的值
  8. type __sync_sub_and_fetch (type *ptr, type value, ...)
    // 从*ptr减去value,结果更新到*ptr,并返回操作之后新*ptr的值
  9. type __sync_or_and_fetch (type *ptr, type value, ...)
    // 将*ptr与value相或, 结果更新到*ptr,并返回操作之后新*ptr的值
  10. type __sync_and_and_fetch (type *ptr, type value, ...)
    // 将*ptr与value相与,结果更新到*ptr,并返回操作之后新*ptr的值
  11. type __sync_xor_and_fetch (type *ptr, type value, ...)
    // 将*ptr与value异或,结果更新到*ptr,并返回操作之后新*ptr的值
  12. type __sync_nand_and_fetch (type *ptr, type value, ...)
    // 将*ptr取反后,与value相与,结果更新到*ptr,并返回操作之后新*ptr的值
  13. bool __sync_bool_compare_and_swap (type *ptr, type oldval type newval, ...)
    // 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回true
  14. type __sync_val_compare_and_swap (type *ptr, type oldval type newval, ...)
    // 比较*ptr与oldval的值,如果两者相等,则将newval更新到*ptr并返回操作之前*ptr的值
  15. __sync_synchronize (...)
    // 发出完整内存栅栏
  16. type __sync_lock_test_and_set (type *ptr, type value, ...)
    // 将value写入*ptr,对*ptr加锁,并返回操作之前*ptr的值。即,try spinlock语义
  17. void __sync_lock_release (type *ptr, ...)
    // 将0写入到*ptr,并对*ptr解锁。即,unlock spinlock语义

下面就用环形数组来实现无锁队列,举个例子,当然用环形链表也可以,但是链表要考虑到一个aba的问题(这个后面再介绍分析)。

#include 
#include 
#include 
#include 
#include 
using namespace std;
#define ring_queue_size 10
//std::mutex mtx;
template 
class ringbuffer {
    public:
        ringbuffer(int size): m_size(size),m_head(0), m_tail(0) {
            m_buf = new t[size];
        }
        ~ringbuffer() {
            delete [] m_buf;
            m_buf = null;
        }
        inline bool isempty() const {
            return m_head == m_tail;
        }
        inline bool isfull() const {
            return m_tail == (m_head   1) % m_size;     //取模是为了考虑队列尾的特殊情况
        }
        /*使用互斥锁实现并发情况共用队列*/
        # if 0
        bool mut_push(const t& value);
        bool mut_push(const t* value); 
        bool mut_pop(t& value);
        #endif
        /*使用原子操作实现并发情况无锁队列*/
        bool cas_push(const t& value);
        bool cas_push(const t* value); 
        bool cas_pop(t& value);
      
        inline unsigned int head()const {
            return m_head;
        }
        inline unsigned int tail()const {
            return m_tail;
        }
        inline unsigned int size()const {
            return m_size;
        }
    private:
        int m_size;                                    // 队列大小
        int m_head;                                    // 队列头部索引
        int m_tail;                                    // 队列尾部索引
        t* m_buf;                                      // 队列数据缓冲区
};
# if 0  //互斥锁实现
template 
bool ringbuffer::mut_push(const t& value) {
    while (mtx.try_lock()) {
        if(isfull()) {
            return false;
        }
        m_buf[m_head] = value;
        m_head = (m_head   1) % m_size;
        mtx.unlock();
        return true;
    }
}
template 
bool ringbuffer::mut_push(const t* value) {
    while (mtx.try_lock()) {
        if(isfull()) {
            return false;
        }
        m_buf[m_head] = *value;
        m_head = (m_head   1) % m_size;
        mtx.unlock();
        return true;
    }
}
template 
bool ringbuffer::mut_pop(t& value)
{
    while (mtx.try_lock()) {
        if(isempty()) {
             return false;
        }
        value = m_buf[m_tail];
        m_tail = (m_tail   1) % m_size;
        mtx.unlock();
        return true;
    }
}
#endif
template 
bool ringbuffer::cas_push(const t& value) {
    if(isfull()) {
        return false;
    }
    int oldvalue,newvalue;
    do{
        oldvalue = m_head;
        newvalue = (oldvalue   1) % m_size;
    }while(__sync_bool_compare_and_swap(&m_head,oldvalue,newvalue) != true);
    m_buf[oldvalue] = value;
    return true;
}
template 
bool ringbuffer::cas_push(const t* value) {
   if(isfull()) {
        return false;
    }
    int oldvalue,newvalue;
    do{
        oldvalue = m_head;
        newvalue = (oldvalue   1) % m_size;
    }while(__sync_bool_compare_and_swap(&m_head,oldvalue,newvalue) != true);
    m_buf[oldvalue] = *value;
    return true;
}
template 
bool ringbuffer::cas_pop(t& value)
{
    if(isempty()) {
        return false;
    }
    int oldvalue,newvalue;
    do{
        oldvalue = m_tail;
        newvalue = (oldvalue   1) % m_size;
    }while(__sync_bool_compare_and_swap(&m_tail,oldvalue,newvalue) != true);
    value = m_buf[oldvalue];
    return true;
}
typedef struct node {                              //任务节点
    int cmd;
    void *value;
}tasknode;
void produce(ringbuffer* rqueue) {
    int i = 0;
    for(i=0;icas_push(node);
    }
    
}
void consume(ringbuffer* rqueue) {
    while(!rqueue->isempty()) {
       tasknode node;
       rqueue->cas_pop(node); 
    }
}
int main() 
{
    int i = 0;
    ringbuffer * rqueue = new ringbuffer(ring_queue_size);
    std::thread producer[20];
    std::thread consumer[20];
    for(i = 0; i<20; i  ) {
        producer[i] = std::thread(produce,rqueue);
        consumer[i] = std::thread(consume,rqueue);
    }
    for (auto &thread : producer)
        thread.join();
    for (auto &thread : consumer)
        thread.join();
    delete rqueue;
    return 0;
}

这段代码中无锁队列实现的关键就是__sync_bool_compare_and_swap(&m_head,oldvalue,newvalue)函数,它会时刻将m_head和oldvalue比较,如果被其它线程抢先改了head的值就会返回失败。

现在说一下前面提到的aba的问题,基本的情况是下面这样的,多出现在内存复用的时候:

1.假设队列中只有一个节点m,线程a进行出队列操作,取得了当前节点m,节点m的地址是0x123,线程a被线程b抢占打断,还没进行出队列的实际操作

2.线程b同样将节点m出队列了,然后又重新申请了一个节点n,巧的是节点n的地址也是0x123,线程b将n节点入队列

3.线程b又被线程a打断,此时线程a重新开始执行,但是cas操作的接口比较的是地址,线程a发现节点的地址没有改变,又将n节点出队列了。

这种情况就好比电视据中经常出现的剧情,男主和女主的行李箱发生的交互,但是互不知晓。

通常这种情况会用双重cas来进行保证,在加一个计数器,使用节点内存引用计数refcnt,来判断值是否是原来的。  

网站地图