第一百八十二章:无锁链表
大乘期涉及内核源码:
一
离开水晶矿洞,林小源来到一座悬浮在虚空中的桥。桥身由无数发光的节点串联而成,每个节点都与其他节点用光线相连,构成一条闪烁的链表。
桥头站着一位年轻的女修士,身穿紫色长裙,手中把玩着一颗光球。
"欢迎来到无锁链表。"女修士微微一笑,"我叫阿紫,这里的守护者。"
"无锁链表?"林小源踏上桥面,感受到脚下节点传来的轻微震动。
"没错。普通的链表需要锁来保护并发访问,但这里的链表用CAS操作实现插入和删除,不需要任何锁。"阿紫将光球抛向空中,光球化作一个新的节点,精准地插入到链表中。
"看——插入的过程只有两步:第一步,把新节点的next指向当前的head->next;第二步,用CAS把head->next从旧值换成新节点。如果CAS失败了——说明别的CPU刚刚修改了head->next——那就重新读取,重试。"
林小源看到桥面上的节点不断有新的光球插入,每一个插入都是原子的,没有任何锁的影子。
二
"但有一个陷阱。"阿紫的表情变得严肃,"ABA问题。"
她从桥面上取下三颗光球,排成一排:A、B、A。
"假设你读到了A,正准备用CAS把A换成C。但在你CAS之前,另一个线程把A删了,插入了B,又插入了A。你再CAS——成功了!因为值还是A。但链表已经变了——B被你跳过了。"
林小源倒吸一口凉气。"这怎么解决?"
"两种方法。第一种,带版本号的CAS——每次修改,版本号加一。CAS时不仅比较值,还比较版本号。第二种,结合RCU——用RCU保护内存回收,确保被删除的节点不会被重用。"
三
"无锁链表的优势是什么?"林小源问道。
阿紫指了指桥面上川流不息的光球。"无锁、无死锁、并发性能好。多个CPU可以同时插入,互不阻塞。但代价是——复杂性高,调试困难,ABA问题如影随形。"
她顿了顿,补充道:"在内核中,纯无锁链表很少单独使用。更常见的做法是RCU加普通链表——读取侧用RCU保护,写入侧用锁保护。这样既简单又高效。"
林小源点头。无锁是一种理想,但现实中往往需要在复杂性和性能之间找到平衡。
/*
* 无锁链表:
*
* 插入:
* new->next = head->next
* CAS(&head->next, new->next, new)
* 如果失败,重试
*
* 删除:
* 找到前驱节点
* CAS(&prev->next, curr, curr->next)
* 如果失败,重试
*
* 优势:
* 无锁
* 无死锁
* 并发性能好
*
* 挑战:
* 内存回收 (ABA 问题)
* 复杂性高
* 调试困难
*
* ABA 问题:
* 读取 A
* 其他线程删除 A,添加 B,再添加 A
* CAS 成功,但链表已变化
*
* 解决方案:
* 带版本号的 CAS
* RCU + 无锁链表
*/
/* 无锁链表节点 */
struct lockfree_node {
int data;
_Atomic(struct lockfree_node *) next;
};
/* 初始化 */
void node_init(struct lockfree_node *node, int data) {
node->data = data;
atomic_init(&node->next, NULL);
}
/* 无锁插入 */
void lockfree_insert(struct lockfree_node *head, int data) {
struct lockfree_node *new = malloc(sizeof(struct lockfree_node));
node_init(new, data);
struct lockfree_node *old_next;
do {
old_next = atomic_load(&head->next);
atomic_store(&new->next, old_next);
} while (!atomic_compare_exchange_weak(&head->next, &old_next, new));
}
/* 无锁遍历 */
void lockfree_traverse(struct lockfree_node *head) {
struct lockfree_node *curr = atomic_load(&head->next);
while (curr) {
printf(" %d", curr->data);
curr = atomic_load(&curr->next);
}
printf("\n");
}
/* 释放链表 */
void lockfree_free(struct lockfree_node *head) {
struct lockfree_node *curr = atomic_load(&head->next);
while (curr) {
struct lockfree_node *next = atomic_load(&curr->next);
free(curr);
curr = next;
}
}
printf("=== 无锁链表 — 无锁的艺术 ===\n\n");
struct lockfree_node head;
node_init(&head, 0);
printf("--- 插入 ---\n");
lockfree_insert(&head, 3);
lockfree_insert(&head, 2);
lockfree_insert(&head, 1);
printf("链表: ");
lockfree_traverse(&head);
printf("\n--- 无锁插入原理 ---\n");
printf("new->next = head->next\n");
printf("CAS(&head->next, old, new)\n\n");
printf("如果 CAS 失败:\n");
printf(" 其他 CPU 修改了 head->next\n");
printf(" 重新读取,重试\n\n");
printf("--- ABA 问题 ---\n");
printf("1. 线程 1 读取 A\n");
printf("2. 线程 2 删除 A\n");
printf("3. 线程 2 添加 B\n");
printf("4. 线程 2 添加 A\n");
printf("5. 线程 1 CAS 成功\n");
printf(" 但链表已变化!\n\n");
printf("--- 解决方案 ---\n");
printf("1. 带版本号的 CAS:\n");
printf(" 每次修改版本号+1\n");
printf(" CAS 比较值和版本号\n\n");
printf("2. RCU + 无锁:\n");
printf(" RCU 保护内存回收\n");
printf(" 无锁保证并发\n\n");
printf("--- 优势 vs 挑战 ---\n");
printf("优势:\n");
printf(" 无锁\n");
printf(" 无死锁\n");
printf(" 并发性能好\n\n");
printf("挑战:\n");
printf(" ABA 问题\n");
printf(" 复杂性高\n");
printf(" 调试困难\n");
lockfree_free(&head);#include <stdio.h>
#include <stdlib.h>
#include <stdatomic.h>
/*
* 无锁链表:
*
* 插入:
* new->next = head->next
* CAS(&head->next, new->next, new)
* 如果失败,重试
*
* 删除:
* 找到前驱节点
* CAS(&prev->next, curr, curr->next)
* 如果失败,重试
*
* 优势:
* 无锁
* 无死锁
* 并发性能好
*
* 挑战:
* 内存回收 (ABA 问题)
* 复杂性高
* 调试困难
*
* ABA 问题:
* 读取 A
* 其他线程删除 A,添加 B,再添加 A
* CAS 成功,但链表已变化
*
* 解决方案:
* 带版本号的 CAS
* RCU + 无锁链表
*/
/* 无锁链表节点 */
struct lockfree_node {
int data;
_Atomic(struct lockfree_node *) next;
};
/* 初始化 */
void node_init(struct lockfree_node *node, int data) {
node->data = data;
atomic_init(&node->next, NULL);
}
/* 无锁插入 */
void lockfree_insert(struct lockfree_node *head, int data) {
struct lockfree_node *new = malloc(sizeof(struct lockfree_node));
node_init(new, data);
struct lockfree_node *old_next;
do {
old_next = atomic_load(&head->next);
atomic_store(&new->next, old_next);
} while (!atomic_compare_exchange_weak(&head->next, &old_next, new));
}
/* 无锁遍历 */
void lockfree_traverse(struct lockfree_node *head) {
struct lockfree_node *curr = atomic_load(&head->next);
while (curr) {
printf(" %d", curr->data);
curr = atomic_load(&curr->next);
}
printf("\n");
}
/* 释放链表 */
void lockfree_free(struct lockfree_node *head) {
struct lockfree_node *curr = atomic_load(&head->next);
while (curr) {
struct lockfree_node *next = atomic_load(&curr->next);
free(curr);
curr = next;
}
}
int main() {
printf("=== 无锁链表 — 无锁的艺术 ===\n\n");
struct lockfree_node head;
node_init(&head, 0);
printf("--- 插入 ---\n");
lockfree_insert(&head, 3);
lockfree_insert(&head, 2);
lockfree_insert(&head, 1);
printf("链表: ");
lockfree_traverse(&head);
printf("\n--- 无锁插入原理 ---\n");
printf("new->next = head->next\n");
printf("CAS(&head->next, old, new)\n\n");
printf("如果 CAS 失败:\n");
printf(" 其他 CPU 修改了 head->next\n");
printf(" 重新读取,重试\n\n");
printf("--- ABA 问题 ---\n");
printf("1. 线程 1 读取 A\n");
printf("2. 线程 2 删除 A\n");
printf("3. 线程 2 添加 B\n");
printf("4. 线程 2 添加 A\n");
printf("5. 线程 1 CAS 成功\n");
printf(" 但链表已变化!\n\n");
printf("--- 解决方案 ---\n");
printf("1. 带版本号的 CAS:\n");
printf(" 每次修改版本号+1\n");
printf(" CAS 比较值和版本号\n\n");
printf("2. RCU + 无锁:\n");
printf(" RCU 保护内存回收\n");
printf(" 无锁保证并发\n\n");
printf("--- 优势 vs 挑战 ---\n");
printf("优势:\n");
printf(" 无锁\n");
printf(" 无死锁\n");
printf(" 并发性能好\n\n");
printf("挑战:\n");
printf(" ABA 问题\n");
printf(" 复杂性高\n");
printf(" 调试困难\n");
lockfree_free(&head);
return 0;
}道藏笔记
内核启示
无锁链表的插入就两步:new->next = head->next,然后 CAS 把 head->next 从旧值换成新节点。失败了就重新读取重试,不需要任何锁。多个 CPU 可以同时插入,互不阻塞。
但有个坑叫 ABA 问题:你读到 A 准备 CAS,中间别人删了 A 加了 B 又加了 A,你 CAS 成功了但链表已经变了。解决办法是带版本号的 CAS(每次修改版本号加一),或者结合 RCU 保护内存回收。实际上内核里纯无锁链表很少单独用,更常见的是 RCU 加普通链表——读取侧 RCU 保护,写入侧用锁,简单又高效。
无锁链表是艺术——无锁并发,但需谨慎。
无锁链表之试
无锁链表让读者遍历时不加锁,主要依靠哪套读复制更新机制?