RT互斥锁实现设计

版权所有 (c) 2006 Steven Rostedt

根据GNU自由文档许可证1.2版授权

本文档试图描述rtmutex.c的实现设计。它没有描述rtmutex.c存在的原因。为此,请参阅支持PI的RT互斥锁子系统。尽管本文档确实解释了没有此代码时发生的问题,但这旨在理解代码的实际作用。

本文档的目标是帮助其他人理解所使用的优先级继承(PI)算法,以及做出以这种方式实现PI的决定的原因。

无界优先级反转

优先级反转是指当高优先级进程想要运行时,低优先级进程正在执行。发生这种情况有几个原因,并且大多数时候是不可避免的。任何时候,高优先级进程想要使用低优先级进程拥有的资源(例如互斥锁),高优先级进程都必须等待,直到低优先级进程完成对资源的使用。这就是优先级反转。我们想要阻止的是一种称为无界优先级反转的情况。即高优先级进程因低优先级进程而无法运行,并且等待时间不确定。

无界优先级反转的经典例子是,假设有三个进程,我们称之为进程A、B和C,其中A是优先级最高的进程,C是最低的,B介于两者之间。A试图获取C拥有的锁,必须等待并让C运行以释放锁。但与此同时,B开始执行,由于B的优先级高于C,它会抢占C,但这样做实际上是在抢占优先级更高的A进程。现在无法知道A会休眠等待C释放锁多长时间,因为就我们所知,B可能是一个CPU消耗大户,永远不会让C有机会释放锁。这称为无界优先级反转。

下面是一些ASCII艺术,显示了这个问题

   grab lock L1 (owned by C)
     |
A ---+
        C preempted by B
          |
C    +----+

B         +-------->
                B now keeps A from running.

优先级继承(PI)

有几种方法可以解决这个问题,但其他方法不在本文档的讨论范围之内。这里我们只讨论PI。

PI是指如果一个进程阻塞在当前进程拥有的锁上,该进程会继承另一个进程的优先级。为了更容易理解这一点,让我们再次使用之前的示例,包含进程A、B和C。

这一次,当A阻塞在C拥有的锁上时,C将继承A的优先级。因此,如果B变为可运行状态,它不会抢占C,因为C现在具有A的高优先级。一旦C释放锁,它将失去其继承的优先级,然后A可以使用C拥有的资源继续执行。

术语

这里我解释本文档中使用的一些术语,以帮助描述用于实现PI的设计。

PI链
  • PI链是由一系列有序的锁和进程组成的,这些锁和进程导致进程从先前在其锁之一上阻塞的进程继承优先级。这将在本文档后面更详细地描述。

互斥锁
  • 在本文档中,为了区分实现PI的锁和PI代码中使用的自旋锁,从现在开始,PI锁将称为互斥锁。

  • 从现在开始,在本文档中,我将使用术语“锁”来指代用于保护PI算法各部分的自旋锁。这些锁会禁用UP的抢占(启用CONFIG_PREEMPT时),并在SMP上防止多个CPU同时进入临界区。

自旋锁
  • 与上面的锁相同。

等待者
  • 等待者是存储在阻塞进程堆栈上的结构。由于等待者的作用域在进程阻塞在互斥锁上的代码中,因此将等待者分配在进程的堆栈上(局部变量)是可行的。此结构保存指向任务的指针,以及任务阻塞的互斥锁。它还具有rbtree节点结构,用于将任务放置在互斥锁的等待者rbtree中,以及互斥锁所有者任务的pi_waiters rbtree中(如下所述)。

    有时,等待者指的是正在等待互斥锁的任务。这与等待者->任务相同。

等待者
  • 阻塞在互斥锁上的进程列表。

顶层等待者
  • 等待特定互斥锁的优先级最高的进程。

顶层pi等待者
  • 等待特定进程拥有的互斥锁之一的优先级最高的进程。

注意

