菜鸟笔记
提升您的技术认知

操作系统笔记

进程,线程,协程与并行,并发进程线程协程的区别死锁进程,线程,多线程i 的线程安全性同步和异步孤儿进程和僵尸进程/proc进程信息linux中的分段和分页互斥量 mutex线程进程间通信进程创建进程优先级进程的基础知识进程与线程的区别(面试题)线程的控制(创建,终止,等待,分离)可重入 vs 线程安全死锁的概念一级缓存和二级缓存的理解一句话解说内存屏障 memory barrierbrk(), sbrk() 用法详解malloc/free函数的简单实现一文讲透 “进程、线程、协程”linux进程状态线程池的陷阱linux内核学习之进程和线程进程与线程的区别和联系内存寻址linux io子系统和文件系统读写流程page cache和buffer cache的区别与联系漫谈linux文件io多线程和多进程的区别内存泄漏字节、字、位、比特的概念和关系如何避免死锁ansi是什么编码?cpu寻址范围(寻址空间)cpu 使用率低高负载的原因创建多少个线程合适操作系统下spinlock锁解析、模拟及损耗分析线程堆栈堆和栈的内存分配堆和栈的概念和区别堆和栈的区别,申请方式,程序的内存分配什么是 pod 数据类型linux内存分配小结--malloc、brk、mmap系统调用与内存管理(sbrk、brk、mmap、munmap)进程描述和控制cpu执行程序的原理编译的基本概念linux虚拟地址空间布局一个程序从源代码到可执行程序的过程程序的运行机制——cpu、内存、指令的那些事分页内存管理——虚拟地址到物理地址的转换深刻理解linux进程间通信fork之后父子进程的内存关系fork之后,子进程继承了父进程哪些内容关于协程及其锁的一些认识对协程的一点理解std::thread join和detach区别cas和aba问题cas算法锁和无锁无锁队列的实现lock-free 编程锁开销优化以及cas进程、线程和协程之间的区别和联系多线程的同步与互斥(互斥锁、条件变量、读写锁、自旋锁、信号量)linux 原来是这么管理内存的线程上下文切换怎么玩儿进程和线程通信原理cpu密集型 和 io密集型cas原理以及atomic原子类分析改变线程状态的八个方法六种进程间通信方式

锁开销优化以及cas-ag真人游戏

阅读 : 1158

互斥锁是用来保护一个临界区,即保护一个访问共用资源的程序片段,而这些共用资源又无法同时被多个线程访问的特性。当有线程进入临界区段时,其他线程或是进程必须等待。

在谈及锁的性能开销,一般都会说锁的开销很大,那锁的开销有多大,主要耗在哪,怎么提高锁的性能。

锁的开销

现在锁的机制一般使用 futex(fast userspace mutexes),内核态和用户态的混合机制。还没有futex的时候,内核是如何维护同步与互斥的呢?系统内核维护一个对象,这个对象对所有进程可见,这个对象是用来管理互斥锁并且通知阻塞的进程。如果进程a要进入临界区,先去内核查看这个对象,有没有别的进程在占用这个临界区,出临界区的时候,也去内核查看这个对象,有没有别的进程在等待进入临界区,然后根据一定的策略唤醒等待的进程。这些不必要的系统调用(或者说内核陷入)造成了大量的性能开销。为了解决这个问题,futex就应运而生。

futex是一种用户态和内核态混合的同步机制。首先,同步的进程间通过mmap共享一段内存,futex变量就位于这段共享的内存中且操作是原子的,当进程尝试进入互斥区或者退出互斥区的时候,先去查看共享内存中的futex变量,如果没有竞争发生,则只修改futex,而不用再执行系统调用了。当通过访问futex变量告诉进程有竞争发生,则还是得执行系统调用去完成相应的处理(wait 或者 wake up)。简单的说,futex就是通过在用户态的检查,(motivation)如果了解到没有竞争就不用陷入内核了,大大提高了low-contention时候的效率。

