本文首先介绍了读写信号量,然后介绍了其API,接着以一个实验的形式,给大家展示了读写信号量内部的count值的含义。只有明白了count的含义,我们在分析问题时才能得心应手。

系统环境

  • 发行版:centos7.5 (Virtual Box虚拟机)
  • 内核版本:3.10.0-862.14.4.el7.x86_64
  • 处理器:Intel(R) Core(TM) i7-4770HQ CPU @ 2.20GHz
  • 内存:4GB

读写信号量介绍

读写信号量对访问者进行了细分,或者为读者,或者为写者,读者在持有读写信号量期间只能对该读写信号量保护的共享资源进行读访问,如果一个任务除了需要读,可能还需要写,那么它必须被归类为写者,它在对共享资源访问之前必须先获得写者身份,写者在发现自己不需要写访问的情况下可以降级为读者。读写信号量同时拥有的读者数不受限制,也就说可以有任意多个读者同时拥有一个读写信号量。

如果一个读写信号量当前没有被写者拥有并且也没有写者等待读者释放信号量,那 么任何读者都可以成功获得该读写信号量;否则,读者必须被挂起直到写者释放该信号量。如果一个读写信号量当前没有被读者或写者拥有并且也没有写者等待该信号量,那么一个写者可以成功获得该读写信号量,否则写者将被挂起,直到没有任何访问者。因此,写者是排他性的,独占性的。

读写信号量有两种实现:

  • 一种是通用的,不依赖于硬件架构,因此,增加新的架构不需要重新实现它,但缺点是性能低,获得和释放读写信号量的开销大;
  • 另一种是架构相关的,因此性能高,获取和释放读写信号量的开销小,但增加新的架构需要重新实现。在内核配置时,可以通过选项去控制使用哪一种实现。

读写信号量的相关API

API
DECLARE_RWSEM(name) 该宏声明一个读写信号量name并对其进行初始化。
void init_rwsem(struct rw_semaphore *sem) 该函数对读写信号量sem进行初始化。
void down_read(struct rw_semaphore *sem) 读者调用该函数来得到读写信号量sem。该函数会导致调用者睡眠,因此只能在进程上下文使用。
int __must_check down_read_killable(struct rw_semaphore *sem);
int down_read_trylock(struct rw_semaphore *sem) 该函数类似于down_read,只是它不会导致调用者睡眠。它尽力得到读写信号量sem,如果能够立即得到,它就得到该读写信号量,并且返回1,否则表示不能立刻得到该信号量,返回0。因此,它也可以在中断上下文使用。
void down_write(struct rw_semaphore *sem) 写者使用该函数来得到读写信号量sem,它也会导致调用者睡眠,因此只能在进程上下文使用。
int __must_check down_write_killable(struct rw_semaphore *sem);
int down_write_trylock(struct rw_semaphore *sem) 该函数类似于down_write,只是它不会导致调用者睡眠。该函数尽力得到读写信号量,如果能够立刻获得,就获得该读写信号量并且返回1,否则表示无法立刻获得,返回0。它可以在中断上下文使用。
void up_read(struct rw_semaphore *sem) 读者使用该函数释放读写信号量sem。它与down_readdown_read_trylock配对使用。如果down_read_trylock返回0,不需要调用up_read来释放读写信号量,因为根本就没有获得信号量。
void up_write(struct rw_semaphore *sem) 写者调用该函数释放信号量sem。它与down_writedown_write_trylock配对使用。如果down_write_trylock返回0,不需要调用up_write,因为返回0表示没有获得该读写信号量。
void downgrade_write(struct rw_semaphore *sem) 该函数用于把写者降级为读者,这有时是必要的。因为写者是排他性的,因此在写者保持读写信号量期间,任何读者或写者都将无法访问该读写信号量保护的共享资源,对于那些当前条件下不需要写访问的写者,降级为读者将,使得等待访问的读者能够立刻访问,从而增加了并发性,提高了效率。

读写信号量适于在读多写少的情况下使用,在linux内核中对进程的内存映像描述结构的访问就使用了读写信号量进行保护。

Linux中,每一个进程都用一个类型为struct task_struct的结构来描述,该结构的类型为struct mm_struct的字段mm描述了进程的内存映像,特别是mm_struct结构的mmap字段维护了整个进程的内存块列表,该列表将在进程生存期间被大量地遍历或修改。

