伤/等待无死锁互斥锁设计¶
请先阅读通用互斥锁子系统,因为它也适用于等待/伤互斥锁。
WW互斥锁的动机¶
GPU执行的操作通常涉及许多缓冲区。这些缓冲区可以在上下文/进程之间共享,存在于不同的内存域(例如VRAM与系统内存)中,等等。使用PRIME/dmabuf,它们甚至可以在设备之间共享。因此,驱动程序在少数情况下需要等待缓冲区准备就绪。如果从等待缓冲区互斥锁变为可用性的角度考虑这一点,就会出现问题,因为无法保证缓冲区在所有上下文中都以相同的顺序出现在execbuf/批处理中。这直接受用户空间控制,并且是应用程序发出的GL调用序列的结果。这会导致死锁的可能性。当您考虑到内核可能需要在GPU操作缓冲区之前将缓冲区迁移到VRAM中,这可能反过来需要驱逐一些其他缓冲区(并且您不希望驱逐已经排队到GPU的其他缓冲区)时,问题变得更加复杂,但是为了简化对问题的理解,您可以忽略这一点。
TTM图形子系统提出的用于处理此问题的算法非常简单。对于每个需要锁定的缓冲区组(execbuf),将从全局计数器为调用者分配一个唯一的保留ID/票证。如果在锁定与execbuf关联的所有缓冲区时发生死锁,则保留票证最低的(即,最旧的任务)获胜,而保留ID较高的(即,较新的任务)解锁它已锁定的所有缓冲区,然后重试。
在RDBMS文献中,保留票证与事务相关联。死锁处理方法称为Wait-Die。该名称基于锁定线程在遇到已锁定的互斥锁时的操作。如果持有锁的事务较新,则锁定事务将等待。如果持有锁的事务较旧,则锁定事务将退避并死亡。因此称为Wait-Die。还有另一种算法称为Wound-Wait:如果持有锁的事务较新,则锁定事务会伤持有锁的事务,要求它死亡。如果持有锁的事务较旧,它将等待另一个事务。因此称为Wound-Wait。这两种算法都是公平的,因为事务最终会成功。但是,通常认为与Wait-Die相比,Wound-Wait算法会生成更少的退避,但另一方面,从退避中恢复时,其工作量比Wait-Die大。Wound-Wait也是一种抢占式算法,事务会被其他事务伤到,这需要一种可靠的方法来获取被伤到的条件并抢占正在运行的事务。请注意,这与进程抢占不同。当Wound-Wait事务在被伤后死亡(返回-EDEADLK)时,它被认为是抢占的。
概念¶
与普通互斥锁相比,w/w互斥锁的锁接口中出现了两个额外的概念/对象
获取上下文:为了确保最终的向前进展,重要的是,尝试获取锁的任务不会获取新的保留ID,而是保留在开始获取锁时获取的ID。此票证存储在获取上下文中。此外,获取上下文还会跟踪调试状态,以捕获w/w互斥锁接口的滥用。获取上下文表示一个事务。
W/w类:与普通互斥锁相比,w/w互斥锁的锁类需要是显式的,因为它需要初始化获取上下文。锁类还指定要使用的算法,Wound-Wait或Wait-Die。
此外,还有三种不同的w/w锁获取函数
使用上下文的普通锁获取,使用ww_mutex_lock。
在争用锁上进行慢速路径锁获取,由在放弃所有已获取的锁后刚刚终止其事务的任务使用。这些函数具有_slow后缀。
从简单的语义角度来看,_slow函数并非严格要求,因为简单地在争用锁上调用正常的ww_mutex_lock函数(在放弃所有其他已获取的锁后)将正常工作。毕竟,如果尚未获取其他ww互斥锁,则没有死锁的可能性,因此ww_mutex_lock调用将阻塞,而不会过早返回-EDEADLK。_slow函数的优点在于接口安全
ww_mutex_lock具有__must_check int返回类型,而ww_mutex_lock_slow具有void返回类型。请注意,由于ww互斥锁代码无论如何都需要循环/重试,因此__must_check不会导致虚假的警告,即使第一次锁操作永远不会失败。
启用完整调试后,ww_mutex_lock_slow会检查是否已释放所有已获取的ww互斥锁(防止死锁),并确保我们在争用锁上阻塞(防止在-EDEADLK慢速路径中旋转,直到可以获取争用锁)。
仅获取单个w/w互斥锁的函数,其结果与普通互斥锁的语义完全相同。这是通过使用NULL上下文调用ww_mutex_lock完成的。
再次强调,这不是严格要求的。但是,通常您只想获取一个锁,在这种情况下,设置获取上下文是没有意义的(因此最好避免获取死锁避免票证)。
当然,也提供了用于处理由于信号引起的唤醒的所有常用变体。
用法¶
通过使用DEFINE_WW_CLASS() (Wound-Wait)或DEFINE_WD_CLASS() (Wait-Die)来选择算法(Wait-Die vs Wound-Wait)。根据经验,如果期望同时竞争事务的数量通常很少,并且您希望减少回滚次数,则应使用Wound-Wait。
在同一w/w类中获取锁的三种不同方法。方法#1和#2的通用定义
static DEFINE_WW_CLASS(ww_class);
struct obj {
struct ww_mutex lock;
/* obj data */
};
struct obj_entry {
struct list_head head;
struct obj *obj;
};
方法1,使用execbuf->buffers中不允许重新排序的列表。如果所需对象的列表已经跟踪在某处,则此方法很有用。此外,如果对象在列表中出现两次,则锁助手可以将-EALREADY返回代码传播回调用者,作为信号。如果该列表是根据用户空间输入构造的,并且ABI要求用户空间没有重复的条目(例如,对于gpu commandbuffer提交ioctl),则此方法很有用。
int lock_objs(struct list_head *list, struct ww_acquire_ctx *ctx)
{
struct obj *res_obj = NULL;
struct obj_entry *contended_entry = NULL;
struct obj_entry *entry;
ww_acquire_init(ctx, &ww_class);
retry:
list_for_each_entry (entry, list, head) {
if (entry->obj == res_obj) {
res_obj = NULL;
continue;
}
ret = ww_mutex_lock(&entry->obj->lock, ctx);
if (ret < 0) {
contended_entry = entry;
goto err;
}
}
ww_acquire_done(ctx);
return 0;
err:
list_for_each_entry_continue_reverse (entry, list, head)
ww_mutex_unlock(&entry->obj->lock);
if (res_obj)
ww_mutex_unlock(&res_obj->lock);
if (ret == -EDEADLK) {
/* we lost out in a seqno race, lock and retry.. */
ww_mutex_lock_slow(&contended_entry->obj->lock, ctx);
res_obj = contended_entry->obj;
goto retry;
}
ww_acquire_fini(ctx);
return ret;
}
方法2,使用execbuf->buffers中可以重新排序的列表。与上述方法1相同的重复条目检测语义,使用-EALREADY。但是,列表重新排序允许使用更多惯用的代码
int lock_objs(struct list_head *list, struct ww_acquire_ctx *ctx)
{
struct obj_entry *entry, *entry2;
ww_acquire_init(ctx, &ww_class);
list_for_each_entry (entry, list, head) {
ret = ww_mutex_lock(&entry->obj->lock, ctx);
if (ret < 0) {
entry2 = entry;
list_for_each_entry_continue_reverse (entry2, list, head)
ww_mutex_unlock(&entry2->obj->lock);
if (ret != -EDEADLK) {
ww_acquire_fini(ctx);
return ret;
}
/* we lost out in a seqno race, lock and retry.. */
ww_mutex_lock_slow(&entry->obj->lock, ctx);
/*
* Move buf to head of the list, this will point
* buf->next to the first unlocked entry,
* restarting the for loop.
*/
list_del(&entry->head);
list_add(&entry->head, list);
}
}
ww_acquire_done(ctx);
return 0;
}
方法#1和#2的解锁方式相同
void unlock_objs(struct list_head *list, struct ww_acquire_ctx *ctx)
{
struct obj_entry *entry;
list_for_each_entry (entry, list, head)
ww_mutex_unlock(&entry->obj->lock);
ww_acquire_fini(ctx);
}
如果对象的列表是临时构建的,而不是预先构建的,则方法3很有用,例如,在调整图形中的边时,其中每个节点都有自己的ww_mutex锁,并且只有在持有所有涉及节点的锁时才能更改边。w/w互斥锁对于这种情况来说是很自然的,原因有两个
它们可以按任何顺序处理锁获取,这使我们可以从起点开始遍历图形,然后迭代地发现新的边并锁定这些边连接到的节点。
由于-EALREADY返回代码表示已持有给定对象,因此无需额外的簿记来打破图形中的循环或跟踪已持有哪个外观(当使用多个节点作为起点时)。
请注意,此方法在两个重要方面与上述方法不同
由于对象列表是动态构建的(并且由于达到-EDEADLK死亡条件而重试时很可能不同),因此当对象未被锁定时,无需将任何对象保留在持久列表中。因此,我们可以将list_head移动到对象本身中。
另一方面,动态对象列表构造也意味着无法传播-EALREADY返回代码。
另请注意,方法#1和#2和方法#3可以组合使用,例如,首先使用上述方法之一锁定起始节点列表(从用户空间传入)。然后使用下面的方法#3锁定受操作影响的任何其他对象。回退/重试过程将更加复杂,因为当动态锁定步骤达到-EDEADLK时,我们还需要解锁使用固定列表获取的所有对象。但是,w/w互斥锁调试检查将捕获这些情况下的任何接口滥用。
此外,方法3不能使锁获取步骤失败,因为它不返回-EALREADY。当然,当使用_interruptible变体时,情况会有所不同,但这超出了此处这些示例的范围
struct obj {
struct ww_mutex ww_mutex;
struct list_head locked_list;
};
static DEFINE_WW_CLASS(ww_class);
void __unlock_objs(struct list_head *list)
{
struct obj *entry, *temp;
list_for_each_entry_safe (entry, temp, list, locked_list) {
/* need to do that before unlocking, since only the current lock holder is
allowed to use object */
list_del(&entry->locked_list);
ww_mutex_unlock(entry->ww_mutex)
}
}
void lock_objs(struct list_head *list, struct ww_acquire_ctx *ctx)
{
struct obj *obj;
ww_acquire_init(ctx, &ww_class);
retry:
/* re-init loop start state */
loop {
/* magic code which walks over a graph and decides which objects
* to lock */
ret = ww_mutex_lock(obj->ww_mutex, ctx);
if (ret == -EALREADY) {
/* we have that one already, get to the next object */
continue;
}
if (ret == -EDEADLK) {
__unlock_objs(list);
ww_mutex_lock_slow(obj, ctx);
list_add(&entry->locked_list, list);
goto retry;
}
/* locked a new object, add it to the list */
list_add_tail(&entry->locked_list, list);
}
ww_acquire_done(ctx);
return 0;
}
void unlock_objs(struct list_head *list, struct ww_acquire_ctx *ctx)
{
__unlock_objs(list);
ww_acquire_fini(ctx);
}
方法4:仅锁定一个对象。在这种情况下,死锁检测和预防显然是多余的,因为仅获取一个锁,您就无法在同一类中产生死锁。为了简化这种情况,可以将w/w互斥锁api与NULL上下文一起使用。
实现细节¶
设计:¶
ww_mutex 当前封装了一个 struct mutex,这意味着对于更常见的普通互斥锁,没有额外的开销。因此,如果不使用 wait/wound 互斥锁,代码大小只会略有增加。
我们为等待列表维护以下不变性:
具有获取上下文的等待者按时间戳顺序排序;没有获取上下文的等待者以 FIFO 顺序穿插排列。
对于 Wait-Die,在具有上下文的等待者中,只有第一个等待者可以拥有已获取的其他锁 (ctx->acquired > 0)。请注意,此等待者可能在列表中其他没有上下文的等待者之后。
Wound-Wait 抢占使用延迟抢占方案实现:仅当出现新锁的争用,从而真正有可能发生死锁时,才会检查事务的 wounded 状态。在这种情况下,如果事务处于 wounded 状态,它会回退,清除 wounded 状态并重试。以这种方式实现抢占的一个巨大好处是,wounded 事务可以在重新启动事务之前识别要等待的争用锁。只是盲目地重新启动事务很可能会使事务最终陷入必须再次回退的境地。
一般来说,预计不会有太多争用。这些锁通常用于序列化对设备资源的访问,因此优化重点应该放在非争用的情况上。
Lockdep:¶
已特别注意警告尽可能多的 API 滥用情况。一些常见的 API 滥用情况将通过 CONFIG_DEBUG_MUTEXES 捕获,但建议使用 CONFIG_PROVE_LOCKING。
- 将警告的一些错误:
忘记调用 ww_acquire_fini 或 ww_acquire_init。
在 ww_acquire_done 之后尝试锁定更多互斥锁。
在 -EDEADLK 并解锁所有互斥锁后尝试锁定错误的互斥锁。
在 -EDEADLK 之后,在解锁所有互斥锁之前,尝试锁定正确的互斥锁。
在返回 -EDEADLK 之前调用 ww_mutex_lock_slow。
使用错误的解锁函数解锁互斥锁。
在同一上下文上调用 ww_acquire_* 两次。
为互斥锁使用与 ww_acquire_ctx 不同的 ww_class。
可能导致死锁的正常 lockdep 错误。
- 可能导致死锁的一些 lockdep 错误:
在对第一个 ww_acquire_ctx 调用 ww_acquire_fini 之前,调用 ww_acquire_init 来初始化第二个 ww_acquire_ctx。
可能发生的“普通”死锁。
- FIXME
一旦我们实现了 TASK_DEADLOCK 任务状态标志魔法,就更新此部分。