任务和进程在本文档中可以互换使用,主要用于区分一起描述的两个进程。

PI链

PI链是可能导致发生优先级继承的进程和互斥锁列表。多个链可能会收敛,但链永远不会发散,因为一个进程不能同时阻塞在多个互斥锁上。

示例

Process:  A, B, C, D, E
Mutexes:  L1, L2, L3, L4

A owns: L1
        B blocked on L1
        B owns L2
               C blocked on L2
               C owns L3
                      D blocked on L3
                      D owns L4
                             E blocked on L4

链将是

E->L4->D->L3->C->L2->B->L1->A

为了显示两条链合并的位置,我们可以添加另一个进程F和另一个互斥锁L5,其中B拥有L5,F阻塞在互斥锁L5上。

F的链将是

F->L5->B->L1->A

由于一个进程可能拥有多个互斥锁,但永远不会阻塞在多个互斥锁上,因此链会合并。

这里我们显示两条链

E->L4->D->L3->C->L2-+
                    |
                    +->B->L1->A
                    |
              F->L5-+

为了使PI工作,这些链右端(或者我们也可以称为链顶端)的进程的优先级必须等于或高于链的左侧或下方的进程。

此外,由于一个互斥锁可能有多个进程阻塞在其上,因此我们可以在互斥锁处合并多个链。如果我们添加另一个阻塞在互斥锁L2上的进程G

G->L2->B->L1->A

再一次,为了展示这如何增长,我将再次显示合并的链

E->L4->D->L3->C-+
                +->L2-+
                |     |
              G-+     +->B->L1->A
                      |
                F->L5-+

如果进程G在链中具有最高优先级,则链上的所有任务(本例中的A和B)的优先级必须提高到与G相同。

互斥锁等待者树

每个互斥锁都会跟踪所有阻塞在其上的等待者。互斥锁具有一个rbtree,用于按优先级存储这些等待者。此树受互斥锁结构中的自旋锁保护。此锁称为wait_lock。

任务PI树

为了跟踪PI链,每个进程都有自己的PI rbtree。这是该进程拥有的互斥锁的所有顶层等待者的树。请注意,此树仅保存顶层等待者,而不保存阻塞在进程拥有的互斥锁上的所有等待者。

任务PI树的顶部始终是等待该任务拥有的互斥锁的优先级最高的任务。因此,如果任务继承了优先级,它将始终是该树顶部的任务的优先级。

此树作为名为pi_waiters的rbtree存储在进程的任务结构中。它受任务结构中的自旋锁(也称为pi_lock)保护。此锁也可以在中断上下文中获取,因此在锁定pi_lock时,必须禁用中断。

PI链的深度

PI链的最大深度不是动态的,实际上可以定义。但是,由于它取决于互斥锁的所有嵌套,因此很难计算出来。让我们看看有3个互斥锁L1、L2和L3以及四个单独的函数func1、func2、func3和func4的示例。以下显示了L1->L2->L3的锁定顺序,但实际上可能不是直接以那种方式嵌套的

void func1(void)
{
      mutex_lock(L1);

      /* do anything */

      mutex_unlock(L1);
}

void func2(void)
{
      mutex_lock(L1);
      mutex_lock(L2);

      /* do something */

      mutex_unlock(L2);
      mutex_unlock(L1);
}

void func3(void)
{
      mutex_lock(L2);
      mutex_lock(L3);

      /* do something else */

      mutex_unlock(L3);
      mutex_unlock(L2);
}

void func4(void)
{
      mutex_lock(L3);

      /* do something again */

      mutex_unlock(L3);
}

现在我们添加4个单独运行每个函数的进程。分别运行函数func1、func2、func3和func4的进程A、B、C和D,并且D首先运行,A最后运行。在“再次执行某些操作”区域中,D在func4中被抢占,我们有以下锁定

D owns L3
       C blocked on L3
       C owns L2
              B blocked on L2
              B owns L1
                     A blocked on L1