因此mm_struct结构就有一个字段mmap_sem来对mmap的访问 进行保护,mmap_sem就是一个读写信号量,在proc文件系统里有很多进程内存使用情况的接口,通过它们能够查看某一进程的内存使用情况,命令 freepstop都是通过proc来得到内存使用信息的,proc接口就使用down_readup_read来读取进程的mmap信息。

当进程动态地分配或释放内存时,需要修改mmap来反映分配或释放后的内存映像,因此动态内存分配或释放操作需要以写者身份获得读写信号量mmap_sem来对mmap进行更新。系统调用brkmunmap就使用了down_writeup_write来保护对mmap的访问。

kernel中的定义

kernel中,读写信号量的定义如下:

/* All arch specific implementations share the same struct */
struct rw_semaphore {
        RH_KABI_REPLACE(long            count,
                        atomic_long_t   count)
        raw_spinlock_t  wait_lock;
        struct optimistic_spin_queue osq; /* spinner MCS lock */
        struct slist_head       wait_list;
        /*
         * Write owner. Used as a speculative check to see
         * if the owner is running on the cpu.
         */
        struct task_struct      *owner;
};

wait_lock用来保护链表wait_listwait_list是一个链表,存放了所有的等待信号的进程,信号量的状态由count表示。

count值分析

以下通过编写内核模块来演示count的值的变化情况,模块代码请移步这里 ,该代码在3.10.0-862.14.4.el7.x86_64上编译通过。

初始化

读写信号量初始化时,rw_semaphore.count的值为0x0000000000000000

down_readup_read

  • 当有一个reader持有时,其值为0x0000000000000001
  • 当有两个reader持有时,其值为0x0000000000000002;
  • 当有nreader持有时,其值为0x000000000000000n
  • 读写锁同时可以有多个reader
# cat /proc/rwsem_test 
semaphore.count: 0x0000000000000000
# echo "down_read A" > /proc/rwsem_test 
# cat /proc/rwsem_test                           
semaphore.count: 0x0000000000000001
# echo "down_read B" > /proc/rwsem_test
# cat /proc/rwsem_test
semaphore.count: 0x0000000000000002
# echo "down_read C" > /proc/rwsem_test
# cat /proc/rwsem_test 
semaphore.count: 0x0000000000000003
# echo "up_read C" > /proc/rwsem_test
# cat /proc/rwsem_test 
semaphore.count: 0x0000000000000002
# echo "up_read A" > /proc/rwsem_test
# cat /proc/rwsem_test
semaphore.count: 0x0000000000000001
# echo "up_read B" > /proc/rwsem_test
# cat /proc/rwsem_test
semaphore.count: 0x0000000000000000

down_writeup_write

  • 当有writer持有时,且等待wait_list为空,其值为0xffffffff00000001
  • 当有writer持有时,且等待wait_list不为空,其值为0xfffffffe00000001
  • 读写信号量同时只能有一个writer持有
# cat /proc/rwsem_test 
semaphore.count: 0x0000000000000000
# echo "down_write A" > /proc/rwsem_test 
# cat /proc/rwsem_test
semaphore.count: 0xffffffff00000001
# echo "down_write B" > /proc/rwsem_test
# cat /proc/rwsem_test
semaphore.count: 0xfffffffe00000001
        wait list: comm = B, type = RWSEM_WAITING_FOR_WRITE
# echo "down_write C" > /proc/rwsem_test
# cat /proc/rwsem_test
semaphore.count: 0xfffffffe00000001
        wait list: comm = B, type = RWSEM_WAITING_FOR_WRITE
        wait list: comm = C, type = RWSEM_WAITING_FOR_WRITE
# echo "up_write A" > /proc/rwsem_test
# cat /proc/rwsem_test
semaphore.count: 0xfffffffe00000001
        wait list: comm = C, type = RWSEM_WAITING_FOR_WRITE
# echo "up_write B" > /proc/rwsem_test 
# cat /proc/rwsem_test 
semaphore.count: 0xffffffff00000001
# echo "up_write C" > /proc/rwsem_test 
# cat /proc/rwsem_test 
semaphore.count: 0x0000000000000000

其他情况下count的值

通过以下场景,我们理解一下count的值变化规律。其中ABC为读者,DEF为写者。

  • 初始化时,count的值为0x0000000000000000,wait_list为空;
# cat /proc/rwsem_test 
semaphore.count: 0x0000000000000000
  • A读者获取信号量后,count的值为:0x0000000000000001wait_list为空;