mutex 是在 futex 的基础上用的内存共享变量来实现的,如果共享变量建立在进程内,它就是一个线程锁,如果它建立在进程间共享内存上,那么它是一个进程锁。pthread_mutex_t 中的 _lock 字段用于标记占用情况,先使用cas判断_lock是否占用,若未占用,直接返回。否则,通过__lll_lock_wait_private 调用sys_futex 系统调用迫使线程进入沉睡。 cas是用户态的 cpu 指令,若无竞争,简单修改锁状态即返回,非常高效,只有发现竞争,才通过系统调用陷入内核态。所以,futex是一种用户态和内核态混合的同步机制,它保证了低竞争情况下的锁获取效率。

所以如果锁不存在冲突,每次获得锁和释放锁的处理器开销仅仅是cas指令的开销。

确定一件事情最好的方法是实际测试和观测它,让我们写一段代码来测试无冲突时锁的开销:

#include 
#include 
#include 
#include 
static inline long long unsigned time_ns(struct timespec* const ts) {
  if (clock_gettime(clock_realtime, ts)) {
    exit(1);
  }
  return ((long long unsigned) ts->tv_sec) * 1000000000llu
      (long long unsigned) ts->tv_nsec;
}
int main()
{
    int res = -1;
    pthread_mutex_t mutex;
    //初始化互斥量,使用默认的互斥量属性
    res = pthread_mutex_init(&mutex, null);
    if(res != 0)
    {
        perror("pthread_mutex_init failed\n");
        exit(exit_failure);
    }
    long max = 1000000000;
    long c = 0;
    struct timespec ts;
    const long long unsigned start_ns = time_ns(&ts);
    while(c < max) 
    {
        pthread_mutex_lock(&mutex);
        c = c   1;
        pthread_mutex_unlock(&mutex);
    }
    const long long unsigned delta = time_ns(&ts) - start_ns;
    printf("%f\n", delta/(double)max);
    return 0;
}

说明:以下性能测试在腾讯云 intel(r) xeon(r) cpu e5-26xx v4 1核 2399.996mhz 下进行。

运行了 10 亿次,平摊到每次加锁/解锁操作大概是 2.2ns 每次加锁/解锁(扣除了循环耗时 2.7ns)

在锁冲突的情况下,开销就没有这么小了。

首先pthread_mutex_lock会真正的调用sys_futex来进入内核来试图加锁,被锁住以后线程会进入睡眠,这带来了上下文切换和线程调度的开销。

可以写两个互相解锁的线程来测试这个过程的开销:

// ag真人试玩娱乐 copyright (c) 2010 benoit sigoure
//
// this program is free software: you can redistribute it and/or modify
// it under the terms of the gnu general public license as published by
// the free software foundation, either version 3 of the license, or
// (at your option) any later version.
//
// this program is distributed in the hope that it will be useful,
// but without any warranty; without even the implied warranty of
// merchantability or fitness for a particular purpose. see the
// gnu general public license for more details.
//
// you should have received a copy of the gnu general public license
// along with this program. if not, see .
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
static inline long long unsigned time_ns(struct timespec* const ts) {
  if (clock_gettime(clock_realtime, ts)) {
    exit(1);
  }
  return ((long long unsigned) ts->tv_sec) * 1000000000llu
      (long long unsigned) ts->tv_nsec;
}
static const int iterations = 500000;
static void* thread(void* restrict ftx) {
  int* futex = (int*) ftx;
  for (int i = 0; i < iterations; i  ) {
    sched_yield();
    while (syscall(sys_futex, futex, futex_wait, 0xa, null, null, 42)) {
      // retry
      sched_yield();
    }
    *futex = 0xb;
    while (!syscall(sys_futex, futex, futex_wake, 1, null, null, 42)) {
      // retry
      sched_yield();
    }
  }
  return null;
}
int main(void) {
  struct timespec ts;
  const int shm_id = shmget(ipc_private, sizeof (int), ipc_creat | 0666);
  int* futex = shmat(shm_id, null, 0);
  pthread_t thd;
  if (pthread_create(&thd, null, thread, futex)) {
    return 1;
  }
  *futex = 0xa;
  const long long unsigned start_ns = time_ns(&ts);
  for (int i = 0; i < iterations; i  ) {
    *futex = 0xa;
    while (!syscall(sys_futex, futex, futex_wake, 1, null, null, 42)) {
      // retry
      sched_yield();
    }
    sched_yield();
    while (syscall(sys_futex, futex, futex_wait, 0xb, null, null, 42)) {
      // retry
      sched_yield();
    }
  }
  const long long unsigned delta = time_ns(&ts) - start_ns;
  const int nswitches = iterations << 2;
  printf("%i thread context switches in %lluns (%.1fns/ctxsw)\n",
         nswitches, delta, (delta / (float) nswitches));
  wait(futex);
  return 0;
}