And thus we have the chain A->L1->B->L2->C->L3->D.

这给了我们4个(四个进程)的PI深度,但是单独查看任何一个函数,似乎它们的锁定深度最多只有2。因此,尽管锁定深度是在编译时定义的,但仍然很难找到该深度的可能性。

现在,由于互斥锁可以由用户态应用程序定义,我们不希望出现一种嵌套大量互斥锁以创建大型PI链,并在查看大量数据时使代码持有自旋锁的DOS类型的应用程序。因此,为了防止这种情况,该实现不仅实现了最大锁定深度,而且在遍历PI链时一次最多只持有两个不同的锁。下面将详细介绍。

互斥锁所有者和标志

互斥锁结构包含一个指向互斥锁所有者的指针。如果互斥锁未被拥有,则此所有者设置为 NULL。由于所有架构的任务结构都至少按两个字节对齐(如果不是这样,rtmutex.c 代码将会崩溃!),这允许将最低有效位用作标志。位 0 用作“有等待者”标志。每当互斥锁上有等待者时,就会设置此标志。

有关详细信息,请参阅 支持 PI 的 RT 互斥锁子系统

cmpxchg 技巧

一些架构实现了原子 cmpxchg(比较并交换)。这在适用时用于保持获取和释放互斥锁的快速路径较短。

cmpxchg 基本上是原子执行的以下函数

unsigned long _cmpxchg(unsigned long *A, unsigned long *B, unsigned long *C)
{
      unsigned long T = *A;
      if (*A == *B) {
              *A = *C;
      }
      return T;
}
#define cmpxchg(a,b,c) _cmpxchg(&a,&b,&c)

这非常有用,因为它允许您仅在变量是您期望的值时才更新变量。如果返回值(A 的旧值)等于 B,则您知道它是否成功。

宏 rt_mutex_cmpxchg 用于尝试锁定和解锁互斥锁。如果架构不支持 CMPXCHG,则此宏每次都会简单地设置为失败。但是,如果支持 CMPXCHG,则这将极大地帮助保持快速路径较短。

在所有者字段中使用带有标志的 rt_mutex_cmpxchg 有助于针对支持它的架构优化系统。这将在本文档的后面部分进行解释。

优先级调整

rtmutex.c 中 PI 代码的实现在几个地方需要进程调整其优先级。借助进程的 pi_waiters,很容易知道需要调整什么。

实现任务调整的函数是 rt_mutex_adjust_prio 和 rt_mutex_setprio。rt_mutex_setprio 仅在 rt_mutex_adjust_prio 中使用。

rt_mutex_adjust_prio 检查任务的优先级,以及等待该任务拥有的任何互斥锁的最高优先级进程。由于任务的 pi_waiters 按该任务拥有的所有互斥锁的所有顶部等待者的优先级顺序保存,因此我们只需要将顶部 pi 等待者与其自身的正常/截止优先级进行比较,并取较高的优先级。然后调用 rt_mutex_setprio 将任务的优先级调整为新的优先级。请注意,rt_mutex_setprio 在 kernel/sched/core.c 中定义,以实现优先级的实际更改。

注意

对于 task_struct 中的“prio”字段,数字越小,优先级越高。“prio”为 5 的优先级高于“prio”为 10 的优先级。

有趣的是,rt_mutex_adjust_prio 可以增加或降低任务的优先级。如果较高优先级的进程刚刚阻塞在任务拥有的互斥锁上,rt_mutex_adjust_prio 将会增加/提升任务的优先级。但是,如果较高优先级的任务由于某种原因离开互斥锁(超时或信号),则同一函数将降低/解除提升任务的优先级。这是因为 pi_waiters 始终包含等待任务拥有的互斥锁的最高优先级任务,因此我们只需要将顶部 pi 等待者的优先级与给定任务的正常优先级进行比较。

PI 链遍历的高级概述

