英文

循环缓冲区

作者:

David Howells <dhowells@redhat.com>

作者:

Paul E. McKenney <paulmck@linux.ibm.com>

Linux 提供了许多可用于实现循环缓冲区的功能。 有两组这样的功能

  1. 用于确定关于 2 的幂大小的缓冲区的信息的便捷函数。

  2. 当缓冲区中对象的生产者和消费者不想共享锁时使用的内存屏障。

要使用这些设施,如下所述,只需要一个生产者和一个消费者。 可以通过序列化它们来处理多个生产者,以及通过序列化它们来处理多个消费者。

什么是循环缓冲区?

首先,什么是循环缓冲区? 循环缓冲区是具有固定、有限大小的缓冲区,其中有两个索引

  1. “头”索引 - 生产者将项目插入缓冲区的点。

  2. “尾”索引 - 消费者在缓冲区中找到下一个项目的点。

通常,当尾指针等于头指针时,缓冲区为空;当头指针比尾指针小 1 时,缓冲区已满。

头索引在添加项目时递增,尾索引在删除项目时递增。 尾索引永远不应跳过头索引,并且当两个索引到达缓冲区末尾时都应回绕到 0,从而允许无限量的数据流过缓冲区。

通常,项目都将具有相同的单元大小,但这不是严格要求使用以下技术的。 如果多个项目或可变大小的项目要包含在缓冲区中,则索引可以增加 1 以上,前提是任何一个索引都不超过另一个索引。 但是,实现者必须小心,因为超过一个单元大小的区域可能会环绕缓冲区的末尾并被分成两段。

测量 2 的幂缓冲区

计算任意大小的循环缓冲区的占用率或剩余容量通常是一个缓慢的操作,需要使用模数(除法)指令。 但是,如果缓冲区的大小为 2 的幂,则可以使用更快的按位与指令代替。

Linux 提供了一组用于处理 2 的幂循环缓冲区的宏。 可以通过以下方式使用它们

#include <linux/circ_buf.h>

这些宏是

  1. 测量缓冲区的剩余容量

    CIRC_SPACE(head_index, tail_index, buffer_size);
    

    这会返回缓冲区 [1] 中可插入项目的剩余空间量。

  2. 测量缓冲区中最大连续即时空间

    CIRC_SPACE_TO_END(head_index, tail_index, buffer_size);
    

    这会返回缓冲区 [1] 中剩余的连续空间量,可以将项目立即插入其中,而无需回绕到缓冲区的开头。

  3. 测量缓冲区的占用率

    CIRC_CNT(head_index, tail_index, buffer_size);
    

    这会返回当前占用缓冲区 [2] 的项目数。

  4. 测量缓冲区的非环绕占用率

    CIRC_CNT_TO_END(head_index, tail_index, buffer_size);
    

    这会返回可以从缓冲区中提取的连续项目 [2] 的数量,而无需回绕到缓冲区的开头。

这些宏中的每一个都将名义上返回 0 到 buffer_size-1 之间的值,但是

  1. CIRC_SPACE*() 旨在在生产者中使用。 对于生产者,它们将返回一个下限,因为生产者控制头索引,但是消费者可能仍然在另一个 CPU 上耗尽缓冲区并移动尾索引。

    对于消费者,它将显示一个上限,因为生产者可能正忙于耗尽空间。

  2. CIRC_CNT*() 旨在在消费者中使用。 对于消费者,它们将返回一个下限,因为消费者控制尾索引,但是生产者可能仍然在另一个 CPU 上填充缓冲区并移动头索引。

    对于生产者,它将显示一个上限,因为消费者可能正忙于清空缓冲区。

  3. 对于第三方,生产者和消费者写入索引的顺序是不可保证的,因为它们是独立的并且可能在不同的 CPU 上进行 - 因此这种情况下的结果仅仅是一个猜测,甚至可能是负数。

将内存屏障与循环缓冲区一起使用

通过将内存屏障与循环缓冲区结合使用,可以避免以下需求

  1. 使用单个锁来管理对缓冲区两端的访问,从而允许同时填充和清空缓冲区;以及

  2. 使用原子计数器操作。

这有两个方面:填充缓冲区的生产者和清空缓冲区的消费者。 在任何给定时间,都应该只有一个东西在填充缓冲区,并且在任何给定时间,都应该只有一个东西在清空缓冲区,但是双方可以同时操作。

生产者

生产者将如下所示

spin_lock(&producer_lock);

unsigned long head = buffer->head;
/* The spin_unlock() and next spin_lock() provide needed ordering. */
unsigned long tail = READ_ONCE(buffer->tail);

if (CIRC_SPACE(head, tail, buffer->size) >= 1) {
        /* insert one item into the buffer */
        struct item *item = buffer[head];

        produce_item(item);

        smp_store_release(buffer->head,
                          (head + 1) & (buffer->size - 1));

        /* wake_up() will make sure that the head is committed before
         * waking anyone up */
        wake_up(consumer);
}

spin_unlock(&producer_lock);

这将指示 CPU 必须在新项的内容可供消费者使用之前写入,然后指示 CPU 必须在唤醒消费者之前写入修改后的头索引。

请注意,除非实际唤醒了某些东西,否则 wake_up() 不保证任何类型的屏障。 因此,我们不能依赖它进行排序。 但是,数组中始终有一个元素为空。 因此,生产者必须产生两个元素,然后才可能破坏消费者当前正在读取的元素。 因此,消费者连续调用之间的解锁-锁对提供了指示消费者已腾出给定元素的索引的读取与生产者对同一元素的写入之间必要的排序。

消费者

消费者将如下所示

spin_lock(&consumer_lock);

/* Read index before reading contents at that index. */
unsigned long head = smp_load_acquire(buffer->head);
unsigned long tail = buffer->tail;

if (CIRC_CNT(head, tail, buffer->size) >= 1) {

        /* extract one item from the buffer */
        struct item *item = buffer[tail];

        consume_item(item);

        /* Finish reading descriptor before incrementing tail. */
        smp_store_release(buffer->tail,
                          (tail + 1) & (buffer->size - 1));
}

spin_unlock(&consumer_lock);

这将指示 CPU 在读取新项目之前确保索引是最新的,然后确保 CPU 在写入将擦除该项的新尾指针之前已完成读取该项。

请注意使用 READ_ONCE() 和 smp_load_acquire() 来读取反对索引。 这可以防止编译器丢弃并重新加载其缓存的值。 如果您可以确定反对索引_仅_使用一次,则严格来说不需要这样做。 smp_load_acquire() 还会强制 CPU 对后续内存引用进行排序。 同样,smp_store_release() 在两种算法中都用于写入线程的索引。 这记录了我们正在写入可以并发读取的内容的事实,防止编译器撕裂存储,并强制执行针对先前访问的排序。

进一步阅读

另请参阅 Documentation/memory-barriers.txt 以获取有关 Linux 内存屏障设施的说明。