英语

不可靠的锁指南

作者:

Rusty Russell

简介

欢迎来到 Rusty 的极其不可靠的内核锁问题指南。本文档介绍了 Linux 内核 2.6 中的锁定系统。

随着超线程的广泛应用以及 Linux 内核中的抢占,每个在内核上进行黑客工作的人都需要了解 SMP 的并发和锁定的基本原理。

并发问题

(如果您知道什么是竞争条件,请跳过此部分)。

在普通程序中,您可以像这样增加计数器

very_important_count++;

这是他们期望发生的事情

预期结果

实例 1

实例 2

读取 very_important_count (5)

加 1 (6)

写入 very_important_count (6)

读取 very_important_count (6)

加 1 (7)

写入 very_important_count (7)

这可能发生的情况

可能的结果

实例 1

实例 2

读取 very_important_count (5)

读取 very_important_count (5)

加 1 (6)

加 1 (6)

写入 very_important_count (6)

写入 very_important_count (6)

竞争条件和临界区

这种重叠,其中结果取决于多个任务的相对时序,称为竞争条件。包含并发问题的代码片段称为临界区。特别是因为 Linux 开始在 SMP 机器上运行,它们成为内核设计和实现中的主要问题之一。

抢占可能会产生相同的效果,即使只有一个 CPU:通过在临界区期间抢占一个任务,我们会遇到完全相同的竞争条件。在这种情况下,抢占的线程可能会自行运行临界区。

解决方案是识别何时发生这些同时访问,并使用锁来确保在任何时间只有一个实例可以进入临界区。Linux 内核中有许多友好的原语可以帮助您做到这一点。还有一些不友好的原语,但我会假装它们不存在。

Linux 内核中的锁

如果我可以给您关于锁的一条建议:保持简单

不情愿引入新的锁。

两种主要类型的内核锁:自旋锁和互斥锁

有两种主要类型的内核锁。基本类型是自旋锁 (include/asm/spinlock.h),它是一个非常简单的单持有者锁:如果您无法获取自旋锁,您会一直尝试(自旋),直到可以为止。自旋锁非常小且快速,可以在任何地方使用。

第二种类型是互斥锁 (include/linux/mutex.h):它类似于自旋锁,但是您可能会阻塞持有互斥锁。如果您无法锁定互斥锁,您的任务将会挂起自身,并在互斥锁释放时被唤醒。这意味着 CPU 在您等待时可以执行其他操作。在许多情况下,您根本无法睡眠(请参见哪些函数可以安全地从中断中调用?),因此必须改用自旋锁。

这两种类型的锁都不是递归的:请参见死锁:简单和高级

锁和单处理器内核

对于在没有 CONFIG_SMP 且没有 CONFIG_PREEMPT 的情况下编译的内核,自旋锁根本不存在。这是一个出色的设计决策:当没有其他人可以同时运行时,没有理由使用锁。

如果内核在没有 CONFIG_SMP 的情况下编译,但是设置了 CONFIG_PREEMPT,则自旋锁只会禁用抢占,这足以防止任何竞争。对于大多数用途,我们可以将抢占视为等同于 SMP,而无需单独担心它。

您应该始终在启用 CONFIG_SMPCONFIG_PREEMPT 的情况下测试您的锁代码,即使您没有 SMP 测试盒,因为它仍然会捕获某些类型的锁错误。

互斥锁仍然存在,因为它们是用户上下文之间同步所必需的,正如我们将在下面看到的那样。

仅在用户上下文中锁定

如果您有一个仅从用户上下文访问的数据结构,那么您可以使用简单的互斥锁 (include/linux/mutex.h) 来保护它。这是最简单的情况:您初始化互斥锁。然后,您可以调用 mutex_lock_interruptible() 来获取互斥锁,并调用 mutex_unlock() 来释放它。还有一个 mutex_lock(),应该避免使用,因为它在收到信号时不会返回。

示例:net/netfilter/nf_sockopt.c 允许注册新的 setsockopt() 和 getsockopt() 调用,使用 nf_register_sockopt()。注册和注销仅在模块加载和卸载时完成(以及启动时,没有并发),并且仅针对未知的 setsockopt() 或 getsockopt() 系统调用查询注册列表。nf_sockopt_mutex 非常适合保护这一点,特别是由于 setsockopt 和 getsockopt 调用很可能会睡眠。

用户上下文和软中断之间的锁定

如果软中断与用户上下文共享数据,您将有两个问题。首先,当前用户上下文可能会被软中断中断,其次,临界区可能会从另一个 CPU 进入。这就是使用 spin_lock_bh() (include/linux/spinlock.h) 的地方。它禁用该 CPU 上的软中断,然后获取锁。spin_unlock_bh() 则相反。(后缀“_bh”是对“Bottom Halves”的历史引用,这是软件中断的旧名称。在理想的世界中,它应该被称为 spin_lock_softirq())。

请注意,您也可以在此处使用 spin_lock_irq() 或 spin_lock_irqsave(),它们也会停止硬件中断:请参见硬中断上下文

这也适用于 UP:自旋锁消失,并且此宏简单地变为 local_bh_disable() (include/linux/interrupt.h),它可以保护您免于运行软中断。

用户上下文和任务レット之间的锁定

这与上述完全相同,因为任务レット实际上是从软中断运行的。

用户上下文和定时器之间的锁定

这也与上述完全相同,因为定时器实际上是从软中断运行的。从锁定的角度来看,任务렛和定时器是相同的。

任务렛/定时器之间的锁定

有时,任务렛或定时器可能想要与另一个任务렛或定时器共享数据。

相同的任务렛/定时器

由于任务렛永远不会在两个 CPU 上同时运行,因此您无需担心您的任务렛是可重入的(同时运行两次),即使在 SMP 上也是如此。

不同的任务렛/定时器

如果另一个任务렛/定时器想要与您的任务렛或定时器共享数据,您都需要使用 spin_lock() 和 spin_unlock() 调用。此处不需要 spin_lock_bh(),因为您已经在任务렛中,并且不会在同一 CPU 上运行任何任务렛。

软中断之间的锁定

通常,软中断可能想要与自身或任务レット/定时器共享数据。

相同的软中断

相同的软中断可以在其他 CPU 上运行:您可以使用每个 CPU 的数组(请参见每个 CPU 的数据)来获得更好的性能。如果您要使用软中断,您可能足够关心可扩展的性能,从而有理由增加额外的复杂性。

您需要对共享数据使用 spin_lock() 和 spin_unlock()。

不同的软中断

您需要对共享数据使用 spin_lock() 和 spin_unlock(),无论它是定时器、任务렛、不同的软中断还是相同或另一个软中断:它们中的任何一个都可能在不同的 CPU 上运行。

硬中断上下文

硬件中断通常与 tasklet 或软中断进行通信。通常,这涉及到将工作放入队列,然后由软中断取出。

硬中断和软中断/Tasklet 之间的锁

如果硬件中断处理程序与软中断共享数据,您需要考虑两个问题。首先,软中断处理可能会被硬件中断打断;其次,关键区域可能会被另一个 CPU 上的硬件中断进入。这就是 `spin_lock_irq()` 的用武之地。它被定义为禁用该 CPU 上的中断,然后获取锁。`spin_unlock_irq()` 则执行相反的操作。

中断处理程序不需要使用 `spin_lock_irq()`,因为软中断在中断处理程序运行时无法运行:它可以 使用 `spin_lock()`,后者速度稍快。唯一的例外是,如果不同的硬件中断处理程序使用相同的锁:`spin_lock_irq()` 将阻止它中断我们。

这对于 UP(单处理器)也同样有效:自旋锁会消失,并且此宏直接变为 `local_irq_disable()` (`include/asm/smp.h`),它可以保护您免受软中断/tasklet/BH 的运行。

`spin_lock_irqsave()`(`include/linux/spinlock.h`)是一个变体,它将中断的开启或关闭状态保存在一个标志字中,该标志字会传递给 `spin_unlock_irqrestore()`。这意味着相同的代码可以在硬中断处理程序(其中中断已经关闭)和软中断(其中需要禁用中断)中使用。

请注意,软中断(以及 tasklet 和定时器)是在从硬件中断返回时运行的,因此 `spin_lock_irq()` 也会阻止这些。从这个意义上说,`spin_lock_irqsave()` 是最通用和强大的锁定函数。

两个硬中断处理程序之间的锁

在两个中断处理程序之间共享数据的情况很少见,但如果确实需要这样做,则应使用 `spin_lock_irqsave()`:是否在中断处理程序内部禁用所有中断是特定于体系结构的。

