Skip to content

第一百八十三章:无锁队列

大乘期

涉及内核源码:

离开无锁链表的桥,林小源来到一条奔腾的河流前。河水清澈见底,无数光球在河水中顺流而下,整齐有序。

河流的上游站着一位壮硕的汉子,不断将光球投入河中;下游站着一位老妪,不断从河中捞起光球。两人配合默契,节奏丝毫不乱。

"这是无锁队列。"一个声音从身后传来。林小源回头,看到一位穿着工程师服饰的中年人。"我叫kfifo,是内核中的无锁环形缓冲区。"

"他们俩不需要锁吗?"林小源指着上游的汉子和下游的老妪。

"不需要。"kfifo摇头,"在最简单的情况下——一个生产者、一个消费者——只需要内存屏障就够了,连CAS都不需要。生产者只修改写指针,消费者只修改读指针,两者操作不同的变量,天然不会冲突。"

他说着,在河岸上画出两个刻度:headtail。"环形缓冲区不是无限河流,它是固定长度的环。head 指向生产者放入的位置,tail 指向消费者取出的位置;两者相等通常表示空,head 再前进一步追上 tail 前一格时表示满。为了少用除法,内核常让大小取 2 的幂,用按位与完成回绕。"

"那为什么总要留一个空位?"林小源问。

"因为满和空都可能让 head == tail,留一格能把两种状态分开。"kfifo说,"内核还提供 CIRC_SPACE()CIRC_CNT() 等宏,分别给生产者和消费者估算空间与占用。注意,是在它们各自视角下的下界或上界;第三方旁观者看到的只是猜测。"

林小源恍然大悟。这就是单生产者单消费者队列的妙处——约束带来了简化。

破关试炼

环流初试

环形缓冲区中,生产者推进哪个索引,消费者推进哪个索引?

答对后才能继续滑动和进入下一章。

"但如果是多个生产者呢?"林小源追问。

kfifo指了指河流远处的一段急流。"那就要用Michael-Scott队列了——经典的无锁队列算法。它用哨兵节点和两个指针:head和tail。入队时,用CAS把新节点挂到tail->next,再用CAS把tail移到新节点。出队时,用CAS把head移到head->next。"

"听起来和无锁链表很像?"

"原理相同,但设计更精巧。"kfifo从怀中取出一张图纸,上面画满了箭头和节点,"哨兵节点是关键——它简化了边界条件的处理。没有哨兵,head和tail可能同时指向NULL,CAS就会出错。"

"不过别把所有队列都叫无锁。"kfifo敲了敲图纸边缘,"内核文档说得很明白:要免掉同一把锁,就必须只有一个生产者、一个消费者。多个生产者可以用生产者锁把它们串行化,多个消费者也要用消费者锁串行化。约束不满足,就得补同步。"

"那内存屏障怎么用?"

"生产者先写入 item,再用 smp_store_release() 发布新的 head;消费者用 smp_load_acquire() 读取 head,确认看见的是已经写好的 item,取完后再用 release 更新 tail 防止编译器私自缓存对方索引。别指望 wake_up() 总能替你提供屏障,它只有真的唤醒了等待者才有相关保证。"

破关试炼

屏障之试

生产者发布新 head 前,必须先写好 item;发布动作通常使用哪类 release 存储?

答对后才能继续滑动和进入下一章。

"kfifo前辈,您在内核中主要用于什么场景?"林小源问道。

kfifo笑了。"中断处理、网络数据包、日志缓冲区——任何需要在中断上下文和进程上下文之间传递数据的场景。中断不能睡眠,不能用锁,但可以用无锁队列。"

"更准确地说,中断上下文不能使用会睡眠的锁;若双方正好是一进一出,环形缓冲区就能把锁降到边界外。生产者侧可以有自己的 producer_lock,消费者侧可以有自己的 consumer_lock,两端仍能同时推进,不必用一把大锁罩住整条河。"

他拍了拍林小源的肩膀。"记住,约束不是限制,是优化的机会。单生产者单消费者是一种约束,但正因为这个约束,我们才能去掉原子计数,只用 acquire/release 的因果符。这就是内核设计的智慧——在条件允许时,做最简单的事。"

破关试炼

约束之试

内核 circular buffer 文档要求免共享锁的基本并发形态是什么?