PI 链遍历由函数 rt_mutex_adjust_prio_chain 实现。

该实现经历了多次迭代,最终达到了我们认为的最佳状态。它通过一次最多只获取两个锁来遍历 PI 链,并且效率非常高。

rt_mutex_adjust_prio_chain 可用于提升或降低进程优先级。

rt_mutex_adjust_prio_chain 调用时,需要传入一个要检查 PI(解除)提升的任务(进程阻塞的互斥锁的所有者)、一个检查死锁的标志、该任务拥有的互斥锁、一个指向等待者的指针,该等待者是阻塞在互斥锁上的进程的等待者结构(尽管对于解除提升,此参数可能为 NULL)、一个指向任务阻塞的互斥锁的指针,以及作为互斥锁的顶部等待者的 top_task。

为了便于解释,我将不提及死锁检测。此解释将尝试保持在高级别。

调用此函数时,不持有任何锁。这也意味着,当进入此函数时,所有者和锁的状态可能会发生变化。

在此函数调用之前,已对任务执行 rt_mutex_adjust_prio。这意味着任务已设置为其应处的优先级,但任务的等待者的 rbtree 节点尚未更新为新的优先级,并且此任务可能不在任务阻塞的 pi_waiters 和等待者树中的正确位置。此函数解决所有这些问题。

此函数的主要操作由 Thomas Gleixner 在 rtmutex.c 中总结。有关更多详细信息,请参阅“链遍历基础和保护范围”注释。

获取互斥锁(遍历)

好的,现在让我们详细了解一下获取互斥锁时发生的情况。

首先尝试的是快速获取互斥锁。这是在我们启用 CMPXCHG 时完成的(否则快速获取会自动失败)。仅当互斥锁的所有者字段为 NULL 时,才能使用 CMPXCHG 获取锁,而无需执行其他操作。

如果锁发生争用,我们将进入慢速路径(rt_mutex_slowlock)。

在慢速路径函数中,在堆栈上创建任务的等待者结构。这是因为等待者结构仅在函数的作用域内需要。等待者结构保存将任务存储在互斥锁的等待者树上的节点,如果需要,还保存所有者的 pi_waiters 树上的节点。

由于解锁互斥锁的慢速路径也会获取此锁,因此会获取互斥锁的 wait_lock。

然后我们调用 try_to_take_rt_mutex。这是不实现 CMPXCHG 的架构始终会获取锁(如果没有争用)的地方。

每次任务尝试在慢速路径中获取互斥锁时,都会使用 try_to_take_rt_mutex。这里首先执行的是原子设置互斥锁的所有者字段的“有等待者”标志。通过现在设置此标志,正在争用的互斥锁的当前所有者无法在不进入慢速解锁路径的情况下释放互斥锁,并且它需要获取此代码当前持有的 wait_lock。因此,设置“有等待者”标志会强制当前所有者与此代码同步。

如果以下条件为真,则获取锁

  1. 锁没有所有者

  2. 当前任务是所有其他锁的等待者中优先级最高的

如果任务成功获取锁,则将该任务设置为锁的所有者,并且如果锁仍然有等待者,则将 top_waiter(等待锁的最高优先级任务)添加到此任务的 pi_waiters 树中。

如果 try_to_take_rt_mutex() 未获取锁,则调用 task_blocks_on_rt_mutex() 函数。这会将任务添加到锁的等待者树中,并传播锁的 pi 链以及锁的所有者的 pi_waiters 树。这将在下一节中进行描述。

任务阻塞在互斥锁上

互斥锁和进程的记账是通过进程的等待者结构完成的。“task”字段设置为进程,“lock”字段设置为互斥锁。等待者的 rbtree 节点初始化为进程的当前优先级。