锁定速查表

Pete Zaitcev 给出了以下总结:

  • 如果您在进程上下文中(任何系统调用),并且想将其他进程锁定在外,请使用互斥锁。您可以获取互斥锁并休眠(`copy_from_user()` 或 `kmalloc(x,GFP_KERNEL)`)。

  • 否则(== 数据可以在中断中被访问),请使用 `spin_lock_irqsave()` 和 `spin_unlock_irqrestore()`。

  • 避免在超过 5 行代码或任何函数调用(除了诸如 `readb()` 之类的访问器)中持有自旋锁。

最低要求表

下表列出了各种上下文之间的 **最低** 锁定要求。在某些情况下,相同的上下文一次只能在一个 CPU 上运行,因此该上下文不需要锁定(例如,特定的线程一次只能在一个 CPU 上运行,但如果它需要与另一个线程共享数据,则需要锁定)。

请记住上面的建议:您始终可以使用 `spin_lock_irqsave()`,它是所有其他自旋锁原语的超集。

.

中断处理程序 A

中断处理程序 B

软中断 A

软中断 B

Tasklet A

Tasklet B

定时器 A

定时器 B

用户上下文 A

用户上下文 B

中断处理程序 A

中断处理程序 B

SLIS

软中断 A

SLI

SLI

SL

软中断 B

SLI

SLI

SL

SL

Tasklet A

SLI

SLI

SL

SL

Tasklet B

SLI

SLI

SL

SL

SL

定时器 A

SLI

SLI

SL

SL

SL

SL

定时器 B

SLI

SLI

SL

SL

SL

SL

SL

用户上下文 A

SLI

SLI

SLBH

SLBH

SLBH

SLBH

SLBH

SLBH

用户上下文 B

SLI

SLI

SLBH

SLBH

SLBH

SLBH

SLBH

SLBH

MLI

表:锁定要求表

SLIS

spin_lock_irqsave

SLI

spin_lock_irq

SL

spin_lock

SLBH

spin_lock_bh

MLI

mutex_lock_interruptible

表:锁定要求表图例

trylock 函数

有一些函数会尝试仅获取一次锁,并立即返回一个值,指示是否成功获取锁。当某些其他线程持有锁时,如果您不需要访问受锁保护的数据,则可以使用它们。如果之后您需要访问受锁保护的数据,则应稍后获取锁。

`spin_trylock()` 不会自旋,如果它在第一次尝试时获取了自旋锁,则返回非零值,否则返回 0。此函数可以在所有上下文中使用,类似于 `spin_lock()`:您必须禁用可能中断您并获取自旋锁的上下文。

mutex_trylock() 不会暂停您的任务,但如果它在第一次尝试时可以锁定互斥锁,则返回非零值,否则返回 0。尽管此函数不会休眠,但它不能在硬件或软件中断上下文中安全使用。

常见示例

让我们逐步了解一个简单的示例:数字到名称映射的缓存。缓存会跟踪每个对象的使用频率,并在缓存已满时,丢弃最少使用的对象。

全部在用户上下文中

在我们的第一个示例中,我们假设所有操作都在用户上下文中(即来自系统调用),因此我们可以休眠。这意味着我们可以使用互斥锁来保护缓存及其中的所有对象。这是代码:

#include <linux/list.h>
#include <linux/slab.h>
#include <linux/string.h>
#include <linux/mutex.h>
#include <asm/errno.h>

struct object
{
        struct list_head list;
        int id;
        char name[32];
        int popularity;
};

/* Protects the cache, cache_num, and the objects within it */
static DEFINE_MUTEX(cache_lock);
static LIST_HEAD(cache);
static unsigned int cache_num = 0;
#define MAX_CACHE_SIZE 10

/* Must be holding cache_lock */
static struct object *__cache_find(int id)
{
        struct object *i;

        list_for_each_entry(i, &cache, list)
                if (i->id == id) {
                        i->popularity++;
                        return i;
                }
        return NULL;
}

/* Must be holding cache_lock */
static void __cache_delete(struct object *obj)
{
        BUG_ON(!obj);
        list_del(&obj->list);
        kfree(obj);
        cache_num--;
}

/* Must be holding cache_lock */
static void __cache_add(struct object *obj)
{
        list_add(&obj->list, &cache);
        if (++cache_num > MAX_CACHE_SIZE) {
                struct object *i, *outcast = NULL;
                list_for_each_entry(i, &cache, list) {
                        if (!outcast || i->popularity < outcast->popularity)
                                outcast = i;
                }
                __cache_delete(outcast);
        }
}

int cache_add(int id, const char *name)
{
        struct object *obj;

        if ((obj = kmalloc(sizeof(*obj), GFP_KERNEL)) == NULL)
                return -ENOMEM;

        strscpy(obj->name, name, sizeof(obj->name));
        obj->id = id;
        obj->popularity = 0;

        mutex_lock(&cache_lock);
        __cache_add(obj);
        mutex_unlock(&cache_lock);
        return 0;
}

void cache_delete(int id)
{
        mutex_lock(&cache_lock);
        __cache_delete(__cache_find(id));
        mutex_unlock(&cache_lock);
}

int cache_find(int id, char *name)
{
        struct object *obj;
        int ret = -ENOENT;

        mutex_lock(&cache_lock);
        obj = __cache_find(id);
        if (obj) {
                ret = 0;
                strcpy(name, obj->name);
        }
        mutex_unlock(&cache_lock);
        return ret;
}

请注意,当添加、删除或查找缓存时,我们始终确保拥有 `cache_lock`:缓存的基础结构本身和对象的内容都受到锁的保护。在这种情况下,这很容易,因为我们复制用户的数据,并且永远不会让他们直接访问对象。

这里有一个轻微的(和常见的)优化:在 `cache_add()` 中,我们在获取锁之前设置对象的字段。这是安全的,因为在我们将其放入缓存之前,没有人可以访问它。

从中断上下文访问

现在考虑可以从中断上下文调用 `cache_find()` 的情况:无论是硬件中断还是软中断。例如,定时器会删除缓存中的对象。

更改如下所示,采用标准补丁格式:`-` 是删除的行,而 `+` 是添加的行。

--- cache.c.usercontext 2003-12-09 13:58:54.000000000 +1100
+++ cache.c.interrupt   2003-12-09 14:07:49.000000000 +1100
@@ -12,7 +12,7 @@
         int popularity;
 };

-static DEFINE_MUTEX(cache_lock);
+static DEFINE_SPINLOCK(cache_lock);
 static LIST_HEAD(cache);
 static unsigned int cache_num = 0;
 #define MAX_CACHE_SIZE 10
@@ -55,6 +55,7 @@
 int cache_add(int id, const char *name)
 {
         struct object *obj;
+        unsigned long flags;

         if ((obj = kmalloc(sizeof(*obj), GFP_KERNEL)) == NULL)
                 return -ENOMEM;
@@ -63,30 +64,33 @@
         obj->id = id;
         obj->popularity = 0;

-        mutex_lock(&cache_lock);
+        spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);
-        mutex_unlock(&cache_lock);
+        spin_unlock_irqrestore(&cache_lock, flags);
         return 0;
 }

 void cache_delete(int id)
 {
-        mutex_lock(&cache_lock);
+        unsigned long flags;
+
+        spin_lock_irqsave(&cache_lock, flags);
         __cache_delete(__cache_find(id));
-        mutex_unlock(&cache_lock);
+        spin_unlock_irqrestore(&cache_lock, flags);
 }

 int cache_find(int id, char *name)
 {
         struct object *obj;
         int ret = -ENOENT;
+        unsigned long flags;

-        mutex_lock(&cache_lock);
+        spin_lock_irqsave(&cache_lock, flags);
         obj = __cache_find(id);
         if (obj) {
                 ret = 0;
                 strcpy(name, obj->name);
         }
-        mutex_unlock(&cache_lock);
+        spin_unlock_irqrestore(&cache_lock, flags);
         return ret;
 }

请注意,如果中断处于开启状态,则 `spin_lock_irqsave()` 将关闭中断,否则不执行任何操作(如果我们已经在中断处理程序中),因此这些函数可以从任何上下文中安全调用。

不幸的是,`cache_add()` 使用 `GFP_KERNEL` 标志调用 kmalloc(),这仅在用户上下文中合法。我假设 `cache_add()` 仍然仅在用户上下文中调用,否则它应成为 `cache_add()` 的参数。

在此文件之外公开对象

如果我们的对象包含更多信息,则可能不足以复制进出信息:例如,代码的其他部分可能希望保留指向这些对象的指针,而不是每次都查找 ID。这会产生两个问题。