编译使用 gcc -std=gnu99 -pthread context_switch.c。

运行的结果是 2003.4ns/ctxsw,所以锁冲突的开销大概是不冲突开销的 910 倍了,相差出乎意料的大。

另外一个c程序可以用来测试“纯上下文切换”的开销,线程只是使用sched_yield来放弃处理器,并不进入睡眠。

// ag真人试玩娱乐 copyright (c) 2010 benoit sigoure
//
// this program is free software: you can redistribute it and/or modify
// it under the terms of the gnu general public license as published by
// the free software foundation, either version 3 of the license, or
// (at your option) any later version.
//
// this program is distributed in the hope that it will be useful,
// but without any warranty; without even the implied warranty of
// merchantability or fitness for a particular purpose. see the
// gnu general public license for more details.
//
// you should have received a copy of the gnu general public license
// along with this program. if not, see .
#include 
#include 
#include 
#include 
#include 
#include 
#include 
#include 
static inline long long unsigned time_ns(struct timespec* const ts) {
  if (clock_gettime(clock_realtime, ts)) {
    exit(1);
  }
  return ((long long unsigned) ts->tv_sec) * 1000000000llu
      (long long unsigned) ts->tv_nsec;
}
static const int iterations = 500000;
static void* thread(void*ctx) {
  (void)ctx;
  for (int i = 0; i < iterations; i  )
      sched_yield();
  return null;
}
int main(void) {
  struct sched_param param;
  param.sched_priority = 1;
  if (sched_setscheduler(getpid(), sched_fifo, ¶m))
    fprintf(stderr, "sched_setscheduler(): %s\n", strerror(errno));
  struct timespec ts;
  pthread_t thd;
  if (pthread_create(&thd, null, thread, null)) {
    return 1;
  }
  long long unsigned start_ns = time_ns(&ts);
  for (int i = 0; i < iterations; i  )
      sched_yield();
  long long unsigned delta = time_ns(&ts) - start_ns;
  const int nswitches = iterations << 2;
  printf("%i thread context switches in %lluns (%.1fns/ctxsw)\n",
         nswitches, delta, (delta / (float) nswitches));
  return 0;
}

“纯上下文切换” 消耗了大概381.2ns/ctxsw。

这样我们大致可以把锁冲突的开销分成三部分,“纯上下文切换”开销,大概是 381.2ns,调度器开销(把线程从睡眠变成就绪或者反过来)大概是1622.2ns,在多核系统上,还存在跨处理器调度的开销,那部分开销很大。在真实的应用场景里,还要考虑上下文切换带来的cache不命中和tlb不命中的开销,开销只会进一步加大。

锁的优化

从上面可以知道,真正消耗时间的不是上锁的次数,而是锁冲突的次数。减少锁冲突的次数才是提升性能的关键。使用更细粒度的锁,可以减少锁冲突。这里说的粒度包括时间和空间,比如哈希表包含一系列哈希桶,为每个桶设置一把锁,空间粒度就会小很多--哈希值相互不冲突的访问不会导致锁冲突,这比为整个哈希表维护一把锁的冲突机率低很多。减少时间粒度也很容易理解,加锁的范围只包含必要的代码段,尽量缩短获得锁到释放锁之间的时间,最重要的是,绝对不要在锁中进行任何可能会阻塞的操作。使用读写锁也是一个很好的减少冲突的方式,读操作之间不互斥,大大减少了冲突。