# echo "down_read A" > /proc/rwsem_test 
# cat /proc/rwsem_test 
semaphore.count: 0x0000000000000001
  • B读者获取信号量后,count的值为:0x0000000000000002wait_list为空;
# echo "down_read B" > /proc/rwsem_test                                             # cat /proc/rwsem_test 
semaphore.count: 0x0000000000000002
  • D写者尝试获取信号量时,由于读者没有释放,此时count的值为:0xffffffff00000002,wait_listD写者;
# echo "down_write D" > /proc/rwsem_test 
# cat /proc/rwsem_test 
semaphore.count: 0xffffffff00000002
        wait list: comm = D, type = RWSEM_WAITING_FOR_WRITE
  • C读者尝试获取信号量时,由于AB读者没有释放,此时count的值为:0xffffffff00000002,wait_listD写者,C读者;
# echo "down_read C" > /proc/rwsem_test 
# cat /proc/rwsem_test 
semaphore.count: 0xffffffff00000002
        wait list: comm = D, type = RWSEM_WAITING_FOR_WRITE
        wait list: comm = C, type = RWSEM_WAITING_FOR_READ
  • E写者尝试获取信号量时,由于AB读者没有释放,此时count的值为:0xffffffff00000002,wait_listD写者,C读者,E写者;
# echo "down_write E" > /proc/rwsem_test 
# cat /proc/rwsem_test 
semaphore.count: 0xffffffff00000002
        wait list: comm = D, type = RWSEM_WAITING_FOR_WRITE
        wait list: comm = C, type = RWSEM_WAITING_FOR_READ
        wait list: comm = E, type = RWSEM_WAITING_FOR_WRITE
  • A读者释放信号量,由于B读者没有释放,count的值为:0xffffffff00000001wait_listD写者,C读者,E写者;
# echo "up_read A" > /proc/rwsem_test   
# cat /proc/rwsem_test 
semaphore.count: 0xffffffff00000001
        wait list: comm = D, type = RWSEM_WAITING_FOR_WRITE
        wait list: comm = C, type = RWSEM_WAITING_FOR_READ
        wait list: comm = E, type = RWSEM_WAITING_FOR_WRITE
  • B读者释放信号量(count值为0xffffffff00000000),此时D写者获取到信号量,count的值为:0xfffffffe00000001wait_listC读者,E写者;
# echo "up_read B" > /proc/rwsem_test   
# cat /proc/rwsem_test 
semaphore.count: 0xfffffffe00000001
        wait list: comm = C, type = RWSEM_WAITING_FOR_READ
        wait list: comm = E, type = RWSEM_WAITING_FOR_WRITE
  • D 写着释放信号量(count值为0xffffffff00000000),此时C读者获取到信号量,count的值为:0xffffffff00000001, wait_listE写者;
# echo "up_write D" > /proc/rwsem_test
# cat /proc/rwsem_test 
semaphore.count: 0xffffffff00000001
        wait list: comm = E, type = RWSEM_WAITING_FOR_WRITE
  • C读者释放信号量(count值为0xffffffff00000000),此时E写者获取到信号量,count的值为: 0xffffffff00000001,wait_list为空
# echo "up_read C" > /proc/rwsem_test 
# cat /proc/rwsem_test 
semaphore.count: 0xffffffff00000001
  • E释放信号量,此时count的值为: 0x0000000000000000,wait_list为空
# echo "up_write E" > /proc/rwsem_test      
# cat /proc/rwsem_test 
semaphore.count: 0x0000000000000000

通过以上场景,我们可以总结以下规律:

  1. 当读者持有锁时,且没有写者等待时,后续的读者可以直接获取锁成功;
  2. 当有一个写者等待时,后续的写者和读者都不能获取锁;
  3. 当写者持有锁时,后续的写者和读者都不能获取锁;
  4. 等待wait_list上,第一个都是写者;
  5. 当写者释放锁时,等待wait_list上的第一个将会获取锁(不管其时读者还是写着);

归纳一下count值的含义:

count 值 含义
0x0000000000000000 信号量没有被任何读者和写着持有,且没有读者和写者尝试获取信号量
0x000000000000000X X个读者持有信号量,且没有写者尝试获取信号量
0xffffffff0000000X X个读者持有信号量,且等待list不为空且包含写者
0xffffffff00000001 一个读者持有信号量,等待list不为空且包含写者
0xffffffff00000001 一个写者持有信号量,等待list为空
0xfffffffe00000001 写着持有信号量,且等待list不为空
0xffffffff00000000 代表等待list有读者或者写者,但是谁也没有获取到锁,一般是中间状态

