英语

不可靠的锁定指南

作者:

Rusty Russell

简介

欢迎来到 Rusty 关于内核锁定问题的非常不可靠的指南。本文档描述了 Linux Kernel 2.6 中的锁定系统。

随着 HyperThreading 的广泛应用以及 Linux Kernel 中的抢占,每个在内核上进行黑客攻击的人都需要了解 SMP 的并发和锁定的基本原理。

并发问题

(如果您知道什么是竞争条件,请跳过此部分)。

在一个普通的程序中,您可以像这样增加计数器

very_important_count++;

这是他们期望发生的事情

预期结果

实例 1

实例 2

读取 very_important_count (5)

加 1 (6)

写入 very_important_count (6)

读取 very_important_count (6)

加 1 (7)

写入 very_important_count (7)

这可能发生的事情

可能的结果

实例 1

实例 2

读取 very_important_count (5)

读取 very_important_count (5)

加 1 (6)

加 1 (6)

写入 very_important_count (6)

写入 very_important_count (6)

竞争条件和临界区

这种重叠,即结果取决于多个任务的相对时序,称为竞争条件。 包含并发问题的代码段称为临界区。 尤其是在 Linux 开始在 SMP 机器上运行之后,它们成为内核设计和实现中的主要问题之一。

抢占可能具有相同的效果,即使只有一个 CPU:通过在临界区抢占一个任务,我们也会遇到完全相同的竞争条件。 在这种情况下,抢占的线程可能会自己运行临界区。

解决方案是识别何时发生这些同时访问,并使用锁来确保任何时候只有一个实例可以进入临界区。 Linux 内核中有许多友好的原语可以帮助您做到这一点。 然后是一些不友好的原语,但我会假装它们不存在。

Linux 内核中的锁定

如果我可以给您关于锁定的一条建议:**保持简单**。

不情愿引入新的锁。

内核锁的两种主要类型:自旋锁和互斥锁

有两种主要的内核锁类型。 基本类型是自旋锁 (include/asm/spinlock.h),它是一种非常简单的单持有者锁:如果您无法获得自旋锁,您会一直尝试(旋转)直到可以。 自旋锁非常小且快速,可以在任何地方使用。

第二种类型是互斥锁 (include/linux/mutex.h):它类似于自旋锁,但您可以阻塞持有互斥锁。 如果您无法锁定互斥锁,您的任务将暂停自身,并在互斥锁释放时唤醒。 这意味着 CPU 可以在您等待时执行其他操作。 在许多情况下,您根本无法休眠(请参阅哪些函数可以从中断中安全调用?),因此必须改用自旋锁。

这两种类型的锁都不是递归的:请参阅死锁:简单和高级

锁和单处理器内核

对于没有CONFIG_SMP且没有CONFIG_PREEMPT编译的内核,自旋锁根本不存在。 这是一个极好的设计决策:当没有其他人可以同时运行时,没有理由拥有锁。

如果内核在没有CONFIG_SMP的情况下编译,但设置了CONFIG_PREEMPT,那么自旋锁只是禁用抢占,这足以防止任何竞争。 对于大多数目的,我们可以将抢占视为等同于 SMP,而不必单独担心它。

您应该始终使用启用的CONFIG_SMPCONFIG_PREEMPT测试您的锁定代码,即使您没有 SMP 测试盒,因为它仍然会捕获某些类型的锁定错误。

互斥锁仍然存在,因为它们是用户上下文之间同步所必需的,我们将在下面看到。

仅在用户上下文中锁定

如果您有一个仅从用户上下文访问的数据结构,那么您可以使用一个简单的互斥锁 (include/linux/mutex.h) 来保护它。 这是最简单的情况:您初始化互斥锁。 然后您可以调用mutex_lock_interruptible()来获取互斥锁,并调用mutex_unlock()来释放它。 还有一个mutex_lock(),应该避免使用它,因为如果在收到信号时它不会返回。

示例:net/netfilter/nf_sockopt.c允许注册新的 setsockopt() 和 getsockopt() 调用,使用 nf_register_sockopt()。 注册和注销仅在模块加载和卸载时完成(以及启动时,没有并发),并且仅在未知 setsockopt() 或 getsockopt() 系统调用时才查询注册列表。 nf_sockopt_mutex非常适合保护它,特别是因为 setsockopt 和 getsockopt 调用很可能会休眠。

用户上下文和软中断之间的锁定

如果软中断与用户上下文共享数据,您会遇到两个问题。 首先,当前用户上下文可能会被软中断中断,其次,可能会从另一个 CPU 进入临界区。 这就是 spin_lock_bh() (include/linux/spinlock.h) 的用途。 它禁用该 CPU 上的软中断,然后获取锁。 spin_unlock_bh() 执行相反的操作。 (“_bh”后缀是对“Bottom Halves”的历史引用,即软件中断的旧名称。 在一个完美的世界中,它应该真正被称为 spin_lock_softirq()’)。

请注意,您也可以在此处使用 spin_lock_irq() 或 spin_lock_irqsave(),它们也会停止硬件中断:请参阅硬中断上下文

这对于 UP 也很有效:自旋锁消失了,并且此宏只是变成了 local_bh_disable() (include/linux/interrupt.h),它可以保护您免受运行的软中断。

用户上下文和 Tasklet 之间的锁定

这与上面完全相同,因为 tasklet 实际上是从软中断运行的。

用户上下文和定时器之间的锁定

这与上面也完全相同,因为定时器实际上是从软中断运行的。 从锁定的角度来看,tasklet 和定时器是相同的。

Tasklet/定时器之间的锁定

有时 tasklet 或定时器可能想要与其他 tasklet 或定时器共享数据。

同一个 Tasklet/定时器

由于 tasklet 永远不会在两个 CPU 上同时运行,因此即使在 SMP 上,您也不需要担心您的 tasklet 是可重入的(一次运行两次)。

不同的 Tasklet/定时器

如果另一个 tasklet/定时器想要与您的 tasklet 或定时器共享数据,您都需要使用 spin_lock() 和 spin_unlock() 调用。 在这里,spin_lock_bh() 是不必要的,因为您已经在 tasklet 中,并且不会在同一 CPU 上运行。

软中断之间的锁定

通常,软中断可能想要与其自身或 tasklet/定时器共享数据。

同一个软中断

同一个软中断可以在其他 CPU 上运行:您可以使用每个 CPU 数组(请参阅每个 CPU 数据)以获得更好的性能。 如果您要使用软中断,您可能关心可扩展的性能,足以证明额外的复杂性是合理的。

您需要使用 spin_lock() 和 spin_unlock() 来共享数据。

不同的软中断

您需要使用 spin_lock() 和 spin_unlock() 来共享数据,无论是定时器、tasklet、不同的软中断还是相同或另一个软中断:它们中的任何一个都可能在不同的 CPU 上运行。

硬中断上下文

硬件中断通常与 tasklet 或软中断通信。 通常,这涉及将工作放入队列中,软中断会将其取出。

硬中断和软中断/Tasklet 之间的锁定

如果硬件中断处理程序与软中断共享数据,您有两个问题需要考虑。 首先,软中断处理可能会被硬件中断中断,其次,临界区可能会被另一个 CPU 上的硬件中断进入。 这就是 spin_lock_irq() 的用途。 它被定义为禁用该 cpu 上的中断,然后获取锁。 spin_unlock_irq() 执行相反的操作。

中断处理程序不需要使用 spin_lock_irq(),因为软中断在中断处理程序运行时无法运行:它可以 使用 spin_lock(),这稍微快一些。 唯一的例外是不同的硬件中断处理程序使用相同的锁:spin_lock_irq() 会阻止它中断我们。

这对于 UP 也很有效:自旋锁消失了,并且此宏只是变成了 local_irq_disable() (include/asm/smp.h),它可以保护您免受运行的软中断/tasklet/BH。

spin_lock_irqsave() (include/linux/spinlock.h) 是一种变体,它将中断是打开还是关闭的状态保存在标志字中,该标志字传递给 spin_unlock_irqrestore()。 这意味着可以在硬中断处理程序(中断已关闭)和软中断(需要禁用中断)中使用相同的代码。

请注意,软中断(以及 tasklet 和定时器)是在从硬件中断返回时运行的,因此 spin_lock_irq() 也会停止这些中断。 从这个意义上说,spin_lock_irqsave() 是最通用和最强大的锁定功能。

两个硬中断处理程序之间的锁定

在两个 IRQ 处理程序之间共享数据的情况很少见,但如果您这样做,则应使用 spin_lock_irqsave():是否在 irq 处理程序本身中禁用所有中断取决于具体的架构。

锁定速查表