第一个问题是我们使用 `cache_lock` 来保护对象:我们需要将其设为非静态,以便代码的其余部分可以使用它。这使得锁定更加棘手,因为它不再全部集中在一个位置。

第二个问题是生命周期问题:如果另一个结构保留指向对象的指针,则它应该期望该指针保持有效。不幸的是,这仅在您持有锁时才能保证,否则有人可能会调用 `cache_delete()`,更糟糕的是,添加另一个对象,重用相同的地址。

由于只有一个锁,因此您无法永远持有它:否则没人会完成任何工作。

此问题的解决方案是使用引用计数:每个具有指向对象指针的人员在首次获取对象时都会增加引用计数,并在完成操作后删除引用计数。将引用计数降为零的人知道它未被使用,实际上可以删除它。

这是代码:

--- cache.c.interrupt   2003-12-09 14:25:43.000000000 +1100
+++ cache.c.refcnt  2003-12-09 14:33:05.000000000 +1100
@@ -7,6 +7,7 @@
 struct object
 {
         struct list_head list;
+        unsigned int refcnt;
         int id;
         char name[32];
         int popularity;
@@ -17,6 +18,35 @@
 static unsigned int cache_num = 0;
 #define MAX_CACHE_SIZE 10

+static void __object_put(struct object *obj)
+{
+        if (--obj->refcnt == 0)
+                kfree(obj);
+}
+
+static void __object_get(struct object *obj)
+{
+        obj->refcnt++;
+}
+
+void object_put(struct object *obj)
+{
+        unsigned long flags;
+
+        spin_lock_irqsave(&cache_lock, flags);
+        __object_put(obj);
+        spin_unlock_irqrestore(&cache_lock, flags);
+}
+
+void object_get(struct object *obj)
+{
+        unsigned long flags;
+
+        spin_lock_irqsave(&cache_lock, flags);
+        __object_get(obj);
+        spin_unlock_irqrestore(&cache_lock, flags);
+}
+
 /* Must be holding cache_lock */
 static struct object *__cache_find(int id)
 {
@@ -35,6 +65,7 @@
 {
         BUG_ON(!obj);
         list_del(&obj->list);
+        __object_put(obj);
         cache_num--;
 }

@@ -63,6 +94,7 @@
         strscpy(obj->name, name, sizeof(obj->name));
         obj->id = id;
         obj->popularity = 0;
+        obj->refcnt = 1; /* The cache holds a reference */

         spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);
@@ -79,18 +111,15 @@
         spin_unlock_irqrestore(&cache_lock, flags);
 }

-int cache_find(int id, char *name)
+struct object *cache_find(int id)
 {
         struct object *obj;
-        int ret = -ENOENT;
         unsigned long flags;

         spin_lock_irqsave(&cache_lock, flags);
         obj = __cache_find(id);
-        if (obj) {
-                ret = 0;
-                strcpy(name, obj->name);
-        }
+        if (obj)
+                __object_get(obj);
         spin_unlock_irqrestore(&cache_lock, flags);
-        return ret;
+        return obj;
 }

我们将引用计数封装在标准的 “get” 和 “put” 函数中。现在我们可以从 `cache_find()` 返回对象本身,这样做的好处是用户现在可以休眠持有该对象(例如,将名称 `copy_to_user()` 到用户空间)。

另一点要注意的是,我说过每个指向对象的指针都应保留一个引用:因此,首次插入缓存时,引用计数为 1。在某些版本中,框架不保留引用计数,但它们更复杂。

对引用计数使用原子操作

实际上,`atomic_t` 通常用于 refcnt。在 `include/asm/atomic.h` 中定义了许多原子操作:保证这些原子操作在系统中所有 CPU 上都是原子可见的,因此不需要锁。在这种情况下,它比使用自旋锁更简单,尽管对于任何非平凡的事情,使用自旋锁更清晰。使用 atomic_inc()atomic_dec_and_test() 而不是标准递增和递减运算符,并且不再使用锁来保护引用计数本身。

--- cache.c.refcnt  2003-12-09 15:00:35.000000000 +1100
+++ cache.c.refcnt-atomic   2003-12-11 15:49:42.000000000 +1100
@@ -7,7 +7,7 @@
 struct object
 {
         struct list_head list;
-        unsigned int refcnt;
+        atomic_t refcnt;
         int id;
         char name[32];
         int popularity;
@@ -18,33 +18,15 @@
 static unsigned int cache_num = 0;
 #define MAX_CACHE_SIZE 10

-static void __object_put(struct object *obj)
-{
-        if (--obj->refcnt == 0)
-                kfree(obj);
-}
-
-static void __object_get(struct object *obj)
-{
-        obj->refcnt++;
-}
-
 void object_put(struct object *obj)
 {
-        unsigned long flags;
-
-        spin_lock_irqsave(&cache_lock, flags);
-        __object_put(obj);
-        spin_unlock_irqrestore(&cache_lock, flags);
+        if (atomic_dec_and_test(&obj->refcnt))
+                kfree(obj);
 }

 void object_get(struct object *obj)
 {
-        unsigned long flags;
-
-        spin_lock_irqsave(&cache_lock, flags);
-        __object_get(obj);
-        spin_unlock_irqrestore(&cache_lock, flags);
+        atomic_inc(&obj->refcnt);
 }

 /* Must be holding cache_lock */
@@ -65,7 +47,7 @@
 {
         BUG_ON(!obj);
         list_del(&obj->list);
-        __object_put(obj);
+        object_put(obj);
         cache_num--;
 }

@@ -94,7 +76,7 @@
         strscpy(obj->name, name, sizeof(obj->name));
         obj->id = id;
         obj->popularity = 0;
-        obj->refcnt = 1; /* The cache holds a reference */
+        atomic_set(&obj->refcnt, 1); /* The cache holds a reference */

         spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);
@@ -119,7 +101,7 @@
         spin_lock_irqsave(&cache_lock, flags);
         obj = __cache_find(id);
         if (obj)
-                __object_get(obj);
+                object_get(obj);
         spin_unlock_irqrestore(&cache_lock, flags);
         return obj;
 }

保护对象本身

在这些示例中,我们假设对象(引用计数除外)一旦创建就不会更改。如果我们想允许更改名称,则有三种可能性:

  • 您可以使 `cache_lock` 为非静态,并告诉人们在更改任何对象中的名称之前获取该锁。

  • 您可以提供一个 `cache_obj_rename()`,它会获取此锁并为调用者更改名称,并告诉每个人都使用该函数。

  • 您可以使 `cache_lock` 仅保护缓存本身,并使用另一个锁来保护名称。

理论上,你可以将锁的粒度细化到每个对象的每个字段都使用一个锁。但在实践中,最常见的变体有:

  • 一个锁保护基础设施(本例中的 cache 列表)和所有对象。这是我们目前所做的。

  • 一个锁保护基础设施(包括对象内部的列表指针),以及对象内部的一个锁保护该对象的其余部分。

  • 多个锁保护基础设施(例如,每个哈希链一个锁),可能还有一个单独的每个对象的锁。

以下是“每个对象一个锁”的实现:

--- cache.c.refcnt-atomic   2003-12-11 15:50:54.000000000 +1100
+++ cache.c.perobjectlock   2003-12-11 17:15:03.000000000 +1100
@@ -6,11 +6,17 @@

 struct object
 {
+        /* These two protected by cache_lock. */
         struct list_head list;
+        int popularity;
+
         atomic_t refcnt;
+
+        /* Doesn't change once created. */
         int id;
+
+        spinlock_t lock; /* Protects the name */
         char name[32];
-        int popularity;
 };

 static DEFINE_SPINLOCK(cache_lock);
@@ -77,6 +84,7 @@
         obj->id = id;
         obj->popularity = 0;
         atomic_set(&obj->refcnt, 1); /* The cache holds a reference */
+        spin_lock_init(&obj->lock);

         spin_lock_irqsave(&cache_lock, flags);
         __cache_add(obj);

请注意,我决定将受欢迎程度计数由 cache_lock 而不是每个对象的锁来保护:这是因为它(就像对象内的 struct list_head 一样)在逻辑上是基础设施的一部分。这样,在寻找最不流行的元素时,我不需要在 __cache_add() 中获取每个对象的锁。

我还决定 id 成员是不可更改的,因此我不需要在 __cache_find() 中获取每个对象锁来检查 id:对象锁仅由想要读取或写入 name 字段的调用者使用。