假设单向链表中的插入/删除操作很少,主要操作是搜索,那么基于单一锁的方法性能会很差。在这种情况下,应该考虑使用读写锁,即 pthread_rwlock_t,这么做就允许多个线程同时搜索链表。插入和删除操作仍然会锁住整个链表。假设执行的插入和搜索操作数量差不多相同,但是删除操作很少,那么在插入期间锁住整个链表是不合适的,在这种情况下,最好允许在链表中的分离点(disjoint point)上执行并发插入,同样使用基于读写锁的方式。在两个级别上执行锁定,链表有一个读写锁,各个节点包含一个互斥锁,在插入期间,写线程在链表上建立读锁,然后继续处理。在插入数据之前,锁住要在其后添加新数据的节点,插入之后释放此节点,然后释放读写锁。删除操作在链表上建立写锁。不需要获得与节点相关的锁;互斥锁只建立在某一个操作节点之上,大大减少锁冲突的次数。

锁本身的行为也存在进一步优化的可能性,sys_futex系统调用的作用在于让被锁住的当前线程睡眠,让出处理器供其它线程使用,既然这个过程的消耗很高,也就是说如果被锁定的时间不超过这个数值的话,根本没有必要进内核加锁——释放的处理器时间还不够消耗的。sys_futex的时间消耗够跑很多次 cas 的,也就是说,对于一个锁冲突比较频繁而且平均锁定时间比较短的系统,一个值得考虑的优化方式是先循环调用 cas 来尝试获得锁(这个操作也被称作自旋锁),在若干次失败后再进入内核真正加锁。当然这个优化只能在多处理器的系统里起作用(得有另一个处理器来解锁,否则自旋锁无意义)。在glibc的pthread实现里,通过对pthread_mutex设置pthread_mutex_adaptive_np属性就可以使用这个机制。

cas

锁产生的一些问题:

  • 等待互斥锁会消耗宝贵的时间,锁的开销很大。
  • 低优先级的线程可以获得互斥锁,因此阻碍需要同一互斥锁的高优先级线程。这个问题称为优先级倒置(priority inversion )
  • 可能因为分配的时间片结束,持有互斥锁的线程被取消调度。这对于等待同一互斥锁的其他线程有不利影响,因为等待时间现在会更长。这个问题称为锁护送(lock convoying)

无锁编程的好处之一是一个线程被挂起,不会影响到另一个线程的执行,避免锁护送;在锁冲突频繁且平均锁定时间较短的系统,避免上下文切换和调度开销。

cas (comapre and swap 或者 check and set),比较并替换,引用 wiki,它是一种用于线程数据同步的原子指令。

cas 核心算法涉及到三个参数,即内存值,更新值和期望值;cas 指令会先检查一个内存位置是否包含预期的值;如果是这样,就把新的值复制到这个位置,返回 true;如果不是则返回 false。
cas 对应一条汇编指令 cmpxchg,因此是原子性的。

bool compare_and_swap (int *accum, int *dest, int newval)
{
  if ( *accum == *dest ) {
      *dest = newval;
      return true;
  }
  return false;
}

一般,程序会在循环里使用 cas 不断去完成一个事务性的操作,一般包含拷贝一个共享的变量到一个局部变量,然后再使用这个局部变量执行任务计算得到新的值,最后再使用 cas 比较保存再局部变量的旧值和内存值来尝试提交你的修改,如果尝试失败,会重新读取一遍内存值,再重新计算,最后再使用 cas 尝试提交修改,如此循环。比如:

void lockfreequeue::push(node* newhead)
{
    for (;;)
    {
        // 拷贝共享变量(m_head) 到一个局部变量
        node* oldhead = m_head;
        // 执行任务,可以不用关注其他线程
        newhead->next = oldhead;
        // 下一步尝试提交更改到共享变量
        // 如果共享变量没有被其他线程修改过,仍为 oldhead,则 cas 将 newhead 赋值给共享变量 m_head 并返回
        // 否则继续循环重试
        if (_interlockedcompareexchange(&m_head, newhead, oldhead))
            return;
    }
}