Pete Zaitcev 给出了以下总结

  • 如果您处于进程上下文中(任何系统调用)并且想要阻止其他进程,请使用互斥锁。 您可以获取互斥锁并休眠 (copy_from_user()kmalloc(x,GFP_KERNEL))。

  • 否则(== 数据可以在中断中被触摸),使用 spin_lock_irqsave() 和 spin_unlock_irqrestore()。

  • 避免持有自旋锁超过 5 行代码,并且不要跨任何函数调用(除了像 readb() 这样的访问器)。

最低要求表

下表列出了各种上下文之间**最低**锁定要求。 在某些情况下,同一上下文一次只能在一个 CPU 上运行,因此该上下文不需要锁定(例如,特定的线程一次只能在一个 CPU 上运行,但如果它需要与其他线程共享数据,则需要锁定)。

请记住上面的建议:您始终可以使用 spin_lock_irqsave(),它是所有其他自旋锁原语的超集。

.

IRQ 处理程序 A

IRQ 处理程序 B

软中断 A

软中断 B

Tasklet A

Tasklet B

定时器 A

定时器 B

用户上下文 A

用户上下文 B

IRQ 处理程序 A

IRQ 处理程序 B

SLIS

软中断 A

SLI

SLI

SL

软中断 B

SLI

SLI

SL

SL

Tasklet A

SLI

SLI

SL

SL

Tasklet B

SLI

SLI

SL

SL

SL

定时器 A

SLI

SLI

SL

SL

SL

SL

定时器 B

SLI

SLI

SL

SL

SL

SL

SL

用户上下文 A

SLI

SLI

SLBH

SLBH

SLBH

SLBH

SLBH

SLBH

用户上下文 B

SLI

SLI

SLBH

SLBH

SLBH

SLBH

SLBH

SLBH

MLI

表:锁定要求表

SLIS

spin_lock_irqsave

SLI

spin_lock_irq

SL

spin_lock

SLBH

spin_lock_bh

MLI

mutex_lock_interruptible

表:锁定要求表的图例

trylock 函数

有些函数尝试只获取一次锁,并立即返回一个值,指示获取锁的成功或失败。 如果当其他线程持有锁时,您不需要访问受锁保护的数据,则可以使用它们。 如果您随后需要访问受锁保护的数据,您应该稍后获取锁。

spin_trylock() 不会旋转,但如果在第一次尝试时获取了自旋锁,则返回非零值,否则返回 0。 此函数可以在所有上下文中像 spin_lock() 一样使用:您必须禁用可能会中断您并获取自旋锁的上下文。

mutex_trylock()不会暂停您的任务,但如果它可以在第一次尝试时锁定互斥锁,则返回非零值,否则返回 0。 尽管不休眠,但此函数不能在硬件或软件中断上下文中安全使用。

常见例子

让我们逐步完成一个简单的示例:数字到名称映射的缓存。 缓存会跟踪每个对象的使用频率,并在填满时,丢弃使用最少的对象。

全部在用户上下文中

对于我们的第一个示例,我们假设所有操作都在用户上下文中(即来自系统调用),因此我们可以休眠。 这意味着我们可以使用互斥锁来保护缓存和其中的所有对象。 这是代码

#include <linux/list.h>
#include <linux/slab.h>
#include <linux/string.h>
#include <linux/mutex.h>
#include <asm/errno.h>

struct object
{
        struct list_head list;
        int id;
        char name[32];
        int popularity;
};

/* Protects the cache, cache_num, and the objects within it */
static DEFINE_MUTEX(cache_lock);
static LIST_HEAD(cache);
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10

/* Must be holding cache_lock */
static struct object *__cache_find(int id)
{
        struct object *i;

        list_for_each_entry(i, &cache, list)
                if (i->id == id) {
                        i->popularity++;
                        return i;
                }
        return NULL;
}

/* Must be holding cache_lock */
static void __cache_delete(struct object *obj)
{
        BUG_ON(!obj);
        list_del(&obj->list);
        kfree(obj);
        cache_num--;
}

/* Must be holding cache_lock */
static void __cache_add(struct object *obj)
{
        list_add(&obj->list, &cache);
        if (++cache_num > MAX_CACHE_SIZE) {
                struct object *i, *outcast = NULL;
                list_for_each_entry(i, &cache, list) {
                        if (!outcast || i->popularity < outcast->popularity)
                                outcast = i;
                }
                __cache_delete(outcast);
        }
}

int cache_add(int id, const char *name)
{
        struct object *obj;

        if ((obj = kmalloc(sizeof(*obj), GFP_KERNEL)) == NULL)
                return -ENOMEM;

        strscpy(obj->name, name, sizeof(obj->name));
        obj->id = id;
        obj->popularity = 0;

        mutex_lock(&cache_lock);
        __cache_add(obj);
        mutex_unlock(&cache_lock);
        return 0;
}

void cache_delete(int id)
{
        mutex_lock(&cache_lock);
        __cache_delete(__cache_find(id));
        mutex_unlock(&cache_lock);
}

int cache_find(int id, char *name)
{
        struct object *obj;
        int ret = -ENOENT;

        mutex_lock(&cache_lock);
        obj = __cache_find(id);
        if (obj) {
                ret = 0;
                strcpy(name, obj->name);
        }
        mutex_unlock(&cache_lock);
        return ret;
}

请注意,当我们添加、删除或查找缓存时,我们始终确保拥有 cache_lock:缓存基础设施本身和对象的内容都受到锁的保护。 在这种情况下,这很容易,因为我们复制用户的数据,并且从不让他们直接访问对象。

这里有一个轻微(且常见)的优化:在 cache_add() 中,我们在获取锁之前设置对象的字段。 这是安全的,因为在我们将其放入缓存之前,没有人可以访问它。

从中断上下文中访问

现在考虑可以从中断上下文调用 cache_find() 的情况:硬件中断或软中断。 一个例子是删除缓存中对象的定时器。

更改如下所示,采用标准补丁格式:-是删除的行,+是添加的行。

--- cache.c.usercontext 2003-12-09 13:58:54.000000000 +1100
+++ cache.c.interrupt   2003-12-09 14:07:49.000000000 +1100
@@ -12,7 +12,7 @@
         int popularity;
 };

-static DEFINE_MUTEX(cache_lock);
+static DEFINE_SPINLOCK(cache_lock);
 static LIST_HEAD(cache);
 static unsigned int cache_num = 0;
 #define MAX_CACHE_SIZE 10
@@ -55,6 +55,7 @@
 int cache_add(int id, const char *name)
 {
         struct object *obj;
+        unsigned long flags;

         if ((obj = kmalloc(sizeof(*obj), GFP_KERNEL)) == NULL)
                 return -ENOMEM;
@@ -63,30 +64,33 @@
         obj->id = id;
         obj->popularity = 0;

-        mutex_lock(&cache_lock);
+        spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);
-        mutex_unlock(&cache_lock);
+        spin_unlock_irqrestore(&cache_lock, flags);
         return 0;
 }

 void cache_delete(int id)
 {
-        mutex_lock(&cache_lock);
+        unsigned long flags;
+
+        spin_lock_irqsave(&cache_lock, flags);
         __cache_delete(__cache_find(id));
-        mutex_unlock(&cache_lock);
+        spin_unlock_irqrestore(&cache_lock, flags);
 }

 int cache_find(int id, char *name)
 {
         struct object *obj;
         int ret = -ENOENT;
+        unsigned long flags;

-        mutex_lock(&cache_lock);
+        spin_lock_irqsave(&cache_lock, flags);
         obj = __cache_find(id);
         if (obj) {
                 ret = 0;
                 strcpy(name, obj->name);
         }
-        mutex_unlock(&cache_lock);
+        spin_unlock_irqrestore(&cache_lock, flags);
         return ret;
 }

请注意,如果中断已打开,spin_lock_irqsave() 将关闭中断,否则不执行任何操作(如果我们已经在中断处理程序中),因此这些函数可以从任何上下文中安全调用。

不幸的是,cache_add() 使用 GFP_KERNEL 标志调用kmalloc(),这仅在用户上下文中合法。 我假设 cache_add() 仍然仅在用户上下文中被调用,否则这应该成为 cache_add() 的一个参数。

在此文件外部公开对象

如果我们的对象包含更多信息,则复制信息的进出可能不足:代码的其他部分可能希望保留指向这些对象的指针,例如,而不是每次都查找 id。 这会产生两个问题。

第一个问题是我们使用cache_lock来保护对象:我们需要使其成为非静态的,以便代码的其余部分可以使用它。 这使得锁定更加棘手,因为它不再都在一个地方。