还要注意,我添加了注释来描述哪些数据受到哪些锁的保护。这非常重要,因为它描述了代码的运行时行为,并且很难仅通过阅读来获得。正如 Alan Cox 所说,“锁数据,而不是代码”。

常见问题

死锁:简单和高级

存在一个编码错误,其中一段代码尝试两次获取自旋锁:它会永远自旋,等待锁被释放(自旋锁、读写锁和互斥锁在 Linux 中不是递归的)。这很容易诊断:不是那种需要熬夜五晚、与毛茸茸的代码兔子交谈才能解决的问题。

对于稍微复杂的情况,假设您有一个由软中断和用户上下文共享的区域。如果您使用 spin_lock() 调用来保护它,则用户上下文可能会在持有锁时被软中断中断,然后软中断会永远自旋尝试获取同一个锁。

这两种情况都称为死锁,如上所示,即使在单 CPU 上也可能发生(尽管在 UP 编译中不会发生,因为在内核编译时 CONFIG_SMP=n 时,自旋锁会消失。在第二个示例中,您仍然会遇到数据损坏)。

这种完全锁定很容易诊断:在 SMP 机器上,看门狗定时器或使用 DEBUG_SPINLOCK 集(include/linux/spinlock.h)编译会在发生时立即显示出来。

一个更复杂的问题是所谓的“死锁拥抱”,涉及两个或多个锁。假设您有一个哈希表:表中的每个条目都是一个自旋锁,以及一个哈希对象的链。在软中断处理程序中,您有时想要将对象从哈希中的一个位置更改到另一个位置:您获取旧哈希链的自旋锁和新哈希链的自旋锁,然后从旧哈希链中删除对象,并将其插入新哈希链中。

这里有两个问题。首先,如果您的代码尝试将对象移动到同一链中,它会因为尝试两次锁定而导致自身死锁。其次,如果另一个 CPU 上的同一个软中断尝试以相反的方向移动另一个对象,则可能会发生以下情况:

CPU 1

CPU 2

获取锁 A -> 确定

获取锁 B -> 确定

获取锁 B -> 自旋

获取锁 A -> 自旋

表:后果

两个 CPU 将永远自旋,等待对方放弃他们的锁。它看起来、闻起来和感觉起来都像崩溃。

预防死锁

教科书会告诉您,如果您始终以相同的顺序锁定,则永远不会出现这种死锁。实践会告诉您,这种方法是不可扩展的:当我创建一个新锁时,我无法充分了解内核,以找出它在 5000 个锁层次结构中的位置。

最好的锁是封装的:它们永远不会在头文件中暴露,并且永远不会在同一文件之外的非平凡函数的调用周围持有。您可以阅读此代码并看到它永远不会死锁,因为它永远不会在拥有一个锁时尝试获取另一个锁。使用您代码的人甚至不需要知道您正在使用锁。

这里一个经典的问题是当您提供回调或钩子时:如果您在持有锁的情况下调用这些回调或钩子,您将面临简单的死锁或死锁拥抱的风险(谁知道回调会做什么?)。

过度预防死锁

死锁是有问题的,但不如数据损坏那么糟糕。一段代码获取读锁、搜索列表、未能找到所需内容、释放读锁、获取写锁并插入对象,会存在竞争条件。

竞争的定时器:内核的消遣

定时器可能会因竞争产生其自身特殊的问题。考虑一个对象集合(列表、哈希等),其中每个对象都有一个定时器,该定时器会将其销毁。

如果要销毁整个集合(例如,在模块移除时),您可能会执行以下操作:

/* THIS CODE BAD BAD BAD BAD: IF IT WAS ANY WORSE IT WOULD USE
   HUNGARIAN NOTATION */
spin_lock_bh(&list_lock);

while (list) {
        struct foo *next = list->next;
        timer_delete(&list->timer);
        kfree(list);
        list = next;
}

spin_unlock_bh(&list_lock);

迟早,这会在 SMP 上崩溃,因为定时器可能在 spin_lock_bh() 之前刚刚触发,并且它只会在我们 spin_unlock_bh() 之后获得锁,然后尝试释放该元素(该元素已被释放!)。

可以通过检查 timer_delete() 的结果来避免这种情况:如果它返回 1,则定时器已被删除。如果返回 0,则表示(在本例中)它当前正在运行,因此我们可以执行以下操作:

retry:
        spin_lock_bh(&list_lock);

        while (list) {
                struct foo *next = list->next;
                if (!timer_delete(&list->timer)) {
                        /* Give timer a chance to delete this */
                        spin_unlock_bh(&list_lock);
                        goto retry;
                }
                kfree(list);
                list = next;
        }

        spin_unlock_bh(&list_lock);

另一个常见的问题是删除那些会自行重启的定时器(通过在其定时器函数的末尾调用 add_timer())。由于这是一个容易发生竞争的常见情况,因此您应该使用 timer_delete_sync()include/linux/timer.h)来处理这种情况。

在释放定时器之前,应调用 timer_shutdown()timer_shutdown_sync(),这将防止它被重新启动。任何随后重新启动定时器的尝试都将被核心代码静默忽略。

锁定速度

在考虑执行锁定的代码的速度时,需要考虑三个主要因素。首先是并发:在其他人持有锁时,有多少事情会被等待。其次是实际获取和释放未争用的锁所花费的时间。第三是使用更少或更智能的锁。我假设该锁的使用频率相当高:否则,您不会关心效率。

并发取决于锁通常被持有的时间长短:您应该在需要的时间内持有锁,但不要超过此时间。在缓存示例中,我们始终在未持有锁的情况下创建对象,然后仅在准备将其插入列表时才获取锁。

获取时间取决于锁操作对管道造成多大的损害(管道停顿)以及此 CPU 上次获取锁的可能性有多大(即,该锁对于此 CPU 是否是缓存热点):在拥有更多 CPU 的机器上,这种可能性会迅速下降。以 700MHz 英特尔奔腾 III 为例:一条指令大约需要 0.7ns,原子增量大约需要 58ns,在此 CPU 上缓存热点的锁需要 160ns,而从另一个 CPU 传输缓存行需要额外 170 到 360ns。(这些数据来自 Paul McKenney 的 Linux Journal RCU 文章)。

这两个目标是冲突的:通过将锁分成几部分(例如,在我们最终的每个对象锁示例中)可以缩短持有锁的时间,但这会增加锁的获取次数,并且结果通常比使用单个锁要慢。这是提倡锁定简单性的另一个原因。

第三个问题将在下面讨论:有一些方法可以减少需要完成的锁定次数。

读/写锁变体

自旋锁和互斥锁都有读/写变体:rwlock_tstruct rw_semaphore。这些将用户分为两类:读者和写者。如果您只是读取数据,则可以获取读锁,但是要写入数据,则需要写锁。许多人可以持有读锁,但是写者必须是唯一的持有者。

如果您的代码可以整齐地划分为读者/写者行(就像我们的缓存代码一样),并且读者持有锁的时间很长,则使用这些锁可能会有所帮助。但是,它们比普通锁稍微慢一些,因此在实践中,rwlock_t 通常不值得使用。

避免锁:读取复制更新

有一种特殊的读/写锁定方法称为读取复制更新。使用 RCU,读者可以完全避免获取锁:由于我们希望缓存的读取频率高于更新频率(否则缓存就是浪费时间),因此它是这种优化的候选者。

我们如何摆脱读锁?摆脱读锁意味着写者可能会在读者下方更改列表。这实际上非常简单:如果写者非常小心地添加元素,则我们可以在添加元素时读取链表。例如,将 new 添加到名为 list 的单链表:

new->next = list->next;
wmb();
list->next = new;

wmb() 是写入内存屏障。它确保第一个操作(设置新元素的 next 指针)完成并且所有 CPU 都将看到它,然后才能进行第二个操作(将新元素放入列表中)。这很重要,因为现代编译器和现代 CPU 都可以在没有明确告知的情况下重新排序指令:我们希望读者要么根本看不到新元素,要么看到 next 指针正确指向列表其余部分的新元素。

幸运的是,有一个函数可以为标准的 struct list_head 列表执行此操作:list_add_rcu()include/linux/list.h)。

从列表中删除元素甚至更简单:我们将指向旧元素的指针替换为指向其后继元素的指针,并且读者要么看到它,要么跳过它。

list->next = old->next;

这里有 list_del_rcu()include/linux/list.h),它的作用是这样(普通版本会毒化旧对象,这不是我们想要的)。

