循环缓冲区

作者:

David Howells <dhowells@redhat.com>

作者:

Paul E. McKenney <paulmck@linux.ibm.com>

Linux 提供了许多可用于实现循环缓冲的功能。这些功能分为两类:

  1. 用于确定 2 的幂大小缓冲区信息的便利函数。

  2. 当缓冲区中对象的生产者和消费者不想共享锁时使用的内存屏障。

要使用这些功能,如以下所述,只需要一个生产者和一个消费者。可以通过串行化来处理多个生产者,也可以通过串行化来处理多个消费者。

什么是循环缓冲区?

首先,什么是循环缓冲区?循环缓冲区是一个固定大小、有限容量的缓冲区,其中包含两个索引:

  1. 一个“头部”索引 — 生产者向缓冲区中插入项的位置。

  2. 一个“尾部”索引 — 消费者在缓冲区中找到下一个项的位置。

通常,当尾部指针等于头部指针时,缓冲区为空;当头部指针比尾部指针小 1 时,缓冲区为满。

当添加项时,头部索引递增;当移除项时,尾部索引递增。尾部索引不应越过头部索引,并且当它们到达缓冲区末尾时,两个索引都应回绕到 0,从而允许无限量的数据流过缓冲区。

通常,所有项都将具有相同的单位大小,但这并非使用以下技术的严格要求。如果缓冲区中要包含多个项或可变大小的项,索引可以增加 1 以上,前提是两个索引都不能超越对方。但是,实现者必须小心,因为大小超过一个单位的区域可能会在缓冲区末尾回绕并被分成两个段。

测量 2 的幂缓冲区

计算任意大小循环缓冲区的占用率或剩余容量通常是一个缓慢的操作,需要使用模(除法)指令。但是,如果缓冲区的大小是 2 的幂,则可以使用更快的位与指令代替。

Linux 提供了一组用于处理 2 的幂循环缓冲区的宏。这些宏可以通过以下方式使用:

#include <linux/circ_buf.h>

这些宏是:

  1. 测量缓冲区的剩余容量

    CIRC_SPACE(head_index, tail_index, buffer_size);
    

    这会返回缓冲区中剩余的空间量[1],可用于插入项。

  2. 测量缓冲区中最大连续即时空间

    CIRC_SPACE_TO_END(head_index, tail_index, buffer_size);
    

    这会返回缓冲区中剩余的连续空间量[1],可用于立即插入项而无需回绕到缓冲区开头。

  3. 测量缓冲区的占用率

    CIRC_CNT(head_index, tail_index, buffer_size);
    

    这会返回当前占用缓冲区[2]的项数。

  4. 测量缓冲区的非回绕占用率

    CIRC_CNT_TO_END(head_index, tail_index, buffer_size);
    

    这会返回可从缓冲区中提取的连续项数[2],而无需回绕到缓冲区开头。

每个宏名义上将返回一个介于 0 和 buffer_size-1 之间的值,但是:

  1. CIRC_SPACE*() 旨在用于生产者。对于生产者,它们将返回一个下限,因为生产者控制头部索引,但消费者可能仍在另一个 CPU 上消耗缓冲区并移动尾部索引。

    对消费者而言,它将显示一个上限,因为生产者可能正在忙于消耗空间。

  2. CIRC_CNT*() 旨在用于消费者。对于消费者,它们将返回一个下限,因为消费者控制尾部索引,但生产者可能仍在另一个 CPU 上填充缓冲区并移动头部索引。

    对于生产者,它将显示一个上限,因为消费者可能正在忙于清空缓冲区。

  3. 对于第三方而言,生产者和消费者对索引的写入可见顺序无法保证,因为它们是独立的,并且可能在不同的 CPU 上进行——因此,在这种情况下,结果仅仅是一个猜测,甚至可能是负数。

在循环缓冲区中使用内存屏障

通过在循环缓冲区中结合使用内存屏障,您可以避免以下情况:

  1. 使用单个锁来管理对缓冲区两端的访问,从而允许缓冲区同时被填充和清空;以及

  2. 使用原子计数器操作。

这有两个方面:填充缓冲区的生产者和清空缓冲区的消费者。在任何给定时间,只能有一个对象填充缓冲区,也只能有一个对象清空缓冲区,但这两方可以同时操作。

生产者

生产者将大致如下所示:

spin_lock(&producer_lock);

unsigned long head = buffer->head;
/* The spin_unlock() and next spin_lock() provide needed ordering. */
unsigned long tail = READ_ONCE(buffer->tail);

if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
        /* insert one item into the buffer */
        struct item *item = buffer[head];

        produce_item(item);

        smp_store_release(buffer->head,
                          (head + 1) & (buffer->size - 1));

        /* wake_up() will make sure that the head is committed before
         * waking anyone up */
        wake_up(consumer);
}

spin_unlock(&producer_lock);

这将指示 CPU,新项的内容必须在头部索引使其对消费者可用之前写入,然后指示 CPU,修改后的头部索引必须在唤醒消费者之前写入。

请注意,wake_up() 不保证任何类型的屏障,除非有实际的东西被唤醒。因此,我们不能依赖它来保证排序。然而,数组中总是会留有一个空元素。因此,生产者必须生产两个元素,才能有可能损坏当前正在被消费者读取的元素。因此,消费者连续调用之间的解锁-加锁对,提供了消费者腾空给定元素所指示的索引读取与生产者向同一元素写入之间所需的排序。

消费者

消费者将大致如下所示:

spin_lock(&consumer_lock);

/* Read index before reading contents at that index. */
unsigned long head = smp_load_acquire(buffer->head);
unsigned long tail = buffer->tail;

if (CIRC_CNT(head, tail, buffer->size) >= 1) {

        /* extract one item from the buffer */
        struct item *item = buffer[tail];

        consume_item(item);

        /* Finish reading descriptor before incrementing tail. */
        smp_store_release(buffer->tail,
                          (tail + 1) & (buffer->size - 1));
}

spin_unlock(&consumer_lock);

这将指示 CPU 确保在读取新项之前索引是最新的,然后它将确保 CPU 在写入新的尾部指针(这将擦除该项)之前已完成项的读取。

请注意使用 READ_ONCE() 和 smp_load_acquire() 来读取对方索引。这可以防止编译器丢弃并重新加载其缓存值。如果您能确保对方索引只使用一次,则这并非严格需要。smp_load_acquire() 还会强制 CPU 对后续内存引用进行排序。类似地,smp_store_release() 在两种算法中都用于写入线程的索引。这记录了我们正在写入可以并发读取的内容这一事实,防止编译器撕裂存储,并强制对以前的访问进行排序。

进一步阅读

另请参阅 Documentation/memory-barriers.txt,了解 Linux 内存屏障功能的描述。