第二个问题是生存期问题:如果另一个结构保留指向对象的指针,则它大概希望该指针保持有效。 不幸的是,这仅在您持有锁时才能保证,否则有人可能会调用 cache_delete(),甚至更糟糕的是,添加另一个对象,重新使用相同的地址。

由于只有一个锁,您不能永远持有它:没有人可以完成任何工作。

解决此问题的方法是使用引用计数:每个拥有指向对象的指针的人都会在第一次获取对象时增加它,并在完成对象后删除引用计数。 无论谁将其删除为零都知道它未被使用,并且实际上可以删除它。

这是代码

--- cache.c.interrupt   2003-12-09 14:25:43.000000000 +1100
+++ cache.c.refcnt  2003-12-09 14:33:05.000000000 +1100
@@ -7,6 +7,7 @@
 struct object
 {
         struct list_head list;
+        unsigned int refcnt;
         int id;
         char name[32];
         int popularity;
@@ -17,6 +18,35 @@
 static unsigned int cache_num = 0;
 #define MAX_CACHE_SIZE 10

+static void __object_put(struct object *obj)
+{
+        if (--obj->refcnt == 0)
+                kfree(obj);
+}
+
+static void __object_get(struct object *obj)
+{
+        obj->refcnt++;
+}
+
+void object_put(struct object *obj)
+{
+        unsigned long flags;
+
+        spin_lock_irqsave(&cache_lock, flags);
+        __object_put(obj);
+        spin_unlock_irqrestore(&cache_lock, flags);
+}
+
+void object_get(struct object *obj)
+{
+        unsigned long flags;
+
+        spin_lock_irqsave(&cache_lock, flags);
+        __object_get(obj);
+        spin_unlock_irqrestore(&cache_lock, flags);
+}
+
 /* Must be holding cache_lock */
 static struct object *__cache_find(int id)
 {
@@ -35,6 +65,7 @@
 {
         BUG_ON(!obj);
         list_del(&obj->list);
+        __object_put(obj);
         cache_num--;
 }

@@ -63,6 +94,7 @@
         strscpy(obj->name, name, sizeof(obj->name));
         obj->id = id;
         obj->popularity = 0;
+        obj->refcnt = 1; /* The cache holds a reference */

         spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);
@@ -79,18 +111,15 @@
         spin_unlock_irqrestore(&cache_lock, flags);
 }

-int cache_find(int id, char *name)
+struct object *cache_find(int id)
 {
         struct object *obj;
-        int ret = -ENOENT;
         unsigned long flags;

         spin_lock_irqsave(&cache_lock, flags);
         obj = __cache_find(id);
-        if (obj) {
-                ret = 0;
-                strcpy(name, obj->name);
-        }
+        if (obj)
+                __object_get(obj);
         spin_unlock_irqrestore(&cache_lock, flags);
-        return ret;
+        return obj;
 }

我们将引用计数封装在标准“get”和“put”函数中。 现在我们可以从 cache_find() 返回对象本身,这具有用户现在可以休眠持有该对象的优势(例如,将 copy_to_user() 命名到用户空间)。

另一点要注意的是,我说应该为每个指向对象的指针持有引用:因此,首次插入缓存时,引用计数为 1。 在某些版本中,框架不持有引用计数,但它们更复杂。

使用原子操作进行引用计数

实际上,atomic_t通常用于 refcnt。 在include/asm/atomic.h中定义了许多原子操作:保证从系统中所有 CPU 原子地看到这些操作,因此不需要锁。 在这种情况下,它比使用自旋锁更简单,尽管对于任何非平凡的事情,使用自旋锁更清晰。 atomic_inc()atomic_dec_and_test()用于代替标准递增和递减运算符,并且该锁不再用于保护引用计数本身。

--- cache.c.refcnt  2003-12-09 15:00:35.000000000 +1100
+++ cache.c.refcnt-atomic   2003-12-11 15:49:42.000000000 +1100
@@ -7,7 +7,7 @@
 struct object
 {
         struct list_head list;
-        unsigned int refcnt;
+        atomic_t refcnt;
         int id;
         char name[32];
         int popularity;
@@ -18,33 +18,15 @@
 static unsigned int cache_num = 0;
 #define MAX_CACHE_SIZE 10

-static void __object_put(struct object *obj)
-{
-        if (--obj->refcnt == 0)
-                kfree(obj);
-}
-
-static void __object_get(struct object *obj)
-{
-        obj->refcnt++;
-}
-
 void object_put(struct object *obj)
 {
-        unsigned long flags;
-
-        spin_lock_irqsave(&cache_lock, flags);
-        __object_put(obj);
-        spin_unlock_irqrestore(&cache_lock, flags);
+        if (atomic_dec_and_test(&obj->refcnt))
+                kfree(obj);
 }

 void object_get(struct object *obj)
 {
-        unsigned long flags;
-
-        spin_lock_irqsave(&cache_lock, flags);
-        __object_get(obj);
-        spin_unlock_irqrestore(&cache_lock, flags);
+        atomic_inc(&obj->refcnt);
 }

 /* Must be holding cache_lock */
@@ -65,7 +47,7 @@
 {
         BUG_ON(!obj);
         list_del(&obj->list);
-        __object_put(obj);
+        object_put(obj);
         cache_num--;
 }

@@ -94,7 +76,7 @@
         strscpy(obj->name, name, sizeof(obj->name));
         obj->id = id;
         obj->popularity = 0;
-        obj->refcnt = 1; /* The cache holds a reference */
+        atomic_set(&obj->refcnt, 1); /* The cache holds a reference */

         spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);
@@ -119,7 +101,7 @@
         spin_lock_irqsave(&cache_lock, flags);
         obj = __cache_find(id);
         if (obj)
-                __object_get(obj);
+                object_get(obj);
         spin_unlock_irqrestore(&cache_lock, flags);
         return obj;
 }

保护对象本身

在这些示例中,我们假设一旦创建对象,它们(引用计数除外)永远不会更改。 如果我们想允许更改名称,则有三种可能性

  • 您可以使cache_lock成为非静态的,并告诉人们在更改任何对象中的名称之前获取该锁。

  • 您可以提供一个 cache_obj_rename(),它获取此锁并为调用者更改名称,并告诉每个人使用该函数。

  • 您可以使cache_lock仅保护缓存本身,并使用另一个锁来保护名称。

从理论上讲,您可以为每个对象的每个字段制作锁,使锁的粒度尽可能细。 实际上,最常见的变体是

  • 一个锁,用于保护基础设施(本示例中的cache列表)和所有对象。 这就是我们迄今为止所做的。

  • 一个锁,用于保护基础设施(包括对象内的列表指针),以及对象内的一个锁,用于保护该对象的其余部分。

  • 多个锁,用于保护基础设施(例如,每个哈希链一个锁),可能具有单独的每个对象锁。

这是“每个对象锁”的实现

--- cache.c.refcnt-atomic   2003-12-11 15:50:54.000000000 +1100
+++ cache.c.perobjectlock   2003-12-11 17:15:03.000000000 +1100
@@ -6,11 +6,17 @@

 struct object
 {
+        /* These two protected by cache_lock. */
         struct list_head list;
+        int popularity;
+
         atomic_t refcnt;
+
+        /* Doesn't change once created. */
         int id;
+
+        spinlock_t lock; /* Protects the name */
         char name[32];
-        int popularity;
 };

 static DEFINE_SPINLOCK(cache_lock);
@@ -77,6 +84,7 @@
         obj->id = id;
         obj->popularity = 0;
         atomic_set(&obj->refcnt, 1); /* The cache holds a reference */
+        spin_lock_init(&obj->lock);

         spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);

请注意,我决定受欢迎程度计数应由cache_lock而不是每个对象锁来保护:这是因为它(像对象内的struct list_head一样)在逻辑上是基础设施的一部分。 这样,在寻找最不受欢迎的对象时,我不需要在 __cache_add() 中获取每个对象的锁。

我还决定 id 成员是不可更改的,因此我不需要在 __cache_find() 中获取每个对象锁来检查 id:对象锁仅由想要读取或写入 name 字段的调用者使用。

另请注意,我添加了一条注释,描述了哪些数据受哪些锁保护。 这非常重要,因为它描述了代码的运行时行为,并且很难仅通过阅读获得。 正如 Alan Cox 所说,“锁定数据,而不是代码”。

常见问题

死锁:简单和高级

有一个编码错误,其中一段代码尝试获取自旋锁两次:它将永远旋转,等待锁被释放(自旋锁、读写锁和互斥锁在 Linux 中不是递归的)。 这很容易诊断:不是一个熬夜五晚与毛茸茸的代码兔子交谈的那种问题。

对于稍微复杂的情况,想象一下您有一个由软中断和用户上下文共享的区域。 如果您使用 spin_lock() 调用来保护它,则用户上下文可能会在持有锁时被软中断中断,然后软中断将永远旋转,尝试获取相同的锁。

