第一百八十一章:原子操作
大乘期涉及内核源码:
一
沿着山脊继续前行,林小源来到一座水晶矿洞。矿洞的墙壁上嵌满了拳头大小的水晶,每颗水晶内部都封存着一个数字,数字在水晶中缓缓旋转,散发着幽蓝的光芒。
一位矿工打扮的老者正在用锤子敲击一块水晶。每敲一下,水晶中的数字就变化一次——但变化的过程快得几乎看不清,仿佛一瞬间就完成了。
"前辈,这是什么?"
老者停下锤子,擦了擦额头的汗。"原子操作。Atomic。我敲的每一下,都是不可分割的——要么完成,要么没开始,没有中间状态。"
他指着水晶中的数字。"在多核系统中,两个CPU可能同时修改同一个变量。普通操作会被中断——你读到旧值,我也读到旧值,我们都加一,最后只加了一次。但原子操作用LOCK前缀锁定缓存行,确保整个操作一气呵成。"
林小源凑近看,发现水晶中刻着一行小字:atomic_t。"这就是内核中的原子整数?"
"没错。atomic_t、atomic64_t、atomic_long_t——都是原子类型。atomic_read()读取,atomic_set()设置,atomic_add()加,atomic_sub()减,atomic_inc()自增,atomic_dec()自减……"老者如数家珍。
"不过,如果你只用 atomic_read() 和 atomic_set(),那多半用错了。"老者把一块水晶翻过来,背面刻着 与 。"官方文档说得很直接:非 RMW 的 atomic 操作通常只是普通 load/store 加标记;只需要读写一次,就不必把变量伪装成 atomic_t。"
"RMW 是什么?"
"Read-Modify-Write,读、改、写合成一个不可分割的操作。atomic 真正擅长的是这个。"
原子初试
atomic_t 真正擅长的是把读、改、写合成哪类操作?
二
"前辈,我听说CAS很重要?"林小源问道。
老者放下锤子,从怀中取出一枚双面铜钱。"CAS——Compare-And-Swap,比较交换。这是无锁数据结构的钥匙。"
他将铜钱抛向空中。"你先读到当前值是A,然后你希望把它改成B。CAS会检查:当前值还是A吗?如果是,就改成B,返回成功;如果不是——说明别的CPU已经改过了——返回失败,你得重新读取,再试一次。"
"这就是atomic_cmpxchg()?"
"没错。"老者接住铜钱,"CAS的精髓在于——它把'读取-比较-写入'这三步合成了一步。在LOCK前缀的保护下,这三步是不可分割的。这就是为什么它是无锁算法的基础——你不需要锁,只需要不断地CAS重试。"
"但原子不等于有序。"老者把铜钱按在桌上,"atomic 文档有一条经验法则:非 RMW 无序;不返回值的 RMW 也无序;返回值的 RMW 通常 fully ordered;条件 RMW 失败时无序,成功时才按对应规则来。若要明确更弱或更强的顺序,就用 _relaxed、_acquire、_release,或者在 RMW 前后配 smp_mb__before_atomic()、smp_mb__after_atomic()。"
"所以 atomic_cmpxchg 成功和失败,顺序也不同?"
"对。失败只说明没改成,不代表帮你建立了想要的因果。"
有序之试
atomic 条件 RMW 操作在失败时,内存顺序通常是有序还是无序?
三
"但LOCK前缀不会影响性能吗?"林小源追问。
老者叹了口气。"会。LOCK前缀会锁定缓存行,如果缓存行被其他CPU持有,就要通过缓存一致性协议去获取独占权。这需要时间。所以原子操作虽然不需要锁,但也不是零开销。"
他指了指矿洞深处。"在ARM架构上,实现方式不同——用的是LL/SC,Load-Linked/Store-Conditional。先读取并标记,写入时检查标记是否还在。如果被其他CPU修改了,标记就消失了,写入失败,重试。"
"所以不同架构的原子操作实现不同,但接口相同?"
"正是。"老者微微一笑,"这就是内核的抽象之美——你不需要知道底层用的是LOCK CMPXCHG还是LL/SC,你只需要调用atomic_cmpxchg()。"
"还有两条警戒线。"老者把矿洞深处的两盏灯点亮,"第一,atomic_t 不是给 MMIO 用的,对设备寄存器做 atomic 操作在某些平台会直接出大事。第二,引用计数别再随手用 atomic_t,优先用 refcount_t。"
"为什么?"
"引用计数关乎对象生命周期,溢出、从零复活、释放顺序都可能变成漏洞。refcount_t API 更小,很多 decrement 操作带 release 语义,refcount_dec_and_test() 在成功归零时还提供 acquire 语义,目的就是让最后一个引用释放对象时顺序更可靠。"
林小源握住水晶,终于意识到:原子操作保证一个数不会被撕裂,但不自动保证对象活着,也不自动保证别的内存已经可见。
引用之试
内核中实现对象引用计数时,官方文档建议优先使用 atomic_t 还是 refcount_t?
/*
* 原子操作:
*
* 为什么需要:
* 多核系统中,普通操作可能被中断
* 两个 CPU 同时修改同一变量
* 导致数据不一致
*
* 原子类型:
* atomic_t — 原子整数
* atomic64_t — 原子 64 位整数
* atomic_long_t — 原子长整数
*
* 原子操作:
* atomic_read(v) — 读取
* atomic_set(v, i) — 设置
* atomic_add(i, v) — 加
* atomic_sub(i, v) — 减
* atomic_inc(v) — 自增
* atomic_dec(v) — 自减
* atomic_cmpxchg(v, old, new) — 比较交换
* atomic_xchg(v, new) — 交换
*
* 位操作:
* set_bit(nr, addr) — 设置位
* clear_bit(nr, addr) — 清除位
* change_bit(nr, addr) — 改变位
* test_and_set_bit(nr, addr) — 测试并设置
*/
/* 模拟原子操作 */
typedef struct {
volatile int counter;
} atomic_t;
int atomic_read(atomic_t *v) {
return v->counter;
}
void atomic_set(atomic_t *v, int i) {
v->counter = i;
}
/* 模拟原子加 (实际用 LOCK XADD) */
void atomic_add(int i, atomic_t *v) {
/* 在多核系统中,这需要 LOCK 前缀 */
v->counter += i;
}
void atomic_sub(int i, atomic_t *v) {
v->counter -= i;
}
void atomic_inc(atomic_t *v) {
v->counter++;
}
void atomic_dec(atomic_t *v) {
v->counter--;
}
/* 模拟比较交换 (实际用 LOCK CMPXCHG) */
int atomic_cmpxchg(atomic_t *v, int old, int new) {
int prev = v->counter;
if (prev == old)
v->counter = new;
return prev;
}
printf("=== 原子操作 — 不可分割的操作 ===\n\n");
atomic_t counter;
atomic_set(&counter, 0);
printf("--- 基本操作 ---\n");
printf("初始值: %d\n", atomic_read(&counter));
atomic_inc(&counter);
printf("inc: %d\n", atomic_read(&counter));
atomic_add(5, &counter);
printf("add 5: %d\n", atomic_read(&counter));
atomic_sub(2, &counter);
printf("sub 2: %d\n", atomic_read(&counter));
atomic_dec(&counter);
printf("dec: %d\n", atomic_read(&counter));
printf("\n--- 比较交换 (CAS) ---\n");
atomic_set(&counter, 10);
printf("当前值: %d\n", atomic_read(&counter));
int old = atomic_cmpxchg(&counter, 10, 20);
printf("CAS(10, 20): old=%d, new=%d\n", old, atomic_read(&counter));
old = atomic_cmpxchg(&counter, 10, 30);
printf("CAS(10, 30): old=%d, new=%d (失败)\n", old, atomic_read(&counter));
printf("\n--- 原子操作 vs 锁 ---\n");
printf("原子操作:\n");
printf(" 不可分割\n");
printf(" 无锁\n");
printf(" 适合简单操作\n\n");
printf("锁:\n");
printf(" 保护临界区\n");
printf(" 适合复杂操作\n\n");
printf("--- 实现原理 ---\n");
printf("x86:\n");
printf(" LOCK 前缀\n");
printf(" 锁定缓存行\n");
printf(" 或锁定总线\n\n");
printf("ARM:\n");
printf(" LDX/STX\n");
printf(" Load-Linked/Store-Conditional\n\n");
printf("--- 使用场景 ---\n");
printf("1. 引用计数\n");
printf(" atomic_inc/dec\n\n");
printf("2. 统计计数器\n");
printf(" atomic_add\n\n");
printf("3. 标志位\n");
printf(" test_and_set_bit\n\n");
printf("4. 无锁数据结构\n");
printf(" atomic_cmpxchg\n");#include <stdio.h>
/*
* 原子操作:
*
* 为什么需要:
* 多核系统中,普通操作可能被中断
* 两个 CPU 同时修改同一变量
* 导致数据不一致
*
* 原子类型:
* atomic_t — 原子整数
* atomic64_t — 原子 64 位整数
* atomic_long_t — 原子长整数
*
* 原子操作:
* atomic_read(v) — 读取
* atomic_set(v, i) — 设置
* atomic_add(i, v) — 加
* atomic_sub(i, v) — 减
* atomic_inc(v) — 自增
* atomic_dec(v) — 自减
* atomic_cmpxchg(v, old, new) — 比较交换
* atomic_xchg(v, new) — 交换
*
* 位操作:
* set_bit(nr, addr) — 设置位
* clear_bit(nr, addr) — 清除位
* change_bit(nr, addr) — 改变位
* test_and_set_bit(nr, addr) — 测试并设置
*/
/* 模拟原子操作 */
typedef struct {
volatile int counter;
} atomic_t;
int atomic_read(atomic_t *v) {
return v->counter;
}
void atomic_set(atomic_t *v, int i) {
v->counter = i;
}
/* 模拟原子加 (实际用 LOCK XADD) */
void atomic_add(int i, atomic_t *v) {
/* 在多核系统中,这需要 LOCK 前缀 */
v->counter += i;
}
void atomic_sub(int i, atomic_t *v) {
v->counter -= i;
}
void atomic_inc(atomic_t *v) {
v->counter++;
}
void atomic_dec(atomic_t *v) {
v->counter--;
}
/* 模拟比较交换 (实际用 LOCK CMPXCHG) */
int atomic_cmpxchg(atomic_t *v, int old, int new) {
int prev = v->counter;
if (prev == old)
v->counter = new;
return prev;
}
int main() {
printf("=== 原子操作 — 不可分割的操作 ===\n\n");
atomic_t counter;
atomic_set(&counter, 0);
printf("--- 基本操作 ---\n");
printf("初始值: %d\n", atomic_read(&counter));
atomic_inc(&counter);
printf("inc: %d\n", atomic_read(&counter));
atomic_add(5, &counter);
printf("add 5: %d\n", atomic_read(&counter));
atomic_sub(2, &counter);
printf("sub 2: %d\n", atomic_read(&counter));
atomic_dec(&counter);
printf("dec: %d\n", atomic_read(&counter));
printf("\n--- 比较交换 (CAS) ---\n");
atomic_set(&counter, 10);
printf("当前值: %d\n", atomic_read(&counter));
int old = atomic_cmpxchg(&counter, 10, 20);
printf("CAS(10, 20): old=%d, new=%d\n", old, atomic_read(&counter));
old = atomic_cmpxchg(&counter, 10, 30);
printf("CAS(10, 30): old=%d, new=%d (失败)\n", old, atomic_read(&counter));
printf("\n--- 原子操作 vs 锁 ---\n");
printf("原子操作:\n");
printf(" 不可分割\n");
printf(" 无锁\n");
printf(" 适合简单操作\n\n");
printf("锁:\n");
printf(" 保护临界区\n");
printf(" 适合复杂操作\n\n");
printf("--- 实现原理 ---\n");
printf("x86:\n");
printf(" LOCK 前缀\n");
printf(" 锁定缓存行\n");
printf(" 或锁定总线\n\n");
printf("ARM:\n");
printf(" LDX/STX\n");
printf(" Load-Linked/Store-Conditional\n\n");
printf("--- 使用场景 ---\n");
printf("1. 引用计数\n");
printf(" atomic_inc/dec\n\n");
printf("2. 统计计数器\n");
printf(" atomic_add\n\n");
printf("3. 标志位\n");
printf(" test_and_set_bit\n\n");
printf("4. 无锁数据结构\n");
printf(" atomic_cmpxchg\n");
return 0;
}道藏笔记
内核启示
多核系统里两个 CPU 同时改同一个变量,普通操作可能只加了一次。原子操作用 LOCK 前缀锁定缓存行,整个操作一气呵成——要么完成要么没开始,没有中间状态。atomic_t 是原子整数,atomic_read/set/add/sub/inc/dec 一套齐全。
CAS(atomic_cmpxchg)是无锁数据结构的钥匙:检查当前值是不是期望值,是就更新,不是就返回失败让你重试。x86 用 LOCK CMPXCHG 实现,ARM 用 LL/SC(Load-Linked/Store-Conditional)。接口相同,底层不同,这就是内核的抽象之美。LOCK 前缀有性能开销(锁定缓存行),所以原子操作不是零开销,但比锁轻量得多。
但 atomic 不自动等于有序:不返回值的 RMW 可能是无序的,条件操作失败也通常无序,需要时要显式使用 acquire/release/relaxed 后缀或 smp_mb__before/after_atomic()。atomic 也不支持 MMIO。对象引用计数优先用 refcount_t,因为生命周期不是简单整数加减。
原子操作是基石——不可分割,保证一致。
原子操作之试
x86 原子操作为了让多核看到不可分割的修改,常使用哪个指令前缀?