x86实现细节分析

/*
 * trylock for reading -- returns 1 if successful, 0 if contention
 */
int down_read_trylock(struct rw_semaphore *sem)
{
        int ret = __down_read_trylock(sem);

        if (ret == 1)
                rwsem_acquire_read(&sem->dep_map, 0, 1, _RET_IP_);
        return ret;
}
/*
 * trylock for reading -- returns 1 if successful, 0 if contention
 */
static inline int __down_read_trylock(struct rw_semaphore *sem)
{
        long result, tmp;
        asm volatile("# beginning __down_read_trylock\n\t"
                     "  mov          %0,%1\n\t"
                     "1:\n\t"
                     "  mov          %1,%2\n\t"
                     "  add          %3,%2\n\t"
                     "  jle          2f\n\t"
                     LOCK_PREFIX "  cmpxchg  %2,%0\n\t"
                     "  jnz          1b\n\t"
                     "2:\n\t"
                     "# ending __down_read_trylock\n\t"
                     : "+m" (sem->count), "=&a" (result), "=&r" (tmp)
                     : "i" (RWSEM_ACTIVE_READ_BIAS)
                     : "memory", "cc");
        return result >= 0 ? 1 : 0;
}
  • 716行中的汇编代码是注释;
  • 7行中的asm volatile的含义如下:
    • asm表示要嵌入汇编代码,后续括号中的为汇编代码
    • volatile表示不需要gcc进行优化汇编代码
  • 第8到15行为汇编代码
  • 第17行为输出操作数,
    • "+m" (sem->count),其中+表示此操作数是可读可写的,m表示A memory operand is allowed, with any kind of address that the machine supports in general.
    • "=&a" (result),其中=表示此操作数是只写的,&此操作数独占其指定的寄存器,a表示寄存器约束,表示该操作数使用寄存器%EAX
    • "=&r" (tmp),其中r表示寄存器约束,此时编译器会在通用寄存器中自动选择一个。
  • 第18行为输入操作数
    • "i" (RWSEM_ACTIVE_READ_BIAS),其中i的解释如下An immediate integer operand (one with constant value) is allowed. This includes symbolic constants whose values will be known only at assembly time or later.这里RWSEM_ACTIVE_READ_BIAS的值为0x00000001L
  • 第20行为Clobber/Modify,即告诉gcc一些情况
    • cc表示汇编代码会改变condition code register
    • memory表示汇编代码会改变内存,从而提示编译器在汇编代码期间,不要值缓存到cache中。
  • 另外,在汇编代码中,%n表示操作数,其按照输出操作数和输入操作数进行的顺序进行引用,从0开始编码。
    • 对应到本段代码中:%0代表sem->count%1代表result%2代表tmp%3代表RWSEM_ACTIVE_READ_BIAS

这段代码编译后的结果如下:

crash>   dis down_read_trylock
0xffffffff8109c2a0 <down_read_trylock>: nopl   0x0(%rax,%rax,1) [FTRACE NOP]
0xffffffff8109c2a5 <down_read_trylock+5>:       push   %rbp
0xffffffff8109c2a6 <down_read_trylock+6>:       mov    %rsp,%rbp
0xffffffff8109c2a9 <down_read_trylock+9>:       mov    (%rdi),%rax
0xffffffff8109c2ac <down_read_trylock+12>:      mov    %rax,%rdx
0xffffffff8109c2af <down_read_trylock+15>:      add    $0x1,%rdx
0xffffffff8109c2b3 <down_read_trylock+19>:      jle    0xffffffff8109c2bc <down_read_trylock+28>
0xffffffff8109c2b5 <down_read_trylock+21>:      lock cmpxchg %rdx,(%rdi)
0xffffffff8109c2ba <down_read_trylock+26>:      jne    0xffffffff8109c2ac <down_read_trylock+12>
0xffffffff8109c2bc <down_read_trylock+28>:      not    %rax
0xffffffff8109c2bf <down_read_trylock+31>:      shr    $0x3f,%rax
0xffffffff8109c2c3 <down_read_trylock+35>:      pop    %rbp
0xffffffff8109c2c4 <down_read_trylock+36>:      retq   

该段代码的目的就是原子的给rw_semaphore.count 值加1,如果成功加1,则返回1,失败则返回0