读者还必须小心:有些 CPU 可能会查看 next 指针,提前开始读取下一个元素的内容,但当 next 指针在它们不知情的情况下更改时,它们不会意识到预取的内容是错误的。再次强调,这里有一个 list_for_each_entry_rcu()include/linux/list.h)来帮助你。当然,写入者可以直接使用 list_for_each_entry(),因为不可能同时有两个写入者。

我们最终的难题是:我们什么时候才能真正销毁被删除的元素?请记住,一个读取者现在可能正在遍历列表中的这个元素:如果我们释放这个元素并且 next 指针发生更改,读取者将跳转到垃圾数据并崩溃。我们需要等到我们知道所有在删除该元素时正在遍历列表的读取者都已完成。我们使用 call_rcu() 来注册一个回调函数,一旦所有先前存在的读取者都完成,该回调函数将实际销毁该对象。或者,可以使用 synchronize_rcu() 来阻塞,直到所有先前存在的读取者都完成。

但是,Read Copy Update 如何知道读取者何时完成?方法是这样的:首先,读取者总是在 rcu_read_lock()/rcu_read_unlock() 对中遍历列表:这些操作只是禁用抢占,因此读取者在读取列表时不会进入休眠状态。

然后,RCU 会等待直到每个其他 CPU 都至少休眠过一次:由于读取者不能休眠,我们知道在删除期间遍历列表的任何读取者都已完成,并且回调函数被触发。真正的 Read Copy Update 代码比这更优化一些,但这是基本思想。

--- cache.c.perobjectlock   2003-12-11 17:15:03.000000000 +1100
+++ cache.c.rcupdate    2003-12-11 17:55:14.000000000 +1100
@@ -1,15 +1,18 @@
 #include <linux/list.h>
 #include <linux/slab.h>
 #include <linux/string.h>