由于 wait_lock 是在慢速锁入口处获取的,因此我们可以安全地将等待者添加到任务等待者树中。如果当前进程是当前等待此互斥锁的最高优先级进程,则我们从所有者的 pi_waiters 中删除先前的顶部等待者进程(如果存在),并将当前进程添加到该树中。由于所有者的 pi_waiter 已更改,我们在所有者上调用 rt_mutex_adjust_prio,以查看所有者是否应相应地调整其优先级。

如果所有者也阻塞在锁上,并且其 pi_waiters 已更改(或死锁检查已开启),我们会解锁互斥锁的 wait_lock,并继续在所有者上运行 rt_mutex_adjust_prio_chain,如前所述。

现在所有锁都已释放,并且如果当前进程仍然阻塞在互斥锁上(等待者“task”字段不为 NULL),则我们将进入睡眠状态(调用 schedule)。

在循环中唤醒

然后,任务可能因以下几个原因唤醒
  1. 先前的锁所有者释放了锁,并且该任务现在是 top_waiter

  2. 我们收到信号或超时

在这两种情况下,任务都将再次尝试获取锁。如果成功,则它将从等待者树中删除自身,并将自身设置回 TASK_RUNNING 状态。

在第一种情况下,如果在该任务获取锁之前,另一个任务获取了锁,则它将返回睡眠状态,并等待再次被唤醒。

第二种情况仅适用于那些在获取锁之前可以唤醒的任务,可能是由于信号或超时(即 rt_mutex_timed_futex_lock())。当唤醒时,它将再次尝试获取锁,如果成功,则该任务将返回并持有锁,否则如果该任务是由信号唤醒,则将返回 -EINTR,如果超时,则返回 -ETIMEDOUT。

解锁互斥锁

对于那些具有 CMPXCHG 的架构,解锁互斥锁也有一个快速路径。由于在争用时获取互斥锁始终会设置互斥锁所有者的“有等待者”标志,因此我们使用它来知道在解锁互斥锁时是否需要进入慢速路径。如果互斥锁没有任何等待者,则互斥锁的所有者字段将等于当前进程,并且可以通过简单地将所有者字段替换为 NULL 来解锁互斥锁。

如果所有者字段设置了“有等待者”位(或者 CMPXCHG 不可用),则会采用慢速解锁路径。

在慢速解锁路径中完成的第一件事是获取互斥锁的 wait_lock。这会同步互斥锁的锁定和解锁。

检查互斥锁是否有等待者。在没有 CMPXCHG 指令的架构上,互斥锁的拥有者会在此处确定是否需要唤醒等待者。在有 CMPXCHG 指令的架构上,该检查在快速路径中完成,但慢速路径也仍然需要。如果一个互斥锁的等待者在拥有者未能通过快速路径 CMPXCHG 检查和获取 wait_lock 之间的时间内,由于信号或超时而被唤醒,则互斥锁可能没有任何等待者,因此拥有者仍然需要进行此检查。如果没有等待者,则将互斥锁的拥有者字段设置为 NULL,释放 wait_lock,无需进行其他操作。

如果有等待者,则我们需要唤醒一个。

在唤醒代码中,获取当前拥有者的 pi_lock。找到锁的最高优先级等待者,并将其从互斥锁的等待者树以及当前拥有者的 pi_waiters 树中移除。“Has Waiters” 位被标记,以防止低优先级任务窃取锁。

最后,我们解锁待定拥有者的 pi_lock 并将其唤醒。

联系方式

有关此文档的更新,请发送电子邮件至 Steven Rostedt <rostedt@goodmis.org>

鸣谢

作者:Steven Rostedt <rostedt@goodmis.org>

更新:Alex Shi <alex.shi@linaro.org> - 2017年7月6日

原始审阅者

Ingo Molnar, Thomas Gleixner, Thomas Duetsch 和 Randy Dunlap

更新(2017年7月6日)审阅者:Steven Rostedt 和 Sebastian Siewior

更新历史

此文档最初为 2.6.17-rc3-mm1 编写,在 4.12 版本进行了更新