这两种情况都称为死锁,如上所示,即使使用单个 CPU 也会发生(尽管在 UP 编译中不会发生,因为自旋锁在CONFIG_SMP=n 的内核编译中消失了。 在第二个示例中,您仍然会得到数据损坏)。

这种完全锁定很容易诊断:在 SMP 盒子上,看门狗定时器或使用设置的DEBUG_SPINLOCK编译(include/linux/spinlock.h)会在发生时立即显示这一点。

一个更复杂的问题是所谓的“致命拥抱”,涉及两个或多个锁。 假设您有一个哈希表:表中的每个条目都是一个自旋锁,以及一个哈希对象的链。 在软中断处理程序中,有时您想将对象从哈希中的一个位置更改为另一个位置:您获取旧哈希链的自旋锁和新哈希链的自旋锁,然后从旧哈希链中删除对象,并将其插入到新哈希链中。

这里有两个问题。首先,如果你的代码尝试将对象移动到同一条链上,它会因为尝试两次锁定它而导致死锁。其次,如果另一个 CPU 上的同一 softirq 试图以相反的方向移动另一个对象,则可能发生以下情况:

CPU 1

CPU 2

获取锁 A -> 成功

获取锁 B -> 成功

获取锁 B -> 旋转等待

获取锁 A -> 旋转等待

表格:后果

这两个 CPU 将永远旋转等待,等待对方释放它们的锁。它看起来、闻起来和感觉起来就像崩溃一样。

防止死锁

教科书会告诉你,如果你总是以相同的顺序锁定,你永远不会遇到这种死锁。实践会告诉你,这种方法无法扩展:当我创建一个新锁时,我对内核的理解还不足以弄清楚它应该放在 5000 个锁层次结构中的哪个位置。

最好的锁是被封装的:它们永远不会在头文件中暴露,并且永远不会在同一文件之外调用非平凡函数时持有它们。你可以阅读这段代码,发现它永远不会死锁,因为它永远不会在持有锁的时候尝试获取另一个锁。使用你代码的人甚至不需要知道你正在使用锁。

这里一个经典的问题是当你提供回调或钩子时:如果在持有锁的情况下调用这些回调或钩子,你可能会面临简单的死锁或致命的拥抱(谁知道回调会做什么?)。

过度防止死锁

死锁是有问题的,但不如数据损坏那么糟糕。获取读锁、搜索列表、找不到想要的内容、释放读锁、获取写锁并插入对象的代码存在竞争条件。

竞速定时器:内核的消遣

定时器可能会产生其自身特殊的竞争问题。考虑一个对象集合(列表、哈希等),其中每个对象都有一个定时器,该定时器应该销毁它。

如果你想销毁整个集合(比如在模块移除时),你可能会这样做:

/* THIS CODE BAD BAD BAD BAD: IF IT WAS ANY WORSE IT WOULD USE
   HUNGARIAN NOTATION */
spin_lock_bh(&list_lock);

while (list) {
        struct foo *next = list->next;
        timer_delete(&list->timer);
        kfree(list);
        list = next;
}

spin_unlock_bh(&list_lock);

迟早,这会在 SMP 上崩溃,因为定时器可能已经在 spin_lock_bh() 之前触发了,并且它只有在 spin_unlock_bh() 之后才能获得锁,然后尝试释放元素(该元素已经被释放了!)。

可以通过检查 timer_delete() 的结果来避免这种情况:如果它返回 1,则表示定时器已被删除。 如果返回 0,则表示(在这种情况下)它当前正在运行,所以我们可以这样做:

retry:
        spin_lock_bh(&list_lock);

        while (list) {
                struct foo *next = list->next;
                if (!timer_delete(&list->timer)) {
                        /* Give timer a chance to delete this */
                        spin_unlock_bh(&list_lock);
                        goto retry;
                }
                kfree(list);
                list = next;
        }

        spin_unlock_bh(&list_lock);

另一个常见的问题是删除会自行重启的定时器(通过在其定时器函数的末尾调用 add_timer())。因为这是一个相当常见的容易发生竞争的情况,所以你应该使用 timer_delete_sync() (include/linux/timer.h) 来处理这种情况。

在释放定时器之前,应该调用 timer_shutdown()timer_shutdown_sync(),以防止它被重新激活。 任何后续尝试重新激活定时器的操作都将被核心代码静默忽略。

锁的速度

在考虑执行锁定的代码的速度时,主要有三个方面需要注意。首先是并发性:当其他人持有锁时,有多少东西会处于等待状态。其次是实际获取和释放未竞争锁所花费的时间。第三是使用更少或更智能的锁。我假设锁的使用频率相当高:否则,你不会关心效率。

并发性取决于锁通常被持有的时间:你应该在需要的时间内持有锁,但不要更长。在缓存示例中,我们始终在未持有锁的情况下创建对象,然后仅在我们准备将其插入列表时才获取锁。

获取时间取决于锁操作对流水线造成的损害程度(流水线停顿)以及此 CPU 是上一个获取锁的 CPU 的可能性(即,对于此 CPU 来说,锁是否是缓存热的):在具有更多 CPU 的机器上,这种可能性迅速下降。考虑一个 700MHz 的 Intel Pentium III:一条指令大约需要 0.7ns,一个原子递增大约需要 58ns,一个在此 CPU 上缓存热的锁需要 160ns,而从另一个 CPU 传输缓存行则需要额外 170 到 360ns。(这些数字来自 Paul McKenney 的 Linux Journal RCU 文章)。

这两个目标是冲突的:通过将锁分成几部分(例如在我们最终的按对象锁示例中),可以缩短持有锁的时间,但这会增加锁的获取次数,并且结果通常比拥有单个锁更慢。这是提倡锁定简单性的另一个原因。

第三个问题将在下面讨论:有一些方法可以减少需要完成的锁定量。

读/写锁变体

自旋锁和互斥锁都有读/写变体:rwlock_tstruct rw_semaphore。 这些将用户分为两类:读者和写者。如果你只是读取数据,你可以获得一个读锁,但是要写入数据,你需要写锁。 许多人可以持有读锁,但是写者必须是唯一的持有者。

如果你的代码沿着读者/写者的界限整齐地划分(就像我们的缓存代码一样),并且锁被读者持有很长时间,那么使用这些锁可能会有所帮助。 但是,它们比普通锁稍微慢一些,因此在实践中,rwlock_t 通常不值得使用。

避免锁:读复制更新

有一种称为读复制更新的特殊的读/写锁定方法。 使用 RCU,读者可以完全避免获取锁:因为我们希望我们的缓存被读取的频率高于更新的频率(否则缓存就是浪费时间),所以它是这种优化的候选者。

我们如何摆脱读锁? 摆脱读锁意味着写者可能会在读者不知情的情况下更改列表。 这实际上非常简单:如果写者非常小心地添加元素,我们可以在添加元素时读取链表。 例如,将 new 添加到名为 list 的单链表中

new->next = list->next;
wmb();
list->next = new;

wmb() 是一个写内存屏障。 它确保第一个操作(设置新元素的 next 指针)已完成并且将被所有 CPU 看到,然后才能执行第二个操作(将新元素放入列表中)。 这很重要,因为现代编译器和现代 CPU 都可以重新排序指令,除非另有说明:我们希望读者要么根本看不到新元素,要么看到 next 指针正确指向列表其余部分的新元素。

幸运的是,有一个函数可以为标准 struct list_head 列表执行此操作:list_add_rcu() (include/linux/list.h)。

从列表中删除元素甚至更简单:我们将指向旧元素的指针替换为指向其后继元素的指针,读者要么看到它,要么跳过它。

list->next = old->next;

有一个 list_del_rcu() (include/linux/list.h) 可以执行此操作(正常版本会毒化旧对象,这不是我们想要的)。

读者也必须小心:某些 CPU 可以通过 next 指针提前开始读取下一个元素的内容,但是当 next 指针在其不知情的情况下更改时,他们不会意识到预取的内容是错误的。 再次,有一个 list_for_each_entry_rcu() (include/linux/list.h) 来帮助你。 当然,写者可以使用 list_for_each_entry(),因为不可能有两个同时写者。

我们最终的困境是:我们什么时候才能真正销毁被删除的元素? 请记住,读者可能现在正在列表中逐步执行此元素:如果我们释放此元素并且 next 指针更改,则读者将跳入垃圾数据并崩溃。 我们需要等到我们知道在删除元素时遍历列表的所有读者都已完成。 我们使用 call_rcu() 注册一个回调,该回调将在所有先前存在的读者完成后实际销毁该对象。 另外,可以使用 synchronize_rcu() 阻塞,直到所有先前存在的读者都完成。