+#include <linux/rcupdate.h>
 #include <linux/mutex.h>
 #include <asm/errno.h>

 struct object
 {
-        /* These two protected by cache_lock. */
+        /* This is protected by RCU */
         struct list_head list;
         int popularity;

+        struct rcu_head rcu;
+
         atomic_t refcnt;

         /* Doesn't change once created. */
@@ -40,7 +43,7 @@
 {
         struct object *i;

-        list_for_each_entry(i, &cache, list) {
+        list_for_each_entry_rcu(i, &cache, list) {
                 if (i->id == id) {
                         i->popularity++;
                         return i;
@@ -49,19 +52,25 @@
         return NULL;
 }

+/* Final discard done once we know no readers are looking. */
+static void cache_delete_rcu(void *arg)
+{
+        object_put(arg);
+}
+
 /* Must be holding cache_lock */
 static void __cache_delete(struct object *obj)
 {
         BUG_ON(!obj);
-        list_del(&obj->list);
-        object_put(obj);
+        list_del_rcu(&obj->list);
         cache_num--;
+        call_rcu(&obj->rcu, cache_delete_rcu);
 }

 /* Must be holding cache_lock */
 static void __cache_add(struct object *obj)
 {
-        list_add(&obj->list, &cache);
+        list_add_rcu(&obj->list, &cache);
         if (++cache_num > MAX_CACHE_SIZE) {
                 struct object *i, *outcast = NULL;
                 list_for_each_entry(i, &cache, list) {
@@ -104,12 +114,11 @@
 struct object *cache_find(int id)
 {
         struct object *obj;
-        unsigned long flags;

-        spin_lock_irqsave(&cache_lock, flags);
+        rcu_read_lock();
         obj = __cache_find(id);
         if (obj)
                 object_get(obj);
-        spin_unlock_irqrestore(&cache_lock, flags);
+        rcu_read_unlock();
         return obj;
 }

请注意,读取者将在 __cache_find() 中修改 popularity 成员,并且现在它不持有锁。一种解决方案是将其设为 atomic_t,但对于此用法,我们并不真正在意竞争:近似结果就足够了,所以我没有更改它。

结果是 cache_find() 不需要与任何其他函数同步,因此在 SMP 上几乎与在 UP 上一样快。

这里还有一个进一步优化的可能性:请记住我们最初的缓存代码,其中没有引用计数,并且调用者在每次使用该对象时都持有锁?这仍然是可能的:如果你持有锁,没有人可以删除该对象,因此你不需要获取和释放引用计数。

现在,由于 RCU 中的“读取锁”只是禁用抢占,因此在调用 cache_find() 和 object_put() 之间始终禁用抢占的调用者实际上不需要获取和释放引用计数:我们可以通过使其非静态来公开 __cache_find(),并且此类调用者可以直接调用它。

这里的好处是引用计数没有被写入:该对象没有以任何方式被修改,这在 SMP 机器上由于缓存而快得多。

每个 CPU 数据

另一种广泛用于避免锁定的技术是为每个 CPU 复制信息。例如,如果你想统计一个常见条件,你可以使用自旋锁和一个计数器。简单又好用。

如果这太慢(通常不是,但如果你有一个非常大的机器进行测试,并且可以证明它太慢),你可以为每个 CPU 使用一个计数器,这样它们都不需要独占锁。请参阅 DEFINE_PER_CPU()、get_cpu_var() 和 put_cpu_var()(include/linux/percpu.h)。

对于简单的每个 CPU 计数器,特别有用的是 local_t 类型,以及 cpu_local_inc() 和相关函数,它们在某些架构上比简单代码更有效率(include/asm/local.h)。

请注意,如果没有引入更多锁,就没有简单、可靠的方法来获得此类计数器的精确值。对于某些用途,这不是问题。

主要由 IRQ 处理程序使用的数据

如果数据始终从同一个 IRQ 处理程序内部访问,则根本不需要锁:内核已经保证 irq 处理程序不会在多个 CPU 上同时运行。

Manfred Spraul 指出,即使数据偶尔在用户上下文或软中断/tasklet 中访问,你仍然可以这样做。irq 处理程序不使用锁,所有其他访问都按如下方式完成

mutex_lock(&lock);
disable_irq(irq);
...
enable_irq(irq);
mutex_unlock(&lock);

disable_irq() 阻止 irq 处理程序运行(并等待它完成,如果它当前正在其他 CPU 上运行)。自旋锁阻止任何其他访问同时发生。当然,这比仅调用 spin_lock_irq() 慢,因此只有当这种类型的访问极少发生时才有意义。

哪些函数可以安全地从中断中调用?

内核中的许多函数会休眠(即直接或间接调用 schedule()):在持有自旋锁或禁用抢占的情况下,永远不能调用它们。这也意味着你需要在用户上下文中:从中断中调用它们是非法的。

一些会休眠的函数

下面列出了最常见的函数,但你通常必须阅读代码才能找出其他调用是否安全。如果其他所有调用它的函数都可以休眠,那么你可能也需要能够休眠。特别是,注册和注销函数通常期望从用户上下文调用,并且可以休眠。

一些不会休眠的函数

一些函数可以安全地从任何上下文调用,或者持有几乎任何锁。

互斥锁 API 参考

mutex_init

mutex_init (mutex)

初始化互斥锁

参数

mutex

要初始化的互斥锁

描述

将互斥锁初始化为未锁定状态。

不允许初始化已锁定的互斥锁。

mutex_init_with_key

mutex_init_with_key (mutex, key)

使用给定的 lockdep 键初始化互斥锁

参数

mutex

要初始化的互斥锁

key

要与互斥锁关联的 lockdep 键

描述

将互斥锁初始化为未锁定状态。

不允许初始化已锁定的互斥锁。

bool mutex_is_locked(struct mutex *lock)

互斥锁是否被锁定

参数

struct mutex *lock

要查询的互斥锁

描述

如果互斥锁被锁定,则返回 true;如果未锁定,则返回 false。

void mutex_lock(struct mutex *lock)

获取互斥锁

参数

struct mutex *lock

要获取的互斥锁

描述

为此任务独占锁定互斥锁。如果互斥锁现在不可用,它将休眠,直到可以获取它为止。

互斥锁稍后必须由获取它的同一个任务释放。不允许递归锁定。任务在未首先解锁互斥锁的情况下不得退出。此外,互斥锁所在的内核内存不得在互斥锁仍然锁定的情况下释放。互斥锁必须先初始化(或静态定义)才能被锁定。不允许将互斥锁 memset() 为 0。

(CONFIG_DEBUG_MUTEXES .config 选项会启用调试检查,这将强制执行限制,并会进行死锁调试)

此函数类似于(但不等效于)down()。

void mutex_unlock(struct mutex *lock)

释放互斥锁

参数

struct mutex *lock

要释放的互斥锁

描述

解锁之前被此任务锁定的互斥锁。

此函数不得在中断上下文中使用。不允许解锁未锁定的互斥锁。

调用者必须确保互斥锁在函数返回之前保持有效 - mutex_unlock() 不能直接用于释放一个对象,以便另一个并发任务可以释放它。互斥锁在这方面与自旋锁和引用计数不同。

此函数类似于(但不等同于)up()。

void ww_mutex_unlock(struct ww_mutex *lock)

释放 w/w 互斥锁

参数

struct ww_mutex *lock

要释放的互斥锁

描述

解锁之前被此任务使用任何 ww_mutex_lock* 函数(无论是否带有获取上下文)锁定的互斥锁。在释放获取上下文后禁止释放锁。

此函数不得在中断上下文中使用。不允许解锁未锁定的互斥锁。

int ww_mutex_trylock(struct ww_mutex *ww, struct ww_acquire_ctx *ww_ctx)

尝试获取带有可选获取上下文的 w/w 互斥锁

参数

struct ww_mutex *ww

要锁定的互斥锁

struct ww_acquire_ctx *ww_ctx

可选的 w/w 获取上下文

描述

尝试使用可选的获取上下文锁定互斥锁;无法进行死锁检测。如果互斥锁已成功获取,则返回 1,否则返回 0。

与 ww_mutex_lock 不同,不执行死锁处理。但是,如果指定了 **ctx**,则在调用 ww_mutex_trylock 时可能会发生 -EALREADY 处理。

用此函数获取的互斥锁必须用 ww_mutex_unlock 释放。

int mutex_lock_interruptible(struct mutex *lock)

获取互斥锁,可被信号中断。

参数

struct mutex *lock

要获取的互斥锁。

描述

mutex_lock() 一样锁定互斥锁。如果在进程休眠时传递信号,此函数将返回,而不获取互斥锁。

上下文

进程上下文。

返回

如果锁成功获取,则返回 0,如果收到信号,则返回 -EINTR

int mutex_lock_killable(struct mutex *lock)

获取互斥锁,可被致命信号中断。

参数

struct mutex *lock

要获取的互斥锁。

描述

mutex_lock() 一样锁定互斥锁。如果在进程休眠时传递将对当前进程致命的信号,此函数将返回,而不获取互斥锁。

上下文

进程上下文。

返回

如果锁成功获取,则返回 0,如果收到致命信号,则返回 -EINTR

void mutex_lock_io(struct mutex *lock)

获取互斥锁并将进程标记为等待 I/O

参数

struct mutex *lock

要获取的互斥锁。

描述

mutex_lock() 一样锁定互斥锁。当任务等待此互斥锁时,调度程序会将其计为处于 IO 等待状态。

上下文

进程上下文。

int mutex_trylock(struct mutex *lock)

尝试获取互斥锁,无需等待

参数

struct mutex *lock

要获取的互斥锁

描述

尝试原子地获取互斥锁。如果互斥锁已成功获取,则返回 1,如果存在争用,则返回 0。

此函数不得在中断上下文中使用。互斥锁必须由获取它的同一任务释放。

注意

此函数遵循 spin_trylock() 约定,因此它与 down_trylock() 返回值相反!在将信号量用户转换为互斥锁时要小心这一点。

int atomic_dec_and_mutex_lock(atomic_t *cnt, struct mutex *lock)

如果减为 0,则返回持有互斥锁

参数

atomic_t *cnt

我们要递减的原子变量

struct mutex *lock

如果递减为 0,则返回持有互斥锁

描述

如果递减为 0,则返回 true 并持有锁,否则返回 false

Futex API 参考

struct futex_hash_bucket *futex_hash(union futex_key *key)

返回全局哈希中的哈希桶

参数

union futex_key *key

指向要计算哈希值的 futex 键的指针

描述

我们在从 get_futex_key 返回的键(见下文)上进行哈希,并返回全局哈希中对应的哈希桶。

struct hrtimer_sleeper *futex_setup_timer(ktime_t *time, struct hrtimer_sleeper *timeout, int flags, u64 range_ns)

设置休眠的 hrtimer。

参数

ktime_t *time

指向给定超时值的指针

struct hrtimer_sleeper *timeout

要设置的 hrtimer_sleeper 结构

int flags

futex 标志

u64 range_ns

可选的纳秒范围

返回

如果未给出超时值,则返回初始化的 hrtimer_sleeper 结构或 NULL

给定的值

int get_futex_key(u32 __user *uaddr, unsigned int flags, union futex_key *key, enum futex_access rw)

获取作为 futex 键的参数

参数

u32 __user *uaddr

futex 的虚拟地址

unsigned int flags

FLAGS_*

union futex_key *key

存储结果的地址。

enum futex_access rw

映射需要是读/写权限 (值:FUTEX_READ, FUTEX_WRITE)

返回

一个负的错误代码或 0

描述

关键字成功时存储在 key 中。

对于共享映射 (当 fshared 为真时),关键字是

( inode->i_sequence, page->index, offset_within_page )

[ 另请参阅 get_inode_sequence_number() ]

对于私有映射 (或当 !fshared 为真时),关键字是

( current->mm, address, 0 )

这允许(在适用的情况下,跨进程)识别 futex,而无需在 FUTEX_WAIT 期间保持页面固定。

lock_page() 可能会睡眠,调用者不应持有自旋锁。

int fault_in_user_writeable(u32 __user *uaddr)

陷入用户地址并验证读写访问权限

参数

u32 __user *uaddr

指向发生错误的用户空间地址的指针

描述

修复我们刚刚在对 uaddr 进行原子写访问时发生的错误的慢速路径。

我们没有对用户地址进行非破坏性写入的通用实现。我们知道我们陷入了原子页面错误禁用部分,因此我们也可以立即调用 get_user_pages() 来避免 #PF 开销。

struct futex_q *futex_top_waiter(struct futex_hash_bucket *hb, union futex_key *key)

返回 futex 上优先级最高的等待者

参数

struct futex_hash_bucket *hb

futex_q 所在的哈希桶

union futex_key *key

futex 键(将其与其他 futex futex_q 区分开)

描述

必须在持有 hb 锁的情况下调用。

void wait_for_owner_exiting(int ret, struct task_struct *exiting)

阻塞直到所有者退出

参数

int ret

所有者当前的 futex 锁状态

struct task_struct *exiting

指向正在退出的任务的指针

描述

调用者必须持有 exiting 的引用计数。

void __futex_unqueue(struct futex_q *q)

从其 futex_hash_bucket 中删除 futex_q

参数

struct futex_q *q

要取消排队的 futex_q

描述

q->lock_ptr 不能为 NULL,并且必须由调用者持有。

int futex_unqueue(struct futex_q *q)

从其 futex_hash_bucket 中删除 futex_q

参数

struct futex_q *q

要取消排队的 futex_q

描述

q->lock_ptr 不能由调用者持有。对 futex_unqueue() 的调用必须与之前对 futex_queue() 的一次调用配对。

返回

  • 1 - 如果 futex_q 仍在队列中(并且我们已将其取消排队);

  • 0 - 如果 futex_q 已被唤醒线程删除

void futex_exit_recursive(struct task_struct *tsk)

将任务的 futex 状态设置为 FUTEX_STATE_DEAD

参数

struct task_struct *tsk

要设置状态的任务

描述

以无锁方式设置任务的 futex 退出状态。当任务正在退出时,futex 等待者代码会观察到该状态并循环,直到任务实际完成 futex 清理。最坏的情况是,等待者会通过等待循环运行,直到状态变得可见。

这是从 make_task_dead() 中的递归错误处理路径调用的。

这是尽力而为。futex 退出代码是否已运行。如果在 futex 上设置了 OWNER_DIED 位,则等待者可以接管它。如果不是,则问题将推回给用户空间。如果 futex 退出代码尚未运行,则已排队的等待者可能会永远阻塞,但对此无能为力。

struct futex_q

哈希的 futex 队列条目,每个等待任务一个

定义:

struct futex_q {
    struct plist_node list;
    struct task_struct *task;
    spinlock_t *lock_ptr;
    futex_wake_fn *wake;
    void *wake_data;
    union futex_key key;
    struct futex_pi_state *pi_state;
    struct rt_mutex_waiter *rt_waiter;
    union futex_key *requeue_pi_key;
    u32 bitset;
    atomic_t requeue_state;
#ifdef CONFIG_PREEMPT_RT;
    struct rcuwait requeue_wait;
#endif;
};

成员

list

在此 futex 上等待的任务的按优先级排序的列表

task

在 futex 上等待的任务

lock_ptr

哈希桶锁

wake

此队列的唤醒处理程序

wake_data

与唤醒处理程序关联的数据

key

futex 哈希的键

pi_state

可选的优先级继承状态

rt_waiter

用于 requeue_pi 的 rt_waiter 存储

requeue_pi_key

requeue_pi 目标 futex 键

bitset

用于可选的位掩码唤醒的位集

requeue_state

futex_requeue_pi() 的状态字段

requeue_wait

futex_requeue_pi() 的 RCU 等待(仅限 RT)

描述

我们使用此哈希等待队列,而不是正常的 wait_queue_entry_t,因此我们可以仅唤醒相关的队列(哈希队列可能是共享的)。

futex_q 具有唤醒状态,就像任务具有 TASK_RUNNING 一样。当 plist_node_empty(q->list) || q->lock_ptr == 0 时,它被视为唤醒。唤醒的顺序始终是先使第一个条件为真,然后再使第二个条件为真。

PI futex 通常在通过 rt_mutex 代码从哈希列表中删除之前被唤醒。请参阅 futex_unqueue_pi()。

int futex_match(union futex_key *key1, union futex_key *key2)

检查两个 futex 键是否相等

参数

union futex_key *key1

指向 key1 的指针

union futex_key *key2

指向 key2 的指针

描述

如果两个 futex_key 相等,则返回 1,否则返回 0。

void futex_queue(struct futex_q *q, struct futex_hash_bucket *hb)

将 futex_q 排队到 futex_hash_bucket 上

参数

struct futex_q *q

要排队的 futex_q

struct futex_hash_bucket *hb

目标哈希桶

描述

hb->lock 必须由调用者持有,并且在此处释放。对 futex_queue() 的调用通常与对 futex_unqueue() 的一次调用配对。例外情况涉及 PI 相关操作,这些操作可能会使用 futex_unqueue_pi() 或者如果取消排队是唤醒过程的一部分并且取消排队状态在唤醒任务的状态中是隐式的,则不执行任何操作(有关示例,请参见 futex_wait_requeue_pi())。

struct futex_vector

futex_waitv() 的辅助结构

定义:

struct futex_vector {
    struct futex_waitv w;
    struct futex_q q;
};

成员

w

用户空间提供的数据

q

内核端数据

描述

用于构建包含 futex_waitv() 所需所有数据的数组的结构体

int futex_lock_pi_atomic(u32 __user *uaddr, struct futex_hash_bucket *hb, union futex_key *key, struct futex_pi_state **ps, struct task_struct *task, struct task_struct **exiting, int set_waiters)

获取支持 PI 的 futex 所需的原子操作

参数

u32 __user *uaddr

pi futex 用户地址

struct futex_hash_bucket *hb

pi futex 哈希桶

union futex_key *key

与 uaddr 和 hb 关联的 futex 键

struct futex_pi_state **ps

pi_state 指针,用于存储查找结果

struct task_struct *task

执行原子锁操作的任务。除了在 requeue pi 的情况下,这将是 “current”。

struct task_struct **exiting

指针,用于存储正在退出的所有者任务的任务指针

int set_waiters

强制设置 FUTEX_WAITERS 位 (1) 或不设置 (0)

返回

  • 0 - 准备等待;

  • 1 - 获取了锁;

  • <0 - 错误

描述

调用者必须持有 hb->lock。

只有当返回值为 -EBUSY 时才会设置 exiting。如果是这样,则这会在返回时保持对退出任务的引用计数,并且调用者需要在等待退出完成后将其删除。

int fixup_pi_owner(u32 __user *uaddr, struct futex_q *q, int locked)

锁 pi_state 后的处理和特殊情况管理

参数

u32 __user *uaddr

futex 的用户地址

struct futex_q *q

futex_q(包含 pi_state 和对 rt_mutex 的访问)

int locked

尝试获取 rt_mutex 是否成功 (1) 或失败 (0)

描述

在尝试锁定 rt_mutex 后,会调用此函数来清理 pi_state 所有者,并处理可能允许我们获取锁的竞争条件。必须在持有 hb 锁的情况下调用。

返回

  • 1 - 成功,已获取锁;

  • 0 - 成功,未获取锁;

  • <0 - 发生错误 (-EFAULT)

void requeue_futex(struct futex_q *q, struct futex_hash_bucket *hb1, struct futex_hash_bucket *hb2, union futex_key *key2)

将 futex_q 从一个 hb 重新排队到另一个

参数

struct futex_q *q

要重新排队的 futex_q

struct futex_hash_bucket *hb1

源哈希桶

struct futex_hash_bucket *hb2

目标哈希桶

union futex_key *key2

重新排队的 futex_q 的新键

void requeue_pi_wake_futex(struct futex_q *q, union futex_key *key, struct futex_hash_bucket *hb)

唤醒在重新排队期间获取锁的任务

参数

struct futex_q *q

futex_q

union futex_key *key

重新排队目标 futex 的键

struct futex_hash_bucket *hb

重新排队目标 futex 的哈希桶

描述

在 futex_requeue 期间,如果 requeue_pi=1,则如果目标 futex 没有竞争或通过锁窃取,则可以获取该目标 futex。

  1. q::key 设置为重新排队目标 futex 键,以便等待者可以在正确的 futex 上检测到唤醒。

  2. 从哈希桶中取消 q 的排队。

  3. q::rt_waiter 设置为 NULL,以便唤醒的任务可以检测到原子锁获取。

  4. 将 q->lock_ptr 设置为重新排队目标 hb->lock,以防等待者必须修复 pi 状态。

  5. 完成重新排队状态,以便等待者可以取得进展。在此之后,如果不需要修复 pi 状态,等待者任务可以立即从系统调用返回。

  6. 唤醒等待者任务。

必须在持有 q->lock_ptr 和 hb->lock 的情况下调用。

int futex_proxy_trylock_atomic(u32 __user *pifutex, struct futex_hash_bucket *hb1, struct futex_hash_bucket *hb2, union futex_key *key1, union futex_key *key2, struct futex_pi_state **ps, struct task_struct **exiting, int set_waiters)

尝试为顶部等待者执行原子锁

参数

u32 __user *pifutex

要执行的 futex 的用户地址

struct futex_hash_bucket *hb1

来源 futex 哈希桶,必须由调用者锁定

struct futex_hash_bucket *hb2

目标 futex 哈希桶,必须由调用者锁定

union futex_key *key1

来源 futex 键

union futex_key *key2

目标 futex 键

struct futex_pi_state **ps

用于存储 pi_state 指针的地址

struct task_struct **exiting

指针,用于存储正在退出的所有者任务的任务指针

int set_waiters

强制设置 FUTEX_WAITERS 位 (1) 或不设置 (0)

描述

如果我们可以原子方式执行操作,则尝试代表顶部等待者获取锁。如果成功,则唤醒顶部等待者。如果调用者指定了 set_waiters,则指示 futex_lock_pi_atomic() 强制设置 FUTEX_WAITERS 位。hb1 和 hb2 必须由调用者持有。

只有当返回值为 -EBUSY 时才会设置 exiting。如果是这样,则这会在返回时保持对退出任务的引用计数,并且调用者需要在等待退出完成后将其删除。

返回

  • 0 - 未能以原子方式获取锁;

  • >0 - 获取了锁,返回值是顶部等待者的 vpid

  • <0 - 错误

int futex_requeue(u32 __user *uaddr1, unsigned int flags1, u32 __user *uaddr2, unsigned int flags2, int nr_wake, int nr_requeue, u32 *cmpval, int requeue_pi)

将等待者从 uaddr1 重新排队到 uaddr2

参数

u32 __user *uaddr1

源 futex 用户地址

unsigned int flags1

futex 标志 (FLAGS_SHARED 等)

u32 __user *uaddr2

目标 futex 用户地址

unsigned int flags2

futex 标志 (FLAGS_SHARED 等)

int nr_wake

要唤醒的等待者数量(对于 requeue_pi 必须为 1)

int nr_requeue

要重新排队的等待者数量 (0-INT_MAX)

u32 *cmpval

uaddr1 的预期值 (或 NULL)

int requeue_pi

如果我们尝试从非 pi futex 重新排队到 pi futex (不支持 pi 到 pi 的重新排队)

描述

将 uaddr1 上的等待者重新排队到 uaddr2。在 requeue_pi 的情况下,尝试代表最顶层的等待者原子地获取 uaddr2。

返回

  • >=0 - 成功时,重新排队或唤醒的任务数;

  • <0 - 出错时

int handle_early_requeue_pi_wakeup(struct futex_hash_bucket *hb, struct futex_q *q, struct hrtimer_sleeper *timeout)

处理初始 futex 上的提前唤醒

参数

struct futex_hash_bucket *hb

futex_q 最初入队的哈希桶

struct futex_q *q

在等待重新排队时唤醒的 futex_q

struct hrtimer_sleeper *timeout

与等待关联的超时时间(如果没有则为 NULL)

描述

确定提前唤醒的原因。

返回

-EWOULDBLOCK 或 -ETIMEDOUT 或 -ERESTARTNOINTR

int futex_wait_requeue_pi(u32 __user *uaddr, unsigned int flags, u32 val, ktime_t *abs_time, u32 bitset, u32 __user *uaddr2)

在 uaddr 上等待并获取 uaddr2

参数

u32 __user *uaddr

我们最初等待的 futex(非 pi)

unsigned int flags

futex 标志 (FLAGS_SHARED, FLAGS_CLOCKRT 等),它们必须是相同的类型,不能从私有重新排队到共享等。

u32 val

uaddr 的预期值

ktime_t *abs_time

绝对超时

u32 bitset

用户空间设置的 32 位唤醒位集,默认为全部

u32 __user *uaddr2

我们将在返回用户空间之前获取的 pi futex

描述

调用者将在 uaddr 上等待,并将通过 futex_requeue() 重新排队到 uaddr2,uaddr2 必须是 PI 感知的,并且与 uaddr 不同。正常唤醒将在 uaddr2 上唤醒,并在返回用户空间之前完成 rt_mutex 的获取。这确保了 rt_mutex 在有等待者时保持所有者;如果没有所有者,如果需要,pi 逻辑将不知道要提升/降级哪个任务。

当我们在 futex_wait_queue() 中入队时,我们调用 schedule,并通过以下方式返回那里:1) 在 futex_requeue() 原子锁获取后,在 uaddr2 上唤醒 2) 重新排队后在 uaddr2 上唤醒 3) 信号 4) 超时

如果为 3,则清理并返回 -ERESTARTNOINTR。

如果为 2,我们可能会阻止尝试获取 rt_mutex,并通过以下方式返回:5) 成功锁定 6) 信号 7) 超时 8) 其他锁获取失败