上面的数据结构设置了一个共享的头节点 m_head,当 push 一个新的节点时,会把新节点加在头节点后面;不要相信程序的执行是连续的,cpu 的执行是多线程并发。在 _interlockedcompareexchange 即 cas 之前,线程可能因为时间片用完被调度出去,新调度进来的线程执行完了 push 操作,多个线程共享了 m_head 变量,此时 m_head 已被修改了,如果原来线程继续执行,把 oldhead 覆盖到 m_head,就会丢失其他线程 push 进来的节点。所以需要比较 m_head 是不是还等于 oldhead,如果是,说明头节点不变,可以使用 newhead 覆盖 m_head;如果不是,说明有其他线程 push 了新的节点,那么需要使用最新的 m_head 更新 oldhead 的值重新走一下循环,_interlockedcompareexchange 会自动把 m_head 赋值给 oldhead。

aba 问题

因为 cas 需要在提交修改时检查期望值和内存值有没有发生变化,如果没有则进行更新,但是如果原来一个值从 a 变成 b 又变成 a,那么使用 cas 检查的时候发现值没有发生变化,但实际上已经发生了一系列变化。

内存的回收利用会导致 cas 出现严重的问题:

t* ptr1 = new t(8, 18);
t* old = ptr1; 
delete ptr1;
t* ptr2 = new t(0, 1);
 
// 我们不能保证操作系统不会重新使用 ptr1 内存地址,一般的内存管理器都会这样子做
if (old1 == ptr2) {
    // 这里表示,刚刚回收的 ptr1 指向的内存被用于后面申请的 ptr2了
}

aba问题是无锁结构实现中常见的一种问题,可基本表述为:

  • 进程p1读取了一个数值a
  • p1被挂起(时间片耗尽、中断等),进程p2开始执行
  • p2修改数值a为数值b,然后又修改回a
  • p1被唤醒,比较后发现数值a没有变化,程序继续执行。

对于p1来说,数值a未发生过改变,但实际上a已经被变化过了,继续使用可能会出现问题。在cas操作中,由于比较的多是指针,这个问题将会变得更加严重。试想如下情况:

有一个堆(先入后出)中有top和节点a,节点a目前位于堆顶top指针指向a。现在有一个进程p1想要pop一个节点,因此按照如下无锁操作进行

pop()
{
  do{
    ptr = top; // ptr = top = nodea
    next_prt = top->next; // next_ptr = nodex
  } while(cas(top, ptr, next_ptr) != true);
  return ptr;   
}

而进程p2在执行cas操作之前打断了p1,并对堆进行了一系列的pop和push操作,使堆变为如下结构:

进程p2首先pop出nodea,之后又push了两个nodeb和c,由于内存管理机制中广泛使用的内存重用机制,导致nodec的地址与之前的nodea一致。

这时p1又开始继续运行,在执行cas操作时,由于top依旧指向的是nodea的地址(实际上已经变为nodec),因此将top的值修改为了nodex,这时堆结构如下:

经过cas操作后,top指针错误的指向了nodex而不是nodeb。

aba 解决方法

tagged state reference,增加额外的 tag bits 位,它像一个版本号;比如,其中一种算法是在内存地址的低位记录指针的修改次数,在指针修改时,下一次 cas 会返回失败,即使因为内存重用机制导致地址一样。有时我们称这种机制位 aba‘,因为我们使第二个 a 稍微有点不同于第一个。tag 的位数长度会影响记录修改的次数,在现有的 cpu 下,使用 60 bit tag,在不重启程序10年才会产生溢出问题;在 x64 cpu,趋向于支持 128 bit 的 cas 指令,这样更能保证避免出现 aba 问题。

下面参考 liblfds 库代码说明下 tagged state reference 的实现过程。

我们想要避免 aba 问题的方法之一是使用更长的指针,这样便需要一个支持 dword 长度的 cas 指令。liblfds 是怎么跨平台实现 128 bit 指令的呢?

在 liblfds 下,cas 指令为 lfds710_pal_atomic_dwcas 宏,它的完整形式是:

lfds710_pal_atomic_dwcas( pointer_to_destination, pointer_to_compare, pointer_to_new_destination, cas_strength, result)

从上面可以看出,liblfds 库使用一个由两个元素组成的一维数组来表示 128 bit 指针。

linux 提供了 cmpxchg16b 用于实现 128 bit 的 cas 指令,而在 windows,使用 _interlockedcompareexchange128。只有 128 位指针完全相等的情况下,才视为相等。

参考 liblfds/liblfds7.1.0/liblfds710/inc/liblfds710/lfds710_porting_abstraction_layer_compiler.h 下关于 cas 的 windows 实现:

#define lfds710_pal_atomic_dwcas( pointer_to_destination, pointer_to_compare, pointer_to_new_destination, cas_strength, result ) \
{ \                                              
    lfds710_pal_barrier_compiler_full; \
    (result) = (char unsigned) _interlockedcompareexchange128( (__int64 volatile *) (pointer_to_destination), (__int64) (pointer_to_new_destination[1]), (__int64) (pointer_to_new_destination[0]), (__int64 *) (pointer_to_compare) ); \
    lfds710_pal_barrier_compiler_full; \
}

再重点研究 new_top 的定义和提交修改过程。

new_top 是一个具有两个元素的一维数组,元素是 struct lfds710_stack_element 指针,两个元素分别使用 pointer 0 和 counter 1 标记。counter 相当于前面说的 tag 标记,pointer 保存的时真正的节点指针。在 x64 下,指针长度是 64 bit,所以这里使用的是 64 bit tag 记录 pointer 修改记录。

liblfds 用原 top 的 counter 1来初始化 new top counter,即使用 counter 标记 ss->top 的更换次数,这样每一次更换 top,top 里的 counter 都会变。

只有在 ss->top 和 original_top 的 pointer 和 counter 完全相等的情况下,new_top 才会覆盖到 ss->top,否则会使用 ss->top 覆盖 original_top,下次循环用最新的 original_top 再次操作和比较。

参考 liblfds/liblfds7.1.0/liblfds710/src/lfds710_stack/lfds710_stack_push.c,无锁堆栈的实现:

void lfds710_stack_push( struct lfds710_stack_state *ss,
                         struct lfds710_stack_element *se )
{
  char unsigned
    result;
  lfds710_pal_uint_t
    backoff_iteration = lfds710_backoff_initial_value;
  struct lfds710_stack_element lfds710_pal_align(lfds710_pal_align_double_pointer)
    *new_top[pac_size],
    *volatile original_top[pac_size];
  lfds710_pal_assert( ss != null );
  lfds710_pal_assert( se != null );
  new_top[pointer] = se;
  original_top[counter] = ss->top[counter];
  original_top[pointer] = ss->top[pointer];
  do
  {
    se->next = original_top[pointer];
    lfds710_misc_barrier_store;
    new_top[counter] = original_top[counter]   1;
    lfds710_pal_atomic_dwcas( ss->top, original_top, new_top, lfds710_misc_cas_strength_weak, result );
    if( result == 0 )
      lfds710_backoff_exponential_backoff( ss->push_backoff, backoff_iteration );
  }
  while( result == 0 );
  lfds710_backoff_autotune( ss->push_backoff, backoff_iteration );
  return;
}
  1. 无锁数据结构,参考 https://github.com/liblfds/li...
  2. 高性能内存队列disruptor中的cas,参考 http://ifeve.com/disruptor/
  3. 数据库乐观锁

参考

[wiki compare-and-swap]
[wiki aba problem]
[左耳朵耗子无锁队列的实现]
[ibm 设计不使用互斥锁的并发数据结构]
[aba problem]
[_interlockedcompareexchange128]
[linux 互斥锁的实现原理(pthread_mutex_t)]
[futex机制介绍]
[an-introduction-to-lock-free-programming]
[多进程、多线程与多处理器计算平台的性能问题]
[implement lock-free queue]
[上下文切换和线程调度性能测试]
[纯上下文切换性能测试]
[锁的开销]
[pthread包的mutex实现分析]
[ibm通用线程:posix 线程详解]

网站地图