但是读复制更新如何知道读者何时完成? 方法是这样的:首先,读者始终在 rcu_read_lock()/rcu_read_unlock() 对中遍历列表:这些只是禁用抢占,因此读者在读取列表时不会进入休眠状态。

然后 RCU 等待直到每个其他 CPU 至少休眠一次:由于读者无法休眠,我们知道在删除期间遍历列表的任何读者都已完成,并且回调已触发。 真正的读复制更新代码比这稍微优化,但这是基本思想。

--- cache.c.perobjectlock   2003-12-11 17:15:03.000000000 +1100
+++ cache.c.rcupdate    2003-12-11 17:55:14.000000000 +1100
@@ -1,15 +1,18 @@
 #include <linux/list.h>
 #include <linux/slab.h>
 #include <linux/string.h>
+#include <linux/rcupdate.h>
 #include <linux/mutex.h>
 #include <asm/errno.h>

 struct object
 {
-        /* These two protected by cache_lock. */
+        /* This is protected by RCU */
         struct list_head list;
         int popularity;

+        struct rcu_head rcu;
+
         atomic_t refcnt;

         /* Doesn't change once created. */
@@ -40,7 +43,7 @@
 {
         struct object *i;

-        list_for_each_entry(i, &cache, list) {
+        list_for_each_entry_rcu(i, &cache, list) {
                 if (i->id == id) {
                         i->popularity++;
                         return i;
@@ -49,19 +52,25 @@
         return NULL;
 }

+/* Final discard done once we know no readers are looking. */
+static void cache_delete_rcu(void *arg)
+{
+        object_put(arg);
+}
+
 /* Must be holding cache_lock */
 static void __cache_delete(struct object *obj)
 {
         BUG_ON(!obj);
-        list_del(&obj->list);
-        object_put(obj);
+        list_del_rcu(&obj->list);
         cache_num--;
+        call_rcu(&obj->rcu, cache_delete_rcu);
 }

 /* Must be holding cache_lock */
 static void __cache_add(struct object *obj)
 {
-        list_add(&obj->list, &cache);
+        list_add_rcu(&obj->list, &cache);
         if (++cache_num > MAX_CACHE_SIZE) {
                 struct object *i, *outcast = NULL;
                 list_for_each_entry(i, &cache, list) {
@@ -104,12 +114,11 @@
 struct object *cache_find(int id)
 {
         struct object *obj;
-        unsigned long flags;

-        spin_lock_irqsave(&cache_lock, flags);
+        rcu_read_lock();
         obj = __cache_find(id);
         if (obj)
                 object_get(obj);
-        spin_unlock_irqrestore(&cache_lock, flags);
+        rcu_read_unlock();
         return obj;
 }

请注意,读者将更改 __cache_find() 中的 popularity 成员,现在它不持有锁。 一种解决方案是将其设为 atomic_t,但是对于此用法,我们实际上并不关心竞争:近似结果就足够了,所以我没有更改它。

结果是 cache_find() 不需要与任何其他函数同步,因此在 SMP 上几乎与在 UP 上一样快。

这里还有一种可能的进一步优化:记住我们最初的缓存代码,其中没有引用计数,并且调用者在使用对象时只需持有锁? 这仍然是可能的:如果你持有锁,则没有人可以删除该对象,因此你不需要获取和释放引用计数。

现在,因为 RCU 中的“读锁”只是禁用抢占,所以在调用 cache_find() 和 object_put() 之间始终禁用抢占的调用者实际上不需要获取和释放引用计数:我们可以通过使 __cache_find() 非静态来暴露它,并且此类调用者可以简单地调用该函数。

这里的好处是引用计数未被写入:对象没有以任何方式更改,由于缓存,这在 SMP 机器上要快得多。

每个 CPU 的数据

另一种用于避免锁定的技术是为每个 CPU 复制信息。 例如,如果你想保持对常见条件的计数,则可以使用自旋锁和单个计数器。 简单又好。

如果这太慢了(通常不是,但是如果你有一台非常大的机器可以进行测试并且可以证明确实如此),则可以改为为每个 CPU 使用一个计数器,这样它们都不需要独占锁。 请参阅 DEFINE_PER_CPU()、get_cpu_var() 和 put_cpu_var() (include/linux/percpu.h)。

对于简单的每个 CPU 计数器,特别有用的是 local_t 类型以及 cpu_local_inc() 和相关函数,它们在某些体系结构上比简单代码更有效 (include/asm/local.h)。

请注意,在不引入更多锁的情况下,没有简单、可靠的方法来获取此类计数器的确切值。 对于某些用途,这不是问题。

主要由 IRQ 处理程序使用的数据

如果始终从同一个 IRQ 处理程序中访问数据,则根本不需要锁:内核已经保证 irq 处理程序不会在多个 CPU 上同时运行。

Manfred Spraul 指出,即使很少在用户上下文中或 softirqs/tasklets 中访问数据,你仍然可以这样做。 irq 处理程序不使用锁,所有其他访问都按以下方式完成:

mutex_lock(&lock);
disable_irq(irq);
...
enable_irq(irq);
mutex_unlock(&lock);

disable_irq() 阻止 irq 处理程序运行(如果它当前在其他 CPU 上运行,则等待它完成)。 自旋锁阻止任何其他访问同时发生。 当然,这比只调用 spin_lock_irq() 慢,因此只有在这种类型的访问极少发生时才有意义。

哪些函数可以安全地从中断中调用?

内核中的许多函数会休眠(即直接或间接调用 schedule()):你永远不能在持有自旋锁或禁用抢占的情况下调用它们。 这也意味着你需要在用户上下文中:从中断中调用它们是非法的。

一些会休眠的函数

下面列出了最常见的函数,但你通常需要阅读代码才能找出其他调用是否安全。 如果其他所有调用者都可以休眠,那么你可能也需要能够休眠。 特别是,注册和注销函数通常期望从用户上下文中调用,并且可以休眠。

  • 访问用户空间

  • kmalloc(GP_KERNEL) <kmalloc>`

  • mutex_lock_interruptible()mutex_lock()

    有一个 mutex_trylock() 不会休眠。 尽管如此,它不能在中断上下文中使用,因为它的实现对于这种情况是不安全的。 mutex_unlock() 也永远不会休眠。 它也不能在中断上下文中使用,因为互斥锁必须由获取它的同一任务释放。

一些不会休眠的函数

某些函数可以安全地从任何上下文调用,或者持有几乎任何锁。

互斥锁 API 参考

mutex_init

mutex_init (mutex)

初始化互斥锁

参数

mutex

要初始化的互斥锁

描述

将互斥锁初始化为未锁定状态。

不允许初始化已锁定的互斥锁。

mutex_init_with_key

mutex_init_with_key (mutex, key)

使用给定的 lockdep 键初始化互斥锁

参数

mutex

要初始化的互斥锁

key

要与互斥锁关联的 lockdep 键

描述

将互斥锁初始化为未锁定状态。

不允许初始化已锁定的互斥锁。

bool mutex_is_locked(struct mutex *lock)

互斥锁是否已锁定

参数

struct mutex *lock

要查询的互斥锁

描述

如果互斥锁已锁定,则返回 true,如果未锁定,则返回 false。

void mutex_lock(struct mutex *lock)

获取互斥锁

参数

struct mutex *lock

要获取的互斥锁

描述

为此任务独占锁定互斥锁。 如果互斥锁当前不可用,它将休眠直到可以获取它。

互斥锁稍后必须由获取它的同一任务释放。 不允许递归锁定。 任务在未先解锁互斥锁的情况下不得退出。 此外,互斥锁所在的内核内存不得在互斥锁仍锁定的情况下释放。 互斥锁必须先初始化(或静态定义),然后才能锁定。 不允许 memset() 将互斥锁设置为 0。

(CONFIG_DEBUG_MUTEXES .config 选项会启用调试检查,该检查将强制执行限制,并且还将进行死锁调试)

此函数类似于(但不等效于)down()。

void mutex_unlock(struct mutex *lock)

释放互斥锁

参数

struct mutex *lock

要释放的互斥锁

描述

解锁先前由此任务锁定的互斥锁。

此函数不得在中断上下文中使用。 不允许解锁未锁定的互斥锁。

调用者必须确保互斥锁保持活动状态,直到此函数返回 - mutex_unlock() 不能直接用于释放对象,以便另一个并发任务可以释放它。 在这方面,互斥锁与自旋锁和引用计数不同。

此函数类似于(但不等效于)up()。

void ww_mutex_unlock(struct ww_mutex *lock)

释放 w/w 互斥锁

参数

struct ww_mutex *lock

要释放的互斥锁

描述

解锁先前由此任务使用任何 ww_mutex_lock* 函数(无论是否带有获取上下文)锁定的互斥锁。 禁止在释放获取上下文后释放锁。

此函数不得在中断上下文中使用。 不允许解锁未锁定的互斥锁。

int ww_mutex_trylock(struct ww_mutex *ww, struct ww_acquire_ctx *ww_ctx)

尝试获取带有可选获取上下文的 w/w 互斥锁

参数

struct ww_mutex *ww

要锁定的互斥锁

struct ww_acquire_ctx *ww_ctx

可选的 w/w 获取上下文

描述

尝试使用可选的获取上下文锁定互斥锁; 不可能进行死锁检测。 如果互斥锁已成功获取,则返回 1,否则返回 0。

与 ww_mutex_lock 不同,不执行死锁处理。 但是,如果指定了 ctx,则可能会在对 ww_mutex_trylock 的调用中发生 -EALREADY 处理。

使用此函数获取的互斥锁必须使用 ww_mutex_unlock 释放。

int mutex_lock_interruptible(struct mutex *lock)

获取互斥锁,可被信号中断。

参数

struct mutex *lock

要获取的互斥锁。

描述

mutex_lock() 一样锁定互斥锁。 如果在进程休眠时传递信号,此函数将返回而不获取互斥锁。

上下文

进程上下文。

返回

如果成功获取锁,则返回 0,如果收到信号,则返回 -EINTR

int mutex_lock_killable(struct mutex *lock)

获取互斥锁,可被致命信号中断。

参数

struct mutex *lock

要获取的互斥锁。

描述

mutex_lock() 一样锁定互斥锁。 如果在进程休眠时传递会使当前进程致命的信号,此函数将返回而不获取互斥锁。

上下文

进程上下文。

返回

0:如果成功获取锁;-EINTR:如果收到致命信号。

void mutex_lock_io(struct mutex *lock)

获取互斥锁并将进程标记为等待 I/O

参数

struct mutex *lock

要获取的互斥锁。

描述

mutex_lock() 一样锁定互斥锁。当任务等待此互斥锁时,它将被调度器视为处于 IO 等待状态。

上下文

进程上下文。

int mutex_trylock(struct mutex *lock)

尝试获取互斥锁,无需等待

参数

struct mutex *lock

要获取的互斥锁

描述

尝试以原子方式获取互斥锁。如果成功获取互斥锁,则返回 1,如果存在争用,则返回 0。

此函数不得在中断上下文中使用。互斥锁必须由获取它的同一任务释放。

注意

此函数遵循 spin_trylock() 约定,因此它与 down_trylock() 的返回值相反!将信号量用户转换为互斥锁时,请注意这一点。

int atomic_dec_and_mutex_lock(atomic_t *cnt, struct mutex *lock)

如果递减到 0,则返回持有互斥锁

参数

atomic_t *cnt

我们要递减的原子变量

struct mutex *lock

如果我们递减到 0,则返回持有的互斥锁

描述

如果我们递减到 0,则返回 true 并持有锁,否则返回 false

Futex API 参考

void futex_hash_get(struct futex_hash_bucket *hb)

获取本地哈希的额外引用。

参数

struct futex_hash_bucket *hb

指向私有本地哈希的指针。

描述

获取已获得的哈希桶的额外引用。调用者必须已经拥有一个引用。

struct futex_hash_bucket *__futex_hash(union futex_key *key, struct futex_private_hash *fph)

返回哈希桶

参数

union futex_key *key

指向为其计算哈希的 futex 键的指针

struct futex_private_hash *fph

指向私有哈希(如果已知)的指针

描述

我们对从 get_futex_key 返回的键进行哈希(见下文),并返回相应的哈希桶。如果 FUTEX 是 PROCESS_PRIVATE,则返回每个进程的哈希桶(来自私有哈希),如果存在。否则,返回来自全局哈希的哈希桶。

struct hrtimer_sleeper *futex_setup_timer(ktime_t *time, struct hrtimer_sleeper *timeout, int flags, u64 range_ns)

设置睡眠 hrtimer。

参数

ktime_t *time

指向给定超时值的指针

struct hrtimer_sleeper *timeout

要设置的 hrtimer_sleeper 结构

int flags

futex 标志

u64 range_ns

可选范围,单位为纳秒

返回

已初始化的 hrtimer_sleeper 结构,如果没有超时,则为 NULL

给出值

int get_futex_key(u32 __user *uaddr, unsigned int flags, union futex_key *key, enum futex_access rw)

获取作为 futex 键的参数

参数

u32 __user *uaddr

futex 的虚拟地址

unsigned int flags

FLAGS_*

union futex_key *key

结果存储的地址。

enum futex_access rw

映射需要读/写(值:FUTEX_READ,FUTEX_WRITE)

返回

负错误代码或 0

描述

关键字存储在 key 中,如果成功。

对于共享映射(当 fshared 时),键是

( inode->i_sequence, 映射中的页面偏移量,页面中的偏移量 )

[ 另请参见 get_inode_sequence_number() ]

对于私有映射(或当 !fshared 时),键是

( current->mm, 地址, 0 )

这允许(在适用的情况下,跨进程)识别 futex,而无需在 FUTEX_WAIT 期间保持页面固定。

lock_page() 可能会休眠,调用者不应持有自旋锁。

int fault_in_user_writeable(u32 __user *uaddr)

调入用户地址并验证 RW 访问

参数

u32 __user *uaddr

指向故障用户空间地址的指针

描述

慢速路径,用于修复我们在原子写入访问 uaddr 时发生的故障。

我们没有对用户地址进行非破坏性写入的通用实现。我们知道我们在禁用原子页面错误的部分中发生了故障,因此我们也可以通过立即调用 get_user_pages() 来避免 #PF 开销。

struct futex_q *futex_top_waiter(struct futex_hash_bucket *hb, union futex_key *key)

返回 futex 上优先级最高的等待者

参数

struct futex_hash_bucket *hb

futex_q 所在的哈希桶

union futex_key *key

futex 键(用于将其与其他 futex futex_q 区分开)

描述

必须在持有 hb 锁的情况下调用。

void wait_for_owner_exiting(int ret, struct task_struct *exiting)

阻塞直到所有者退出

参数

int ret

所有者当前的 futex 锁状态

struct task_struct *exiting

指向正在退出的任务的指针

描述

调用者必须持有 exiting 上的引用计数。

void __futex_unqueue(struct futex_q *q)

从其 futex_hash_bucket 中删除 futex_q

参数

struct futex_q *q

要取消排队的 futex_q

描述

q->lock_ptr 不得为 NULL,并且必须由调用者持有。

int futex_unqueue(struct futex_q *q)

从其 futex_hash_bucket 中删除 futex_q

参数

struct futex_q *q

要取消排队的 futex_q

描述

q->lock_ptr 不得由调用者持有。对 futex_unqueue() 的调用必须与之前对 futex_queue() 的一次调用配对。

返回

  • 1 - 如果 futex_q 仍在排队中(并且我们取消了它的排队);

  • 0 - 如果 futex_q 已被唤醒线程删除

void futex_exit_recursive(struct task_struct *tsk)

将任务的 futex 状态设置为 FUTEX_STATE_DEAD

参数

struct task_struct *tsk

要在其上设置状态的任务

描述

以无锁方式设置任务的 futex 退出状态。当任务正在退出时,futex 等待者代码会观察到该状态,并循环直到任务实际完成 futex 清理。最坏的情况是,等待者运行通过等待循环,直到状态变得可见。

这是从 make_task_dead() 中的递归故障处理路径调用的。

这是尽力而为的。要么 futex 退出代码已经运行,要么没有运行。如果在 futex 上设置了 OWNER_DIED 位,则等待者可以接管它。如果不是,则问题被推回用户空间。如果 futex 退出代码尚未运行,则已经排队的等待者可能会永远阻塞,但对此无能为力。

struct futex_q

哈希的 futex 队列条目,每个等待任务一个

定义:

struct futex_q {
    struct plist_node list;
    struct task_struct *task;
    spinlock_t *lock_ptr;
    futex_wake_fn *wake;
    void *wake_data;
    union futex_key key;
    struct futex_pi_state *pi_state;
    struct rt_mutex_waiter *rt_waiter;
    union futex_key *requeue_pi_key;
    u32 bitset;
    atomic_t requeue_state;
    bool drop_hb_ref;
#ifdef CONFIG_PREEMPT_RT;
    struct rcuwait requeue_wait;
#endif;
};

成员

list

等待此 futex 的任务的优先级排序列表

task

等待 futex 的任务

lock_ptr

哈希桶锁

wake

此队列的唤醒处理程序

wake_data

与唤醒处理程序关联的数据

key

futex 哈希到的键

pi_state

可选的优先级继承状态

rt_waiter

用于 requeue_pi 的 rt_waiter 存储

requeue_pi_key

requeue_pi 目标 futex 键

bitset

用于可选的位掩码唤醒的位集

requeue_state

futex_requeue_pi() 的状态字段

drop_hb_ref

如果为 true,则等待者应删除额外的哈希桶引用

requeue_wait

futex_requeue_pi() 的 RCU 等待(仅 RT)

描述

我们使用此哈希等待队列,而不是普通的 wait_queue_entry_t,因此我们只能唤醒相关的等待队列(哈希队列可能是共享的)。

futex_q 具有唤醒状态,就像任务具有 TASK_RUNNING 一样。当 plist_node_empty(q->list) || q->lock_ptr == 0 时,它被认为是唤醒的。唤醒的顺序始终是使第一个条件为真,然后是第二个条件。

PI futex 通常在通过 rt_mutex 代码从哈希列表中删除之前被唤醒。请参阅 futex_unqueue_pi()。

int futex_match(union futex_key *key1, union futex_key *key2)

检查两个 futex 键是否相等

参数

union futex_key *key1

指向 key1 的指针

union futex_key *key2

指向 key2 的指针

描述

如果两个 futex_key 相等,则返回 1,否则返回 0。

void futex_queue(struct futex_q *q, struct futex_hash_bucket *hb, struct task_struct *task)

在 futex_hash_bucket 上对 futex_q 进行排队

参数

struct futex_q *q

要排队的 futex_q

struct futex_hash_bucket *hb

目标哈希桶

struct task_struct *task

为此 futex 排队的任务

描述

hb->lock 必须由调用者持有,并在此处释放。对 futex_queue() 的调用通常与对 futex_unqueue() 的一次调用配对。例外情况包括 PI 相关操作,这些操作可能使用 futex_unqueue_pi(),或者如果在唤醒过程中完成取消排队,并且取消排队状态隐含在已唤醒任务的状态中,则不使用任何操作(有关示例,请参阅 futex_wait_requeue_pi())。

请注意,对于 futex 的异步使用,task 可以为 NULL。

struct futex_vector

futex_waitv() 的辅助结构

定义:

struct futex_vector {
    struct futex_waitv w;
    struct futex_q q;
};

成员

w

用户空间提供的数据

q

内核端数据

描述

用于构建包含 futex_waitv() 所需的所有数据的数组的结构

int futex_lock_pi_atomic(u32 __user *uaddr, struct futex_hash_bucket *hb, union futex_key *key, struct futex_pi_state **ps, struct task_struct *task, struct task_struct **exiting, int set_waiters)

获取支持 PI 的 futex 所需的原子操作

参数

u32 __user *uaddr

PI futex 的用户地址

struct futex_hash_bucket *hb

PI futex 哈希桶

union futex_key *key

与 uaddr 和 hb 关联的 futex 键

struct futex_pi_state **ps

pi_state 指针,用于存储查找结果

struct task_struct *task

执行原子锁操作的任务。除了 requeue pi 的情况外,通常是 "current"。

struct task_struct **exiting

用于存储正在退出的所有者任务的任务指针的指针

int set_waiters

强制设置 FUTEX_WAITERS 位 (1) 或不设置 (0)

返回

  • 0 - 准备好等待;

  • 1 - 获取了锁;

  • <0 - 错误

描述

hb->lock 必须由调用者持有。

只有当返回值是 -EBUSY 时,才设置 **exiting**。如果设置了,则在返回时持有退出任务的引用计数,调用者需要在等待退出完成后释放该引用计数。

int fixup_pi_owner(u32 __user *uaddr, struct futex_q *q, int locked)

锁定后的 pi_state 和特殊情况处理

参数

u32 __user *uaddr

futex 的用户地址

struct futex_q *q

futex_q (包含 pi_state 和对 rt_mutex 的访问)

int locked

尝试获取 rt_mutex 是否成功 (1) 或不成功 (0)

描述

在尝试锁定 rt_mutex 之后,调用此函数来清理 pi_state 所有者,并处理可能允许我们获取锁的竞争条件。必须在持有 hb 锁的情况下调用。

返回

  • 1 - 成功,获取了锁;

  • 0 - 成功,未获取锁;

  • <0 - 发生错误 (-EFAULT)

void requeue_futex(struct futex_q *q, struct futex_hash_bucket *hb1, struct futex_hash_bucket *hb2, union futex_key *key2)

将 futex_q 从一个 hb 重新排队到另一个 hb

参数

struct futex_q *q

要重新排队的 futex_q

struct futex_hash_bucket *hb1

源 hash_bucket

struct futex_hash_bucket *hb2

目标 hash_bucket

union futex_key *key2

重新排队的 futex_q 的新键

void requeue_pi_wake_futex(struct futex_q *q, union futex_key *key, struct futex_hash_bucket *hb)

唤醒在重新排队期间获取锁的任务

参数

struct futex_q *q

futex_q

union futex_key *key

重新排队目标 futex 的键

struct futex_hash_bucket *hb

重新排队目标 futex 的 hash_bucket

描述

在 futex_requeue 期间,当 requeue_pi=1 时,如果目标 futex 没有竞争或通过锁窃取,则可以获取该目标 futex。

  1. 将 **q**::key 设置为重新排队目标 futex 键,以便等待者可以检测到在正确 futex 上的唤醒。

  2. 从哈希桶中取消排队 **q**。

  3. 将 **q**::rt_waiter 设置为 NULL,以便唤醒的任务可以检测到原子锁的获取。

  4. 将 q->lock_ptr 设置为重新排队目标 hb->lock,以防等待者必须修复 PI 状态。

  5. 完成重新排队状态,以便等待者可以取得进展。在此之后,如果不需要修复 PI 状态,等待者任务可以立即从系统调用返回。

  6. 唤醒等待者任务。

必须在同时持有 q->lock_ptr 和 hb->lock 的情况下调用。

int futex_proxy_trylock_atomic(u32 __user *pifutex, struct futex_hash_bucket *hb1, struct futex_hash_bucket *hb2, union futex_key *key1, union futex_key *key2, struct futex_pi_state **ps, struct task_struct **exiting, int set_waiters)

尝试为顶层等待者进行原子锁操作

参数

u32 __user *pifutex

要操作的 futex 的用户地址

struct futex_hash_bucket *hb1

来源 futex 哈希桶,必须由调用者锁定

struct futex_hash_bucket *hb2

目标 futex 哈希桶,必须由调用者锁定

union futex_key *key1

来源 futex 键

union futex_key *key2

目标 futex 键

struct futex_pi_state **ps

用于存储 pi_state 指针的地址

struct task_struct **exiting

用于存储正在退出的所有者任务的任务指针的指针

int set_waiters

强制设置 FUTEX_WAITERS 位 (1) 或不设置 (0)

描述

如果可以原子地完成,则尝试代表顶层等待者获取锁。 如果成功,则唤醒顶层等待者。 如果调用者指定了 set_waiters,则指示 futex_lock_pi_atomic() 强制设置 FUTEX_WAITERS 位。 hb1 和 hb2 必须由调用者持有。

只有当返回值是 -EBUSY 时,才设置 **exiting**。如果设置了,则在返回时持有退出任务的引用计数,调用者需要在等待退出完成后释放该引用计数。

返回

  • 0 - 原子获取锁失败;

  • >0 - 获取了锁,返回值是顶层等待者的 vpid

  • <0 - 错误

int futex_requeue(u32 __user *uaddr1, unsigned int flags1, u32 __user *uaddr2, unsigned int flags2, int nr_wake, int nr_requeue, u32 *cmpval, int requeue_pi)

将等待者从 uaddr1 重新排队到 uaddr2

参数

u32 __user *uaddr1

源 futex 用户地址

unsigned int flags1

futex 标志 (FLAGS_SHARED, 等等)

u32 __user *uaddr2

目标 futex 用户地址

unsigned int flags2

futex 标志 (FLAGS_SHARED, 等等)

int nr_wake

要唤醒的等待者数量(对于 requeue_pi 必须为 1)

int nr_requeue

要重新排队的等待者数量 (0-INT_MAX)

u32 *cmpval

**uaddr1** 期望的值 (或 NULL)

int requeue_pi

如果我们试图从非 PI futex 重新排队到 PI futex(不支持 PI 到 PI 的重新排队)

描述

将 uaddr1 上的等待者重新排队到 uaddr2。在 requeue_pi 的情况下,尝试代表顶层等待者原子地获取 uaddr2。

返回

  • >=0 - 成功时,重新排队或唤醒的任务数;

  • <0 - 出错

int handle_early_requeue_pi_wakeup(struct futex_hash_bucket *hb, struct futex_q *q, struct hrtimer_sleeper *timeout)

处理初始 futex 上的提前唤醒

参数

struct futex_hash_bucket *hb

futex_q 最初排队的 hash_bucket

struct futex_q *q

在等待重新排队时唤醒的 futex_q

struct hrtimer_sleeper *timeout

与等待关联的超时(如果没有,则为 NULL)

描述

确定提前唤醒的原因。

返回

-EWOULDBLOCK 或 -ETIMEDOUT 或 -ERESTARTNOINTR

int futex_wait_requeue_pi(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset, u32 __user *uaddr2)

在 uaddr 上等待并获取 uaddr2

参数

u32 __user *uaddr

我们最初等待的 futex(非 PI)

unsigned int flags

futex 标志(FLAGS_SHARED、FLAGS_CLOCKRT 等),它们必须是相同的类型,不能从私有重新排队到共享等等。

u32 val

uaddr 的期望值

ktime_t *abs_time

绝对超时

u32 bitset

由用户空间设置的 32 位唤醒位集,默认为全部

u32 __user *uaddr2

在返回用户空间之前我们将要获取的 PI futex

描述

调用者将在 uaddr 上等待,并由 futex_requeue() 重新排队到 uaddr2,uaddr2 必须是 PI 感知的且与 uaddr 不同。 普通唤醒将在 uaddr2 上唤醒,并在返回用户空间之前完成 rt_mutex 的获取。 这确保了 rt_mutex 在有等待者时保持所有者; 如果没有所有者,PI 逻辑将不知道要提升/降低哪个任务(如果需要)。

当我们在排队时调用 futex_wait_queue() 中的 schedule,并通过以下方式返回:1) 在 futex_requeue() 进行原子锁获取后在 uaddr2 上唤醒 2) 重新排队后在 uaddr2 上唤醒 3) 信号 4) 超时

如果 3,则清理并返回 -ERESTARTNOINTR。

如果 2,我们可能会阻止尝试获取 rt_mutex 并通过以下方式返回:5) 成功锁定 6) 信号 7) 超时 8) 其他锁定获取失败

如果 6,则返回 -EWOULDBLOCK(重新启动系统调用会执行相同的操作)。

如果 4 或 7,我们清理并返回 -ETIMEDOUT。

返回

  • 0 - 成功;

  • <0 - 出错

void futex_do_wait(struct futex_q *q, struct hrtimer_sleeper *timeout)

等待唤醒、超时或信号

参数

struct futex_q *q

要在其上排队的 futex_q

struct hrtimer_sleeper *timeout

准备好的 hrtimer_sleeper,或者如果没有超时则为 null

int futex_unqueue_multiple(struct futex_vector *v, int count)

从它们的哈希桶中移除各种 futex

参数

struct futex_vector *v

要取消排队的 futex 列表

int count

列表中的 futex 数量

描述

用于取消排队 futex 列表的辅助函数。 这不会失败。

返回

  • >=0 - 最后一个被唤醒的 futex 的索引;

  • -1
    • 没有 futex 被唤醒

int futex_wait_multiple_setup(struct futex_vector *vs, int count, int *woken)

准备等待和排队多个 futex

参数

struct futex_vector *vs

要等待的 futex 列表

int count

列表的大小

int *woken

最后一个唤醒的 futex 的索引(如果有)。 用于通知调用者,它可以将此索引返回给用户空间(返回参数)

描述

在一个步骤中准备多个 futex 并将它们排队。 如果 futex 列表无效或者任何 futex 已经被唤醒,这可能会失败。 成功后,任务准备好进行可中断睡眠。

返回

  • 1 - 其中一个 futex 被另一个线程唤醒

  • 0 - 成功

  • <0 - -EFAULT, -EWOULDBLOCK 或 -EINVAL

void futex_sleep_multiple(struct futex_vector *vs, unsigned int count, struct hrtimer_sleeper *to)

检查睡眠条件并睡眠

参数

struct futex_vector *vs

要等待的互斥锁列表

unsigned int count

vs 的长度

struct hrtimer_sleeper *to

超时

描述

当且仅当超时未过期且列表中的互斥锁均未被唤醒时,才进入睡眠状态。

int futex_wait_multiple(struct futex_vector *vs, unsigned int count, struct hrtimer_sleeper *to)

准备等待并入队多个互斥锁

参数

struct futex_vector *vs

要等待的互斥锁列表

unsigned int count

对象的数量

struct hrtimer_sleeper *to

放弃并返回用户空间之前的超时时间

描述

FUTEX_WAIT_MULTIPLE 互斥锁操作的入口点,此函数在一个互斥锁组上睡眠,并在第一个互斥锁被唤醒时或超时时间过后返回。

返回

  • >=0 - 唤醒的互斥锁的提示

  • <0 - 出错

int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags, struct futex_q *q, union futex_key *key2, struct task_struct *task)

准备等待一个互斥锁

参数

u32 __user *uaddr

互斥锁用户空间地址

u32 val

期望值

unsigned int flags

futex 标志 (FLAGS_SHARED, 等等)

struct futex_q *q

关联的 futex_q

union futex_key *key2

如果用于 requeue PI,则为第二个 futex_key

struct task_struct *task

为此 futex 排队的任务

描述

设置 futex_q 并定位 hash_bucket。 获取互斥锁值并将其与期望值进行比较。 在内部处理原子错误。 成功时保持 hb 锁,失败时解锁返回。

返回

  • 0 - uaddr 包含 val 并且 hb 已被锁定;

  • <0 - 出现错误,并且 hb 已解锁。 可能的原因:uaddr 无法

    被读取,不包含期望值或未正确对齐。

进一步阅读

  • Documentation/locking/spinlocks.rst:Linus Torvalds 在内核源代码中的自旋锁教程。

  • 现代架构的 Unix 系统:内核程序员的对称多处理和缓存

    Curt Schimmel 对内核级别锁定的非常好的介绍(不是为 Linux 编写的,但几乎所有内容都适用)。 这本书很贵,但对于理解 SMP 锁定来说,每一分钱都值得。 [ISBN: 0201633388]

感谢

感谢 Telsa Gwynne 进行 DocBooking、整理和添加样式。

感谢 Martin Pool、Philipp Rumpf、Stephen Rothwell、Paul Mackerras、Ruedi Aschwanden、Alan Cox、Manfred Spraul、Tim Waugh、Pete Zaitcev、James Morris、Robert Love、Paul McKenney、John Ashby 进行校对、更正、批评和评论。

感谢该联盟对本文档没有任何影响。

词汇表

抢占

在 2.5 之前,或当 CONFIG_PREEMPT 未设置时,内核中用户上下文中的进程不会相互抢占(即,除非您放弃 CPU,否则您将拥有它,除非发生中断)。 随着 2.5.4 中添加了 CONFIG_PREEMPT,这种情况发生了变化:当处于用户上下文时,优先级更高的任务可以“切入”:自旋锁已更改为禁用抢占,即使在 UP 上也是如此。

bh

Bottom Half:由于历史原因,其中包含“_bh”的函数现在通常指任何软件中断,例如 spin_lock_bh() 会阻止当前 CPU 上的任何软件中断。 Bottom halves 已被弃用,最终将被 tasklets 替换。 任何时候只有一个 bottom half 将运行。

硬件中断 / 硬件 IRQ

硬件中断请求。 in_hardirq() 在硬件中断处理程序中返回 true。

中断上下文

不是用户上下文:正在处理硬件 irq 或软件 irq。 由返回 true 的 in_interrupt() 宏指示。

SMP

对称多处理器:为多 CPU 机器编译的内核。 (CONFIG_SMP=y)。

软件中断 / softirq

软件中断处理程序。 in_hardirq() 返回 false; in_softirq() 返回 true。 Tasklets 和 softirqs 都属于“软件中断”类别。

严格来说,softirq 是最多 32 个枚举的软件中断之一,可以一次在多个 CPU 上运行。 有时也用于指 tasklets(即所有软件中断)。

tasklet

一个可以动态注册的软件中断,保证一次只能在一个 CPU 上运行。

timer

一个可以动态注册的软件中断,在给定的时间(或接近给定的时间)运行。 运行时,它就像一个 tasklet(事实上,它们是从 TIMER_SOFTIRQ 调用的)。

UP

单处理器:非 SMP。 (CONFIG_SMP=n)。

用户上下文

内核代表特定进程(即系统调用或陷阱)或内核线程执行。 您可以使用 current 宏来判断哪个进程。) 不要与用户空间混淆。 可以被软件或硬件中断中断。

用户空间

一个在其自己的代码中在内核外部执行的进程。