如果为 6,则返回 -EWOULDBLOCK(重新启动系统调用将执行相同的操作)。

如果为 4 或 7,我们将清理并返回 -ETIMEDOUT。

返回

  • 0 - 成功时;

  • <0 - 出错时

void futex_wait_queue(struct futex_hash_bucket *hb, struct futex_q *q, struct hrtimer_sleeper *timeout)

futex_queue() 并等待唤醒、超时或信号

参数

struct futex_hash_bucket *hb

futex 哈希桶,必须由调用者锁定

struct futex_q *q

要入队的 futex_q

struct hrtimer_sleeper *timeout

准备好的 hrtimer_sleeper,如果没有超时则为 null

int futex_unqueue_multiple(struct futex_vector *v, int count)

从其哈希桶中删除各种 futex

参数

struct futex_vector *v

要取消入队的 futex 列表

int count

列表中 futex 的数量

描述

帮助取消入队 futex 列表。这不会失败。

返回

  • >=0 - 最后唤醒的 futex 的索引;

  • -1
    • 没有 futex 被唤醒

int futex_wait_multiple_setup(struct futex_vector *vs, int count, int *woken)

准备等待并入队多个 futex

参数

struct futex_vector *vs

要等待的 futex 列表

int count

列表的大小

int *woken

最后唤醒的 futex 的索引(如果有)。用于通知调用者它可以将此索引返回到用户空间(返回参数)

