RCU和可卸载模块¶
[最初发表于 LWN 2007 年 1 月 14 日:http://lwn.net/Articles/217484/]
RCU 更新器有时使用 call_rcu()
来启动对宽限期经过的异步等待。此原语接受一个指向位于 RCU 保护的数据结构中的 rcu_head 结构体的指针,以及另一个指向稍后可能被调用以释放该结构体的函数的指针。然后,从 IRQ 上下文中删除链表中的元素 p 的代码可能如下
list_del_rcu(p);
call_rcu(&p->rcu, p_callback);
由于 call_rcu()
永远不会阻塞,因此可以在 IRQ 上下文中安全地使用此代码。函数 p_callback() 可以定义如下
static void p_callback(struct rcu_head *rp)
{
struct pstruct *p = container_of(rp, struct pstruct, rcu);
kfree(p);
}
卸载使用 call_rcu() 的模块¶
但是,如果 p_callback() 函数是在可卸载模块中定义的呢?
如果我们在某些 RCU 回调挂起时卸载模块,则执行这些回调的 CPU 将在稍后被调用时感到非常失望,如 http://lwn.net/images/ns/kernel/rcu-drop.jpg 中生动地描绘的那样。
我们可以尝试在模块退出代码路径中放置一个 synchronize_rcu()
,但这并不足够。虽然 synchronize_rcu()
确实会等待宽限期经过,但它不会等待回调完成。
人们可能会试图尝试几个背靠背的 synchronize_rcu()
调用,但这仍然不能保证有效。如果 RCU 回调负载非常重,那么可能会推迟某些回调,以便允许进行其他处理。例如,在实时内核中需要这种延迟,以避免过度的调度延迟。
rcu_barrier()¶
这种情况可以通过 rcu_barrier()
原语来处理。 rcu_barrier()
不是等待宽限期经过,而是等待所有未完成的 RCU 回调完成。请注意,rcu_barrier()
不 意味着 synchronize_rcu()
,特别是,如果没有 RCU 回调在任何地方排队,rcu_barrier()
有权立即返回,而不等待任何事情,更不用说宽限期了。
使用 rcu_barrier()
的伪代码如下
阻止发布任何新的 RCU 回调。
执行
rcu_barrier()
。允许卸载模块。
还有一个用于 SRCU 的 srcu_barrier()
函数,当然,您必须将 srcu_barrier()
的风格与 call_srcu()
的风格相匹配。如果您的模块使用多个 srcu_struct 结构,那么在卸载该模块时,它也必须使用多个 srcu_barrier()
调用。例如,如果它在 srcu_struct_1 上使用 call_rcu()
、call_srcu()
,并在 srcu_struct_2 上使用 call_srcu()
,那么在卸载时将需要以下三行代码
1 rcu_barrier();
2 srcu_barrier(&srcu_struct_1);
3 srcu_barrier(&srcu_struct_2);
如果延迟至关重要,则可以使用工作队列并发运行这三个函数。
rcutorture 模块的旧版本在其退出函数中使用 rcu_barrier()
,如下所示
1 static void
2 rcu_torture_cleanup(void)
3 {
4 int i;
5
6 fullstop = 1;
7 if (shuffler_task != NULL) {
8 VERBOSE_PRINTK_STRING("Stopping rcu_torture_shuffle task");
9 kthread_stop(shuffler_task);
10 }
11 shuffler_task = NULL;
12
13 if (writer_task != NULL) {
14 VERBOSE_PRINTK_STRING("Stopping rcu_torture_writer task");
15 kthread_stop(writer_task);
16 }
17 writer_task = NULL;
18
19 if (reader_tasks != NULL) {
20 for (i = 0; i < nrealreaders; i++) {
21 if (reader_tasks[i] != NULL) {
22 VERBOSE_PRINTK_STRING(
23 "Stopping rcu_torture_reader task");
24 kthread_stop(reader_tasks[i]);
25 }
26 reader_tasks[i] = NULL;
27 }
28 kfree(reader_tasks);
29 reader_tasks = NULL;
30 }
31 rcu_torture_current = NULL;
32
33 if (fakewriter_tasks != NULL) {
34 for (i = 0; i < nfakewriters; i++) {
35 if (fakewriter_tasks[i] != NULL) {
36 VERBOSE_PRINTK_STRING(
37 "Stopping rcu_torture_fakewriter task");
38 kthread_stop(fakewriter_tasks[i]);
39 }
40 fakewriter_tasks[i] = NULL;
41 }
42 kfree(fakewriter_tasks);
43 fakewriter_tasks = NULL;
44 }
45
46 if (stats_task != NULL) {
47 VERBOSE_PRINTK_STRING("Stopping rcu_torture_stats task");
48 kthread_stop(stats_task);
49 }
50 stats_task = NULL;
51
52 /* Wait for all RCU callbacks to fire. */
53 rcu_barrier();
54
55 rcu_torture_stats_print(); /* -After- the stats thread is stopped! */
56
57 if (cur_ops->cleanup != NULL)
58 cur_ops->cleanup();
59 if (atomic_read(&n_rcu_torture_error))
60 rcu_torture_print_module_parms("End of test: FAILURE");
61 else
62 rcu_torture_print_module_parms("End of test: SUCCESS");
63 }
第 6 行设置了一个全局变量,以防止任何 RCU 回调重新发布自身。这在大多数情况下是不必要的,因为 RCU 回调很少包括对 call_rcu()
的调用。但是,rcutorture 模块是此规则的一个例外,因此需要设置此全局变量。
第 7-50 行停止了与 rcutorture 模块关联的所有内核任务。因此,一旦执行到达第 53 行,将不会发布更多 rcutorture RCU 回调。第 53 行上的 rcu_barrier()
调用会等待任何预先存在的回调完成。
然后,第 55-62 行打印状态并执行特定于操作的清理,然后返回,允许完成模块卸载操作。
- 快速测验 #1
还有其他任何情况需要
rcu_barrier()
吗?
您的模块可能存在其他复杂情况。例如,如果您的模块从定时器调用 call_rcu()
,您需要首先避免发布新的定时器,取消(或等待)所有已发布的定时器,然后才能调用 rcu_barrier()
以等待任何剩余的 RCU 回调完成。
当然,如果您的模块使用 call_rcu()
,您需要在卸载之前调用 rcu_barrier()
。同样,如果您的模块使用 call_srcu()
,您需要在卸载之前调用 srcu_barrier()
,并且在同一个 srcu_struct 结构上调用。如果您的模块使用 call_rcu()
和 call_srcu()
,那么(如上所述)您需要调用 rcu_barrier()
和 srcu_barrier()
。
实现 rcu_barrier()¶
Dipankar Sarma 对 rcu_barrier()
的实现利用了以下事实:RCU 回调一旦在每个 CPU 队列之一上排队,就永远不会重新排序。他的实现将 RCU 回调排队在每个 CPU 回调队列上,然后等待它们全部开始执行,此时,保证所有较早的 RCU 回调都已完成。
rcu_barrier()
的原始代码大致如下
1 void rcu_barrier(void)
2 {
3 BUG_ON(in_interrupt());
4 /* Take cpucontrol mutex to protect against CPU hotplug */
5 mutex_lock(&rcu_barrier_mutex);
6 init_completion(&rcu_barrier_completion);
7 atomic_set(&rcu_barrier_cpu_count, 1);
8 on_each_cpu(rcu_barrier_func, NULL, 0, 1);
9 if (atomic_dec_and_test(&rcu_barrier_cpu_count))
10 complete(&rcu_barrier_completion);
11 wait_for_completion(&rcu_barrier_completion);
12 mutex_unlock(&rcu_barrier_mutex);
13 }
第 3 行验证调用者是否在进程上下文中,第 5 行和第 12 行使用 rcu_barrier_mutex 来确保一次只有一个 rcu_barrier()
使用全局完成和计数器,这些计数器在第 6 行和第 7 行初始化。第 8 行使每个 CPU 调用 rcu_barrier_func(),如下所示。请注意,on_each_cpu() 的参数列表中的最后一个 “1” 确保对 rcu_barrier_func() 的所有调用在 on_each_cpu() 返回之前完成。第 9 行从 rcu_barrier_cpu_count 中删除初始计数,如果此计数现在为零,则第 10 行完成完成,这会阻止第 11 行阻塞。无论哪种方式,第 11 行都会等待(如果需要)完成。
- 快速测验 #2
为什么第 8 行不将 rcu_barrier_cpu_count 初始化为零,从而避免对第 9 行和第 10 行的需求?
此代码在 2008 年重写,之后多次重写,但这仍然给出了总体思路。
rcu_barrier_func() 在每个 CPU 上运行,它调用 call_rcu()
以发布 RCU 回调,如下所示
1 static void rcu_barrier_func(void *notused)
2 {
3 int cpu = smp_processor_id();
4 struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
5 struct rcu_head *head;
6
7 head = &rdp->barrier;
8 atomic_inc(&rcu_barrier_cpu_count);
9 call_rcu(head, rcu_barrier_callback);
10 }
第 3 行和第 4 行定位 RCU 的内部每个 CPU rcu_data 结构,该结构包含稍后调用 call_rcu()
所需的 struct rcu_head。第 7 行获取指向此 struct rcu_head 的指针,第 8 行递增全局计数器。此计数器稍后将由回调递减。然后,第 9 行在当前 CPU 的队列上注册 rcu_barrier_callback()。
rcu_barrier_callback() 函数只是原子地递减 rcu_barrier_cpu_count 变量,并在其达到零时完成完成,如下所示
1 static void rcu_barrier_callback(struct rcu_head *notused)
2 {
3 if (atomic_dec_and_test(&rcu_barrier_cpu_count))
4 complete(&rcu_barrier_completion);
5 }
- 快速测验 #3
如果 CPU 0 的 rcu_barrier_func() 立即执行(因此将 rcu_barrier_cpu_count 递增到值 1),但其他 CPU 的 rcu_barrier_func() 调用延迟了完整的宽限期会发生什么?这是否会导致
rcu_barrier()
过早返回?
当前的 rcu_barrier()
实现更加复杂,这是由于需要避免干扰空闲 CPU(尤其是在电池供电的系统上)以及需要最小程度地干扰实时系统中的非空闲 CPU。此外,还应用了大量的优化。但是,上面的代码说明了这些概念。
rcu_barrier() 摘要¶
rcu_barrier()
原语的使用频率相对较低,因为大多数使用 RCU 的代码都在核心内核中,而不是在模块中。但是,如果您从可卸载模块中使用 RCU,则需要使用 rcu_barrier()
,以便可以安全地卸载您的模块。
快速测验的答案¶
- 快速测验 #1
还有其他任何情况需要
rcu_barrier()
吗?- 答案
有趣的是,
rcu_barrier()
最初不是为模块卸载而实现的。 Nikita Danilov 在文件系统中使用 RCU,这导致了文件系统卸载时的类似情况。 Dipankar Sarma 对rcu_barrier()
进行了编码以响应,以便 Nikita 可以在文件系统卸载过程中调用它。很久以后,我自己在实现 rcutorture 时遇到了 RCU 模块卸载问题,并发现
rcu_barrier()
也解决了这个问题。
- 快速测验 #2
为什么第 8 行不将 rcu_barrier_cpu_count 初始化为零,从而避免对第 9 行和第 10 行的需求?
- 答案
假设第 8 行上显示的 on_each_cpu() 函数被延迟,因此 CPU 0 的 rcu_barrier_func() 执行并且相应的宽限期经过,所有这些都在 CPU 1 的 rcu_barrier_func() 开始执行之前。这将导致 rcu_barrier_cpu_count 递减到零,因此第 11 行的 wait_for_completion() 将立即返回,而未能等待 CPU 1 的回调被调用。
请注意,当
rcu_barrier()
代码在 2005 年首次添加时,这不是一个问题。这是因为 on_each_cpu() 禁用了抢占,这充当了 RCU 读取端临界区,从而阻止了 CPU 0 的宽限期完成,直到 on_each_cpu() 处理了所有 CPU。但是,随着 v4.20 左右的 RCU 风格整合,再次排除了这种可能性,因为整合后的 RCU 再次等待代码的非抢占区域。
但是,额外的计数仍然可能是一个好主意。依赖于这些偶然的实现可能会在实现更改时导致以后的意外错误。
- 快速测验 #3
如果 CPU 0 的 rcu_barrier_func() 立即执行(因此将 rcu_barrier_cpu_count 递增到值 1),但其他 CPU 的 rcu_barrier_func() 调用延迟了完整的宽限期会发生什么?这是否会导致
rcu_barrier()
过早返回?- 答案
这不可能发生。原因是 on_each_cpu() 的最后一个参数(等待标志)设置为 “1”。此标志通过 smp_call_function() 并进一步传递到 smp_call_function_on_cpu(),导致后者旋转,直到 rcu_barrier_func() 的跨 CPU 调用完成。这本身会阻止宽限期在非 CONFIG_PREEMPTION 内核上完成,因为每个 CPU 必须经过上下文切换(或其他静止状态)才能完成宽限期。但是,这在 CONFIG_PREEMPTION 内核中没有用处。
因此,on_each_cpu() 在其调用 smp_call_function() 期间以及本地调用 rcu_barrier_func() 期间禁用抢占。由于最近的 RCU 实现将禁用抢占的代码区域视为 RCU 读取端临界区,因此这会阻止宽限期完成。这意味着所有 CPU 都已执行 rcu_barrier_func(),然后第一个 rcu_barrier_callback() 才可能执行,进而防止 rcu_barrier_cpu_count 过早达到零。
但是,如果 on_each_cpu() 曾经决定放弃禁用抢占(由于实时延迟考虑,这很可能发生),那么将 rcu_barrier_cpu_count 初始化为 1 将会挽救局面。