第一百八十三章:无锁队列
大乘期涉及内核源码:
一
离开无锁链表的桥,林小源来到一条奔腾的河流前。河水清澈见底,无数光球在河水中顺流而下,整齐有序。
河流的上游站着一位壮硕的汉子,不断将光球投入河中;下游站着一位老妪,不断从河中捞起光球。两人配合默契,节奏丝毫不乱。
"这是无锁队列。"一个声音从身后传来。林小源回头,看到一位穿着工程师服饰的中年人。"我叫kfifo,是内核中的无锁环形缓冲区。"
"他们俩不需要锁吗?"林小源指着上游的汉子和下游的老妪。
"不需要。"kfifo摇头,"在最简单的情况下——一个生产者、一个消费者——只需要内存屏障就够了,连CAS都不需要。生产者只修改写指针,消费者只修改读指针,两者操作不同的变量,天然不会冲突。"
他说着,在河岸上画出两个刻度:head 与 tail。"环形缓冲区不是无限河流,它是固定长度的环。head 指向生产者放入的位置,tail 指向消费者取出的位置;两者相等通常表示空,head 再前进一步追上 tail 前一格时表示满。为了少用除法,内核常让大小取 2 的幂,用按位与完成回绕。"
"那为什么总要留一个空位?"林小源问。
"因为满和空都可能让 head == tail,留一格能把两种状态分开。"kfifo说,"内核还提供 CIRC_SPACE()、CIRC_CNT() 等宏,分别给生产者和消费者估算空间与占用。注意,是在它们各自视角下的下界或上界;第三方旁观者看到的只是猜测。"
林小源恍然大悟。这就是单生产者单消费者队列的妙处——约束带来了简化。
环流初试
环形缓冲区中,生产者推进哪个索引,消费者推进哪个索引?
二
"但如果是多个生产者呢?"林小源追问。
kfifo指了指河流远处的一段急流。"那就要用Michael-Scott队列了——经典的无锁队列算法。它用哨兵节点和两个指针:head和tail。入队时,用CAS把新节点挂到tail->next,再用CAS把tail移到新节点。出队时,用CAS把head移到head->next。"
"听起来和无锁链表很像?"
"原理相同,但设计更精巧。"kfifo从怀中取出一张图纸,上面画满了箭头和节点,"哨兵节点是关键——它简化了边界条件的处理。没有哨兵,head和tail可能同时指向NULL,CAS就会出错。"
"不过别把所有队列都叫无锁。"kfifo敲了敲图纸边缘,"内核文档说得很明白:要免掉同一把锁,就必须只有一个生产者、一个消费者。多个生产者可以用生产者锁把它们串行化,多个消费者也要用消费者锁串行化。约束不满足,就得补同步。"
"那内存屏障怎么用?"
"生产者先写入 item,再用 smp_store_release() 发布新的 head;消费者用 smp_load_acquire() 读取 head,确认看见的是已经写好的 item,取完后再用 release 更新 tail。 防止编译器私自缓存对方索引。别指望 wake_up() 总能替你提供屏障,它只有真的唤醒了等待者才有相关保证。"
屏障之试
生产者发布新 head 前,必须先写好 item;发布动作通常使用哪类 release 存储?
三
"kfifo前辈,您在内核中主要用于什么场景?"林小源问道。
kfifo笑了。"中断处理、网络数据包、日志缓冲区——任何需要在中断上下文和进程上下文之间传递数据的场景。中断不能睡眠,不能用锁,但可以用无锁队列。"
"更准确地说,中断上下文不能使用会睡眠的锁;若双方正好是一进一出,环形缓冲区就能把锁降到边界外。生产者侧可以有自己的 producer_lock,消费者侧可以有自己的 consumer_lock,两端仍能同时推进,不必用一把大锁罩住整条河。"
他拍了拍林小源的肩膀。"记住,约束不是限制,是优化的机会。单生产者单消费者是一种约束,但正因为这个约束,我们才能去掉原子计数,只用 acquire/release 的因果符。这就是内核设计的智慧——在条件允许时,做最简单的事。"
约束之试
内核 circular buffer 文档要求免共享锁的基本并发形态是什么?
/*
* 无锁队列:
*
* 入队 (enqueue):
* new->next = NULL
* CAS(&tail->next, NULL, new)
* CAS(&tail, tail, new)
*
* 出队 (dequeue):
* CAS(&head->next, head->next, head->next->next)
* 返回 head->next
*
* 经典实现:
* Michael-Scott 队列
* 使用哨兵节点
* head 和 tail 指针
*
* 内核实现:
* kfifo — 无锁环形缓冲区
* 单生产者单消费者
* 无需 CAS
*
* 使用场景:
* 中断处理
* 网络数据包
* 日志缓冲区
*/
/* 无锁队列节点 */
struct queue_node {
int data;
_Atomic(struct queue_node *) next;
};
/* 无锁队列 */
struct lockfree_queue {
_Atomic(struct queue_node *) head;
_Atomic(struct queue_node *) tail;
};
void queue_init(struct lockfree_queue *q) {
struct queue_node *sentinel = malloc(sizeof(struct queue_node));
atomic_init(&sentinel->next, NULL);
atomic_init(&q->head, sentinel);
atomic_init(&q->tail, sentinel);
}
/* 入队 */
void queue_enqueue(struct lockfree_queue *q, int data) {
struct queue_node *new = malloc(sizeof(struct queue_node));
new->data = data;
atomic_init(&new->next, NULL);
struct queue_node *tail;
struct queue_node *next;
while (1) {
tail = atomic_load(&q->tail);
next = atomic_load(&tail->next);
if (tail == atomic_load(&q->tail)) {
if (next == NULL) {
if (atomic_compare_exchange_weak(&tail->next, &next, new))
break;
} else {
atomic_compare_exchange_weak(&q->tail, &tail, next);
}
}
}
atomic_compare_exchange_weak(&q->tail, &tail, new);
}
/* 出队 */
int queue_dequeue(struct lockfree_queue *q, int *result) {
struct queue_node *head;
struct queue_node *tail;
struct queue_node *next;
while (1) {
head = atomic_load(&q->head);
tail = atomic_load(&q->tail);
next = atomic_load(&head->next);
if (head == atomic_load(&q->head)) {
if (head == tail) {
if (next == NULL)
return 0; /* 队列为空 */
atomic_compare_exchange_weak(&q->tail, &tail, next);
} else {
*result = next->data;
if (atomic_compare_exchange_weak(&q->head, &head, next))
break;
}
}
}
free(head);
return 1;
}
printf("=== 无锁队列 — 生产者消费者的艺术 ===\n\n");
struct lockfree_queue q;
queue_init(&q);
printf("--- 入队 ---\n");
queue_enqueue(&q, 1);
printf("入队: 1\n");
queue_enqueue(&q, 2);
printf("入队: 2\n");
queue_enqueue(&q, 3);
printf("入队: 3\n");
printf("\n--- 出队 ---\n");
int val;
while (queue_dequeue(&q, &val)) {
printf("出队: %d\n", val);
}
printf("\n--- Michael-Scott 队列 ---\n");
printf("特点:\n");
printf(" 哨兵节点\n");
printf(" head 和 tail 指针\n");
printf(" CAS 更新\n\n");
printf("入队:\n");
printf(" new->next = NULL\n");
printf(" CAS(tail->next, NULL, new)\n");
printf(" CAS(tail, old, new)\n\n");
printf("出队:\n");
printf(" CAS(head, old, head->next)\n");
printf(" 返回 old->data\n\n");
printf("--- kfifo ---\n");
printf("内核的无锁环形缓冲区:\n");
printf(" 单生产者单消费者\n");
printf(" 无需 CAS\n");
printf(" 仅需内存屏障\n\n");
printf("使用:\n");
printf(" 中断处理\n");
printf(" 网络数据包\n");
printf(" 日志缓冲区\n");#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
/*
* 无锁队列:
*
* 入队 (enqueue):
* new->next = NULL
* CAS(&tail->next, NULL, new)
* CAS(&tail, tail, new)
*
* 出队 (dequeue):
* CAS(&head->next, head->next, head->next->next)
* 返回 head->next
*
* 经典实现:
* Michael-Scott 队列
* 使用哨兵节点
* head 和 tail 指针
*
* 内核实现:
* kfifo — 无锁环形缓冲区
* 单生产者单消费者
* 无需 CAS
*
* 使用场景:
* 中断处理
* 网络数据包
* 日志缓冲区
*/
/* 无锁队列节点 */
struct queue_node {
int data;
_Atomic(struct queue_node *) next;
};
/* 无锁队列 */
struct lockfree_queue {
_Atomic(struct queue_node *) head;
_Atomic(struct queue_node *) tail;
};
void queue_init(struct lockfree_queue *q) {
struct queue_node *sentinel = malloc(sizeof(struct queue_node));
atomic_init(&sentinel->next, NULL);
atomic_init(&q->head, sentinel);
atomic_init(&q->tail, sentinel);
}
/* 入队 */
void queue_enqueue(struct lockfree_queue *q, int data) {
struct queue_node *new = malloc(sizeof(struct queue_node));
new->data = data;
atomic_init(&new->next, NULL);
struct queue_node *tail;
struct queue_node *next;
while (1) {
tail = atomic_load(&q->tail);
next = atomic_load(&tail->next);
if (tail == atomic_load(&q->tail)) {
if (next == NULL) {
if (atomic_compare_exchange_weak(&tail->next, &next, new))
break;
} else {
atomic_compare_exchange_weak(&q->tail, &tail, next);
}
}
}
atomic_compare_exchange_weak(&q->tail, &tail, new);
}
/* 出队 */
int queue_dequeue(struct lockfree_queue *q, int *result) {
struct queue_node *head;
struct queue_node *tail;
struct queue_node *next;
while (1) {
head = atomic_load(&q->head);
tail = atomic_load(&q->tail);
next = atomic_load(&head->next);
if (head == atomic_load(&q->head)) {
if (head == tail) {
if (next == NULL)
return 0; /* 队列为空 */
atomic_compare_exchange_weak(&q->tail, &tail, next);
} else {
*result = next->data;
if (atomic_compare_exchange_weak(&q->head, &head, next))
break;
}
}
}
free(head);
return 1;
}
int main() {
printf("=== 无锁队列 — 生产者消费者的艺术 ===\n\n");
struct lockfree_queue q;
queue_init(&q);
printf("--- 入队 ---\n");
queue_enqueue(&q, 1);
printf("入队: 1\n");
queue_enqueue(&q, 2);
printf("入队: 2\n");
queue_enqueue(&q, 3);
printf("入队: 3\n");
printf("\n--- 出队 ---\n");
int val;
while (queue_dequeue(&q, &val)) {
printf("出队: %d\n", val);
}
printf("\n--- Michael-Scott 队列 ---\n");
printf("特点:\n");
printf(" 哨兵节点\n");
printf(" head 和 tail 指针\n");
printf(" CAS 更新\n\n");
printf("入队:\n");
printf(" new->next = NULL\n");
printf(" CAS(tail->next, NULL, new)\n");
printf(" CAS(tail, old, new)\n\n");
printf("出队:\n");
printf(" CAS(head, old, head->next)\n");
printf(" 返回 old->data\n\n");
printf("--- kfifo ---\n");
printf("内核的无锁环形缓冲区:\n");
printf(" 单生产者单消费者\n");
printf(" 无需 CAS\n");
printf(" 仅需内存屏障\n\n");
printf("使用:\n");
printf(" 中断处理\n");
printf(" 网络数据包\n");
printf(" 日志缓冲区\n");
return 0;
}道藏笔记
内核启示
无锁队列是生产者-消费者的理想数据结构。
Michael-Scott 队列:
- 哨兵节点
- head 和 tail 指针
- CAS 更新
kfifo:
- 无锁环形缓冲区
- 单生产者单消费者是基本前提
head/tail分别由生产者和消费者推进smp_store_release()/smp_load_acquire()建立可见性顺序- 多生产者或多消费者要在各自一侧串行化
使用场景:
- 中断处理
- 网络数据包
- 日志缓冲区
无锁队列是管道——数据在并发中流动。
无锁队列之试
无锁队列若要让生产者和消费者看到正确顺序,正文强调必须依靠什么?