描述

在单个步骤中准备多个 futex 并将其入队。如果 futex 列表无效或任何 futex 已被唤醒,则可能会失败。成功后,该任务已准备好进行可中断的睡眠。

返回

  • 1 - 其中一个 futex 被另一个线程唤醒

  • 0 - 成功

  • <0 - -EFAULT, -EWOULDBLOCK 或 -EINVAL

void futex_sleep_multiple(struct futex_vector *vs, unsigned int count, struct hrtimer_sleeper *to)

检查休眠条件并休眠

参数

struct futex_vector *vs

等待的 futex 列表

unsigned int count

vs 的长度

struct hrtimer_sleeper *to

超时时间

描述

当且仅当超时时间未到期且列表中没有 futex 被唤醒时休眠。

int futex_wait_multiple(struct futex_vector *vs, unsigned int count, struct hrtimer_sleeper *to)

准备等待并加入多个 futex

参数

struct futex_vector *vs

要等待的 futex 列表

unsigned int count

对象的数量

struct hrtimer_sleeper *to

放弃并返回用户空间前的超时时间

描述

FUTEX_WAIT_MULTIPLE futex 操作的入口点,此函数在一组 futex 上休眠,并在第一个被唤醒的 futex 或超时时间过后返回。

返回

  • >=0 - 指示被唤醒的 futex

  • <0 - 出错时

int futex_wait_setup(u32 __user *uaddr, u32 val, unsigned int flags, struct futex_q *q, struct futex_hash_bucket **hb)

准备等待一个 futex

参数

u32 __user *uaddr

futex 用户空间地址

u32 val

期望值

unsigned int flags

futex 标志 (FLAGS_SHARED 等)

struct futex_q *q

关联的 futex_q

struct futex_hash_bucket **hb

存储 hash_bucket 指针,以便返回给调用者

描述

设置 futex_q 并定位 hash_bucket。获取 futex 值并将其与期望值进行比较。内部处理原子错误。成功时返回 hb 锁,失败时解锁。

返回

  • 0 - uaddr 包含 val 且 hb 已锁定;

  • <1 - -EFAULT 或 -EWOULDBLOCK (uaddr 不包含 val) 且 hb 已解锁

进一步阅读

  • Documentation/locking/spinlocks.rst:Linux Torvalds 在内核源代码中的自旋锁教程。

  • 现代体系结构的 Unix 系统:内核程序员的对称多处理和缓存

    Curt Schimmel 对内核级锁定的非常好的介绍(不是为 Linux 编写的,但几乎所有内容都适用)。这本书很贵,但对于理解 SMP 锁定来说,真的物超所值。[ISBN:0201633388]

感谢

感谢 Telsa Gwynne 提供 DocBooking、整理和添加样式。

感谢 Martin Pool、Philipp Rumpf、Stephen Rothwell、Paul Mackerras、Ruedi Aschwanden、Alan Cox、Manfred Spraul、Tim Waugh、Pete Zaitcev、James Morris、Robert Love、Paul McKenney、John Ashby 进行校对、更正、抨击、评论。

感谢阴谋集团对本文档没有影响。

术语表

抢占

在 2.5 之前,或者当 CONFIG_PREEMPT 未设置时,内核中用户上下文中的进程不会互相抢占(即,您拥有该 CPU 直到您放弃它,除了中断)。在 2.5.4 中添加 CONFIG_PREEMPT 后,这种情况发生了改变:当在用户上下文中时,优先级更高的任务可以“切入”:自旋锁被更改为禁用抢占,即使在 UP 上也是如此。

bh

下半部:由于历史原因,带有“_bh”的函数现在通常指任何软件中断,例如 spin_lock_bh() 会阻止当前 CPU 上的任何软件中断。下半部已弃用,最终将被 tasklet 取代。任何时候只有一个下半部会运行。

硬件中断 / 硬件 IRQ

硬件中断请求。in_hardirq() 在硬件中断处理程序中返回 true。

中断上下文

非用户上下文:处理硬件 irq 或软件 irq。由返回 true 的 in_interrupt() 宏指示。

SMP

对称多处理器:为多 CPU 机器编译的内核。(CONFIG_SMP=y)。

软件中断 / softirq

软件中断处理程序。in_hardirq() 返回 false;in_softirq() 返回 true。tasklet 和 softirq 都属于“软件中断”的范畴。

严格来说,softirq 是多达 32 个枚举的软件中断之一,可以在多个 CPU 上同时运行。有时也用于指代 tasklet(即,所有软件中断)。

tasklet

一个动态可注册的软件中断,保证一次只在一个 CPU 上运行。

定时器

一个动态可注册的软件中断,在给定时间(或接近给定时间)运行时运行。运行时,它就像一个 tasklet(实际上,它们是从 TIMER_SOFTIRQ 调用的)。

UP

单处理器:非 SMP。(CONFIG_SMP=n)。

用户上下文

代表特定进程(即系统调用或陷阱)或内核线程执行的内核。您可以使用 current 宏来判断哪个进程。不要与用户空间混淆。可以被软件或硬件中断中断。

用户空间

一个在其自己的内核外部代码中执行的进程。