答对后才能继续滑动和进入下一章。
c
/*
 * 无锁队列:
 *
 * 入队 (enqueue):
 *   new->next = NULL
 *   CAS(&tail->next, NULL, new)
 *   CAS(&tail, tail, new)
 *
 * 出队 (dequeue):
 *   CAS(&head->next, head->next, head->next->next)
 *   返回 head->next
 *
 * 经典实现:
 *   Michael-Scott 队列
 *   使用哨兵节点
 *   head 和 tail 指针
 *
 * 内核实现:
 *   kfifo — 无锁环形缓冲区
 *   单生产者单消费者
 *   无需 CAS
 *
 * 使用场景:
 *   中断处理
 *   网络数据包
 *   日志缓冲区
 */

/* 无锁队列节点 */
struct queue_node {
    int data;
    _Atomic(struct queue_node *) next;
};

/* 无锁队列 */
struct lockfree_queue {
    _Atomic(struct queue_node *) head;
    _Atomic(struct queue_node *) tail;
};

void queue_init(struct lockfree_queue *q) {
    struct queue_node *sentinel = malloc(sizeof(struct queue_node));
    atomic_init(&sentinel->next, NULL);
    atomic_init(&q->head, sentinel);
    atomic_init(&q->tail, sentinel);
}

/* 入队 */
void queue_enqueue(struct lockfree_queue *q, int data) {
    struct queue_node *new = malloc(sizeof(struct queue_node));
    new->data = data;
    atomic_init(&new->next, NULL);

    struct queue_node *tail;
    struct queue_node *next;

    while (1) {
        tail = atomic_load(&q->tail);
        next = atomic_load(&tail->next);

        if (tail == atomic_load(&q->tail)) {
            if (next == NULL) {
                if (atomic_compare_exchange_weak(&tail->next, &next, new))
                    break;
            } else {
                atomic_compare_exchange_weak(&q->tail, &tail, next);
            }
        }
    }
    atomic_compare_exchange_weak(&q->tail, &tail, new);
}

/* 出队 */
int queue_dequeue(struct lockfree_queue *q, int *result) {
    struct queue_node *head;
    struct queue_node *tail;
    struct queue_node *next;

    while (1) {
        head = atomic_load(&q->head);
        tail = atomic_load(&q->tail);
        next = atomic_load(&head->next);

        if (head == atomic_load(&q->head)) {
            if (head == tail) {
                if (next == NULL)
                    return 0; /* 队列为空 */
                atomic_compare_exchange_weak(&q->tail, &tail, next);
            } else {
                *result = next->data;
                if (atomic_compare_exchange_weak(&q->head, &head, next))
                    break;
            }
        }
    }
    free(head);
    return 1;
}

printf("=== 无锁队列 — 生产者消费者的艺术 ===\n\n");

struct lockfree_queue q;
queue_init(&q);

printf("--- 入队 ---\n");
queue_enqueue(&q, 1);
printf("入队: 1\n");
queue_enqueue(&q, 2);
printf("入队: 2\n");
queue_enqueue(&q, 3);
printf("入队: 3\n");

printf("\n--- 出队 ---\n");
int val;
while (queue_dequeue(&q, &val)) {
    printf("出队: %d\n", val);
}

printf("\n--- Michael-Scott 队列 ---\n");
printf("特点:\n");
printf("  哨兵节点\n");
printf("  head 和 tail 指针\n");
printf("  CAS 更新\n\n");
printf("入队:\n");
printf("  new->next = NULL\n");
printf("  CAS(tail->next, NULL, new)\n");
printf("  CAS(tail, old, new)\n\n");
printf("出队:\n");
printf("  CAS(head, old, head->next)\n");
printf("  返回 old->data\n\n");

printf("--- kfifo ---\n");
printf("内核的无锁环形缓冲区:\n");
printf("  单生产者单消费者\n");
printf("  无需 CAS\n");
printf("  仅需内存屏障\n\n");
printf("使用:\n");
printf("  中断处理\n");
printf("  网络数据包\n");
printf("  日志缓冲区\n");

道藏笔记

内核启示

无锁队列是生产者-消费者的理想数据结构。

Michael-Scott 队列:

  • 哨兵节点
  • head 和 tail 指针
  • CAS 更新

kfifo:

  • 无锁环形缓冲区
  • 单生产者单消费者是基本前提
  • head/tail 分别由生产者和消费者推进
  • smp_store_release() / smp_load_acquire() 建立可见性顺序
  • 多生产者或多消费者要在各自一侧串行化

使用场景:

  • 中断处理
  • 网络数据包
  • 日志缓冲区

无锁队列是管道——数据在并发中流动。


破关试炼

无锁队列之试

无锁队列若要让生产者和消费者看到正确顺序,正文强调必须依靠什么?

答对后才能继续滑动和进入下一章。

以修仙之名,悟内核之道