使用 RCU hlist_nulls 保护列表和对象

本节介绍如何使用 hlist_nulls 来保护主要用于读取的链表和对象,这些对象使用 SLAB_TYPESAFE_BY_RCU 分配。

请阅读使用 RCU 保护主要用于读取的链表中的基础知识。

使用“nulls”

使用特殊的标记(称为“nulls”)是一种解决以下问题的便捷方法。

如果没有“nulls”,典型的 RCU 链表管理使用 SLAB_TYPESAFE_BY_RCU kmem_cache 分配的对象,可以使用以下算法。以下示例假设 “obj” 是指向此类对象的指针,该对象具有以下类型。

struct object {
  struct hlist_node obj_node;
  atomic_t refcnt;
  unsigned int key;
};

1) 查找算法

begin:
rcu_read_lock();
obj = lockless_lookup(key);
if (obj) {
  if (!try_get_ref(obj)) { // might fail for free objects
    rcu_read_unlock();
    goto begin;
  }
  /*
  * Because a writer could delete object, and a writer could
  * reuse these object before the RCU grace period, we
  * must check key after getting the reference on object
  */
  if (obj->key != key) { // not the object we expected
    put_ref(obj);
    rcu_read_unlock();
    goto begin;
  }
}
rcu_read_unlock();

注意,lockless_lookup(key) 不能使用传统的 hlist_for_each_entry_rcu(),而是使用带有额外内存屏障 (smp_rmb()) 的版本

lockless_lookup(key)
{
  struct hlist_node *node, *next;
  for (pos = rcu_dereference((head)->first);
       pos && ({ next = pos->next; smp_rmb(); prefetch(next); 1; }) &&
       ({ obj = hlist_entry(pos, typeof(*obj), obj_node); 1; });
       pos = rcu_dereference(next))
    if (obj->key == key)
      return obj;
  return NULL;
}

并且请注意传统的 hlist_for_each_entry_rcu() 缺少此 smp_rmb()

struct hlist_node *node;
for (pos = rcu_dereference((head)->first);
     pos && ({ prefetch(pos->next); 1; }) &&
     ({ obj = hlist_entry(pos, typeof(*obj), obj_node); 1; });
     pos = rcu_dereference(pos->next))
  if (obj->key == key)
    return obj;
return NULL;

引用 Corey Minyard 的话

"If the object is moved from one list to another list in-between the
time the hash is calculated and the next field is accessed, and the
object has moved to the end of a new list, the traversal will not
complete properly on the list it should have, since the object will
be on the end of the new list and there's not a way to tell it's on a
new list and restart the list traversal. I think that this can be
solved by pre-fetching the "next" field (with proper barriers) before
checking the key."

2) 插入算法

我们需要确保读取器不能读取新的 “obj->obj_node.next” 值和 “obj->key” 的先前值。 否则,可以从一个链中删除一个项目,然后将其插入到另一个链中。 如果新链在移动之前是空的,“next” 指针为 NULL,并且无锁读取器无法检测到它错过了原始链中的后续项目这一事实。

/*
 * Please note that new inserts are done at the head of list,
 * not in the middle or end.
 */
obj = kmem_cache_alloc(...);
lock_chain(); // typically a spin_lock()
obj->key = key;
atomic_set_release(&obj->refcnt, 1); // key before refcnt
hlist_add_head_rcu(&obj->obj_node, list);
unlock_chain(); // typically a spin_unlock()

3) 删除算法

这里没有什么特别的,我们可以使用标准的 RCU hlist 删除。 但由于 SLAB_TYPESAFE_BY_RCU,请注意删除的对象可以非常非常快地被重用(在 RCU 宽限期结束之前)

if (put_last_reference_on(obj) {
  lock_chain(); // typically a spin_lock()
  hlist_del_init_rcu(&obj->obj_node);
  unlock_chain(); // typically a spin_unlock()
  kmem_cache_free(cachep, obj);
}

避免额外的 smp_rmb()

使用 hlist_nulls,我们可以避免在 lockless_lookup() 中使用额外的 smp_rmb()。

例如,如果我们选择将槽号存储为哈希表的每个槽的“nulls”列表末尾标记,我们可以检测到竞争(一些写入器对一个对象进行了删除和/或移动到另一个链的操作),检查最终的“nulls”值(如果查找到达链的末尾)。如果最终的“nulls”值不是槽号,那么我们必须从头开始重新启动查找。如果该对象被移动到同一链中,则读取器并不关心:它可能会偶尔再次扫描该列表,而不会造成危害。

请注意,使用 hlist_nulls 意味着 “struct object” 的 “obj_node” 字段的类型变为 “struct hlist_nulls_node”。

1) 查找算法

head = &table[slot];
begin:
rcu_read_lock();
hlist_nulls_for_each_entry_rcu(obj, node, head, obj_node) {
  if (obj->key == key) {
    if (!try_get_ref(obj)) { // might fail for free objects
      rcu_read_unlock();
      goto begin;
    }
    if (obj->key != key) { // not the object we expected
      put_ref(obj);
      rcu_read_unlock();
      goto begin;
    }
    goto out;
  }
}

// If the nulls value we got at the end of this lookup is
// not the expected one, we must restart lookup.
// We probably met an item that was moved to another chain.
if (get_nulls_value(node) != slot) {
  put_ref(obj);
  rcu_read_unlock();
  goto begin;
}
obj = NULL;

out:
rcu_read_unlock();

2) 插入算法

与上面的相同,但使用 hlist_nulls_add_head_rcu() 而不是 hlist_add_head_rcu()

/*
 * Please note that new inserts are done at the head of list,
 * not in the middle or end.
 */
obj = kmem_cache_alloc(cachep);
lock_chain(); // typically a spin_lock()
obj->key = key;
atomic_set_release(&obj->refcnt, 1); // key before refcnt
/*
 * insert obj in RCU way (readers might be traversing chain)
 */
hlist_nulls_add_head_rcu(&obj->obj_node, list);
unlock_chain(); // typically a spin_unlock()