摘要訊息 : C++ 併發程式設計下拋棄互斥鎖?

0. 前言

併發程式設計中, 要避免多個執行緒在某個資源上競爭, 使用鎖是最直接也是最廣泛的解決方式. 然而在使用不當的情況下, 鎖會造成程式陷入死鎖狀態. 我們可能經常看到某些程式無響應或者作業系統卡住無法進行任何操作, 有時它們可能是陷入了死鎖狀態, 導致我們不得不強制關閉. 另一方面, 在極高併發和極高可擴展性的要求下, 鎖也會成為這些場景的效能瓶頸. 無鎖程式設計根本不需要借助鎖, 也自然就不存在死鎖這樣的問題, 看起來非常具有吸引力. 然而, 無鎖程式設計對一位 C++ 程式設計師的要求非常高, 稍有不慎便可能陷入未定行為或者記憶體流失等陷阱.

從個人的角度來說, 無鎖程式設計非常難. 閣下在閱讀本文後半部分時可能看到, 某些結論僅僅是一段話甚至一句話, 但這個結論可能是我花費幾個星期甚至兩三個月的空餘時間去逐步推導論證出來的. 本篇文章建議從頭至尾逐句閱讀, 不建議跳躍式閱讀.

1. 記憶體排序

C++ 11 引入的記憶體排序 (memory order) 並不是絕大多數 C++ 程式設計師所關注的, 儘管原本許多未定行為在有了記憶體排序後行為被明確, 但似乎對許多 C++ 程式設計師來說根本無需關心. 一般情況下, 使用 C++ 標準樣板程式庫預設的記憶體排序便足夠應付幾乎全部場景. 然而當我們嘗試接近底層, 記憶體排序是多執行緒程式效能優化中一個繞不開的優化點. 值得一提的是, 由於 C++ 引入記憶體排序的時間相對其它語言較晚, 所以 C++ 的記憶體排序模型是比較完整的. 雖然仍然存在一些記憶體排序模型還未被引入, 但是這些記憶體排序模型基本上都處於學術研究階段.

1.1 重排

對於若干個變數, CPU 會根據具體的程式碼對這些變數有存取或者寫入的操作, 這些操作會形成一個操作序列, 我們稱這個序列為這些變數的記憶體排序.

int a {}, b {};
void f() {
    a = b + 1;
    b = 1;
}

Code 1-1 中, CPU 首先從記憶體中存取變數 b 的值, 對這個值加一運算後寫入 a 對應的記憶體中. 最後, 向 b 對應的記憶體中寫入值 1. 因此, Code 1-1 的記憶體排序看起來應該是 \displaystyle {\mathop {\mathrm {load}}(a) \rightarrow \mathop {\mathrm {store}}(a) \rightarrow \mathop {\mathrm {store}}(b)}. 然而事實上, 若編碼器介入對 Code 1-1 進行最佳化, 記憶體排序是會改變的, 它可能和 Code 1-1 中看起來的順序完全不同, 甚至有時候有些反直觀. 在 x86-64 GCC 15.1 下, 若編碼器的最佳化等級為 -O2, 則 Code 1-1 對應的組合語言應該是

mov eax, dword ptr[rip + b]     ; 將 b 的值從記憶體中取出, 並存入暫存器 eax
mov dword ptr[rip + b], 1       ; 將 1 寫入 b 對應的記憶體, 對應函式 f 中的 b = 1
add eax, 1      ; 將暫存器 eax 的值加一, 對應函式 f 中 a = b + 1 中的 b + 1
mov dword ptr[rip + a], eax     ; 將暫存器 eax 中的值寫入 a 對應的記憶體, 對應函式 f 中的 a = b + 1

Code 1-2 中, 我們看到的記憶體排序應該是 \displaystyle {\mathop {\mathrm {load}}(b) \rightarrow \mathop {\mathrm {store}}(b) \rightarrow \mathop {\mathrm {store}}(a)}. 這種重排最佳化其實緣由於現代 CPU 架構. 在現代架構上, CPU 和記憶體之間並不是直接相連的, 中間還間隔著暫存器和快取 (快取甚至分為多個級別, 不同用途). 當 CPU 從記憶體中存取變數時, 實際上會先檢查這個變數是否已經被載入到暫存器或者快取中. 若有, 則直接在暫存器和快取中進行操作, 這比直接操作記憶體要快得多. 但若暫存器和快取中都沒有, 才會開始操作記憶體. 不過即便如此, CPU 也不是直接操作記憶體, 而是將記憶體中的一部分複製到快取中, 在快取 (或者再取一部分到暫存器, 在暫存器) 中進行操作, 操作完成後再將新值重新寫入記憶體中. 這種行為並不難以理解, 因為在一個程式碼範圍內, 某些變數的操作可能非常頻繁, 在快取中操作比在記憶體中操作要快得多. 儘管多了複製到快取和快取寫入記憶體這兩步額外的操作, 但是從總體時間上來說, 複製後在快取中多次操作也比多次操作記憶體要更快. 然而, 快取的容量和記憶體相比極其有限, 例如 Apple M4 Max 晶片, 其最大記憶體可支援至 128 GB, 快取的結構為 : L1 指令快取 128 KB + L1 資料快取 64 KB + L2 快取 4 MB, 這完全不在一個等級. 現在讓我們回到 Code 1-1. 假設變數 b 在快取中, 但是位置非常靠前, 下一次有資料要複製到快取中就必須要將 b 從快取中移除, 並將值寫入記憶體. 與此同時, 變數 a 不在快取中. 若要按照排序 \mathop {\mathrm {load}}(a) \rightarrow \mathop {\mathrm {store}}(a) \rightarrow \mathop {\mathrm {store}}(b), 則必須先將變數 a 複製到快取中, 此時變數 b 要從快取中被移除. 當我們進行 \mathop {\mathrm {store}}(b) 的時候, 又要將變數 b 從記憶體中複製回來. 但若按照 \mathop {\mathrm {load}}(b) \rightarrow \mathop {\mathrm {store}}(b) \rightarrow \mathop {\mathrm {store}}(a), 這種情況下我們在快取中操作完變數 b 後僅需從記憶體中複製一次變數 a 即可. 有人可能認為複製一個變數很快, 這種最佳化沒有必要. CPU 從記憶體中複製時, 通常不只是複製單個變數, 通常複製的資料流大小為一個 cache line 的大小, 一般來說是 64 位元組. 也就是為了存取記憶體中的某個變數, CPU 會將該變數所在的某個 64 位元組區塊從記憶體裡複製到快取上. 這種做法並非完全沒有道理, 因為在一個程式碼區塊中, 相鄰的變數在記憶體中通常會排列在一起.

除了編碼器之外, CPU 也會在運作指令時改變變數的記憶體順序. 不過準確地說, CPU 改變順序的核心原因是因為其會對已經編碼好的指令進行重排 :

void f() {
    int x {};
    // ...
    bool condition {...};
    if(condition) {
        x = 42;
    }
    // ...
}

Code 2-1 中, 若 condition 的結果需要一段時間等待, 但是等待的時長預計比讓出 CPU 給其它執行緒更短, 那麼 CPU 可能會在等待期間直接運作 if 區塊中的程式碼, 此時 Code 2-1 的順序看上去會像

void f() {
    int x {};
    // ...
    bool condition {...};
    int save {x};
    x = 42;     // (1)
    if(not condition) {
        x = save;
    }
    // ...
}

Code 2-2 (1) 處, x 的值事實上已經被當前執行緒改變了. 若 x 不是局域變數, 則其它執行緒可能存取到非預期的值. 不過這種情況我們無需擔心, 因為重排的執行緒會保證其它執行緒存取 x 的值時, 必定得到重排之前 x 的舊值 (若中間不存在其它執行緒更改, 則其它執行緒看到的是 save 的值, 而不是 x = 42). 若還有一個 else, 那麼 CPU 會選擇其中一個 CPU 認為成立機率較高的分支提前運作, 這涉及到分支預測, 並不是我們現在要討論的內容. 有了這種最佳化, 若預測正確, 相當於利用了等待的時間. 即使預測不正確, 要還原資料或者進入另一個分支, 總體的運作時間相比於最佳化之前也是大致保持不變.

重排能夠提高程式運作效能, 但它並不總是生效. 編碼器和 CPU 都會保證重排後程式的運作結果和重排前保持嚴格一致 (as-if 規則). 例如

#include <vector>

std::vector<int *> *v {};
void release_v(int *reserved) {
    for(const auto p : *v) {
        if(p not_eq reserved) {
            delete p;
        }
    }
    delete v;
}
int *f(int index, bool delete_v) {
    // ...
    int *p {};
    // 本區域中不涉及任何與 v 相關的直接或者間接操作
    p = (*v)[index];        // (1)
    // 本區域是與 p 相關的操作
    // ...
    // 區域結束
    if(delete_v) {
        release_v(p);
    }
}

我們可以發現在 Code 3 中, 全域變數 v 的回收判斷可以放到陳述式 (1) 之後, 一旦回收的條件成立, 便可以立即釋放出更多記憶體空間. 因此這種重排是允許的. 但是若放到陳述式 (1) 之前, 便破壞了語義, 從而導致錯誤, 故編碼器和 CPU 都不能將回收相關的程式碼放到陳述式 (1) 之前.

在單執行緒下, 類似於 Code 3 這樣的重排能夠給我們帶來收益, 但在多執行緒中可不一定. 考慮下面這個實例 :

class pimpl_big_object;

pimpl_big_object *get() {
    static pimpl_big_object *instance {new pimpl_big_object {}};
    return instance;
}

pimpl_big_object 使用 PIMPL 手法實作, 並以單一指標指向一個非常大的物件, 該物件的初始化耗時較多, 那麼可以把 new 中的 operator new 放在第一次呼叫時立即完成, 將 placement new 放在 instance 真正指向的物件的第一次使用處 (可能部分地方只是獲得了 instance 指標, 等到很久之後才使用它). 這種重排可以加速大型程式的效能, 尤其是打開效能. 在單執行緒下, 根據指令, 我們可以判斷應該在什麼時候對 instance 運作 placement new, 但是在多執行緒下這種判斷完全失效, 除非建立適當的同步. 否則, 一個執行緒是無法知曉另一個執行緒使用 instance 的具體時機的. 此時, 我們應該要讓初始化在記憶體配置完成後立即完成, 並在結束後讓所有執行緒知道 instance 可用.

另一方面, 現代電腦中 CPU 的不同核心可能有自己的快取. 若 CPU 1 完成了對某個變數 x 的更改, CPU 1 可能只是將新值暫時寫入自己的快取中, 而非直接更新記憶體中的值. 此時, 若 CPU 2 從記憶體中存取 x 的值, 得到的是 x 的舊值. CPU 2 處理舊值可能是一次無用功, 但也有可能導致未定行為甚至出錯. 因此, 我們希望在適當的時候讓 CPU 1 和 CPU 2 之間針對變數 x 建立同步, 讓 CPU 2 能夠及時看到 CPU 1 寫入的新值. 值得注意的是, 這種情況在單執行緒下是不存在的, 因為 CPU 自己肯定知道應該從快取中存取變數還是從記憶體中存取變數. 除此之外, 對於先存後取的情形, CPU 也總是會取到自己最後寫入的最新值, 即自己可以同步給自己最新的值.

1.2 寬鬆次序

我們已經討論了重排和 CPU 快取對多執行緒程式的影響. 不過到目前為止, 所有程式碼都沒有使用互斥鎖加以保護, 這會導致多執行緒下的資料競爭. 資料庫的交易中有一個性質叫作原子性, 表示一個交易要麼整體都完成運作, 若發生失敗或者阻斷, 則整體回滾, 相當於交易沒發生過一樣, 整體都不運作. 由於 C++ 的原子型別在本章節前引入會增加討論和閱讀的難度, 我們在這裡先假設所有程式碼中的變數都具有原子性, 都被原子所保護. 例如

int a {};
int increase_a() {
    return ++a;
}

若有兩個執行緒同時呼叫 increase_a, 兩個執行緒分別會從回傳值中獲取哪些可能的值呢? 一般來說, 一個執行緒先運作, 得到 a 的值 1; 另一個執行緒再運作, 得到 a 的值 2, 這也是預想的情況. 但是對於前置遞增運算子, 其內部實際上分為兩步 :

  1. 對變數 a 增加一;
  2. 回傳變數 a 的最新值.

那麼就存在一種情況, 執行緒 T1 對 a 的值增加了一, 但是執行緒 T1 的運作被暫停; 此時, 執行緒 T2 開始運作, 對 a 的值再增加一. 最終, 執行緒 T1 和 T2 都得到了 a 的值為 2. 為了避免這種情況的發生, 在我們引入 C++ 的原子型別之前, 接下來的討論會將每一個變數視為原子變數. 即對一個變數的內建操作 (各種運算子或者呼叫其成員函式的行為) 視為一個交易, 要麼完整發生, 要麼整體不發生. 若一個原子操作正在被運作, 則沒有任何其它執行緒可以從外部強制打斷這一過程, 要想使用這個變數, 必須等待之前的交易性質原子操作完成. 在此基礎上, 若我們不在乎一些獨立變數的重排且不在乎其它執行緒能否存取到本執行緒剛寫入的最新值, 那麼我們便得到了一種記憶體排序模型 : 寬鬆次序模型 (relaxed ordering model). 進一步地,

void g(int &);
void f(int &a, int &b, int &c) {
    a = c + 1;
    g(b);
    ++c;
}

不在乎獨立變數的重排指的是在 Code 6 函式 f 中, 將 f 中的陳述式順序變為 a = c + 1 -> ++c -> g(b) 或者 g(b) -> a = c + 1 -> ++c, 這並不影響最終的結果. 因此編碼器和 CPU 可以自由地進行重排, 取一個運作效能最高的運作次序. 但若重排為 g(b) -> ++c -> a = c + 1, ++c -> a = c + 1 -> g(b) 或者 ++c -> g(b) -> a = c + 1, 則函式 f 的運作結果將發生改變, 這打破了重排不能改變程式運作結果的約定, 因此這三種重排不可能發生. 換個角度來說, ac 的值是存在依賴的, ab 相互獨立, bc 相互獨立, 但 ac 不獨立. 在 a = c + 1 運作之前, c 的值可以被其它執行緒提前改變, 例如進入函式 fc 的值為 1, 在運作 a = c + 1 之前, c 的值被其它執行緒改為 2. 但若將本執行緒內的 ++c 提前至 a = c + 1 之前, 使得 c 的值 2a 的值 3, 這種重排是錯誤的. 另一方面, 不在乎其它執行緒能否取到本執行緒剛寫入的最新值指的是在 Code 6 函式 f 中, 若 a = c + 1 剛完成, 執行緒可能只是將 a 的最新值寫入自己所屬 CPU 的快取中, 而不是立即寫入記憶體中. 待函式 f 完全運作完成甚至執行緒運作終結之後才一起將 a, bc 的最新值寫入記憶體中 (具體寫入記憶體的時機和策略可能複雜得多, 此處只是舉一個簡單的例子). 若執行緒在完成 a = c + 1, a 由舊值 1 變為新值 2, 但還未寫入記憶體, 此時我們並不在乎其它執行緒存取到 a 獲得的值是 1 還是 2, 即 a 的值新值還是舊值.

一個符合寬鬆次序模型的程式碼結果是不確定的, 在不同時機去運作可能得到不同的合法結果 :

#include <thread>

bool a {}, b {};

void write() {
    a = true;
    b = true;
}
void read() {
    while(not b);
    if(not a) {
        throw std::runtime_error("a is not true!");
    }
}
int main(int argc, char *argv[]) {
    std::thread t1(write), t2(read);
    t1.join();
    t2.join();
}

Code 7 中, 函式 read 可能擲出例外情況, 也可能不會. 首先, Code 7 可能因為重排導致 b = true 先發生, 此時 a 仍然為 false, 從而導致函式 read 中擲出例外情況; 也可能由於 b 的值被寫入記憶體而被執行緒 t2 看到, 但同時 a 的值並沒有寫入記憶體, 從而導致 read 擲出例外情況. 同樣地, 也有可能因為沒有重排或執行緒 t1 及時將值寫入了記憶體 (甚至因為執行緒 t1t2 共享同一快取) 使得執行緒 t2 正常結束, 沒有例外情況被擲出.

我們再來看一個稍複雜的例子, 加深我們對寬鬆次序模型的理解 :

#include <vector>
#include <tuple>
#include <thread>

int a {}, b {}, c{};
std::vector<std::tuple<int, int, int>> v;

void increase() {
    for(auto i {0}; i < 10; ++i) {
        ++a;
        ++b;
        ++c;
    }
}
void save() {
    for(auto i {0}; i < 10; ++i) {
        v.emplace_back(a, b, c);
    }
}

int main(int argc, char *argv[]) {
    std::thread t1(increase), t2(save);
    t1.join();
    t2.join();
}

現在我們列出 Code 8 運作完畢後, 其中一種 v 的結果 :

[0] : (0, 1, 0)
[1] : (0, 1, 0)
[2] : (0, 1, 0)
[3] : (3, 1, 0)
[4] : (5, 5, 0)
[5] : (5, 6, 0)
[6] : (10, 6, 0)
[7] : (10, 6, 0)
[8] : (10, 6, 10)
[9] : (10, 6, 10)

對於這種跳躍式的增長結果, 我們無需感到過度驚訝. 因為執行緒 t1 中的變數並不是實時與執行緒 t2 同步的. 執行緒 t2 在存入陣列的時候, 並不保證一定取到了 a, bc 三個變數的最新值. 這是寬鬆次序這種記憶體過度放鬆約束導致的. 那麼我們是否可能在一個時刻獲取到了新值, 在下一個時刻獲取到一個舊值, 從而產生這樣的序列 :

[0] : (1, 1, 1)
[1] : (1, 2, 1)
[2] : (1, 1, 1) b 取得舊值
[3] : (1, 2, 3)
...

寬鬆次序模型不允許這種情況發生. 儘管它幾乎沒什麼約束, 但是也保證了在某個執行緒下, 對於同一個變數若存在 \displaystyle {x = 1 \rightarrow x = 2 \rightarrow x = 3} 這樣的次序, 另一個執行緒第一次如果存取到變數 x 的值為 3, 則這個執行緒便不可能再看到比 x = 3 更舊的值, 即 x = 1x = 2. 這就好比在單執行緒下, 若我們進行上述操作, 接下來的程式碼中若變數 x 保持不變, 那麼 x = 3 恆成立. 若 x 的值變回 12, 則我們可以斷定程式發生錯誤. 同樣地, 編碼器與 CPU 在同步操作序列給其它執行緒的時候, 即便記憶體排序模型模型是寬鬆次序, 也不會允許這種情況發生.

1.3 全域一致性次序

寬鬆次序給予了 CPU 和編碼器子夠高的自由度, 所以編碼器可以最大程度最佳化多執行緒下的程式碼, CPU 也能夠以最高運作效能為標準為指令進行重排. 但是寬鬆次序也帶來了一些壞處. 假設我們正在維護一個前向連結串列 (參考文章《【資料結構】前向連結串列》) A->B->C. 執行緒 T1 向前向連結串列中插入一個結點 D, 得到 D->A->B->C, 但執行緒 T2 在寬鬆次序下還未及時獲知 T1 已經插入了一個新的結點, T2 看到的前向連結串列仍然是 A->B->C. 此時 T2 想要移除頭部結點, 同時將 A 的下一個結點 B 作為新的頭部結點. 不幸的是, 執行緒 T2 所做的事情被及時同步給了其它執行緒, 那麼所有執行緒看到的新頭部結點都是 B, D 消失不見了. 最終, D 對應的記憶體沒人回收, 也沒人執行緒中存在變數去指涉, 發生了流失. 對應地, 如果兩個執行緒同時想要移除頭部結點, 其中一個執行緒未同步另一個執行緒所做的移除操作, 那麼一個頭部結點就有可能被回收兩次, 產生未定行為. 於是, 我們需要一個更加嚴格的記憶體排序模型.

為了解決寬鬆次序中變數無法及時被寫入記憶體, 從而導致其它執行緒可能看到舊值的問題, 現在我們要求所有執行緒對變數進行寫入操作後, 若其它執行緒後續需要存取該變數, 則必須及時將值進行同步, 寫入記憶體, 防止其它執行緒看到舊值這一情況再次發生. 我們已經在前面假設了所有變數及其操作都具有原子性, 當一個執行緒正在對變數 x 進行修改時, 其它執行緒若想要存取 x 的值, 只能等待本次修改操作結束. 在新規定下, 其它執行緒也必定看到 x 被修改後的最新值. 那麼這樣是否解決了所有問題? 我們來看一個實例 :

struct point {
    int x;
    int y;
} *data {};

void allocate_data(int x, int y) {      // 保證程式運作過程中只被呼叫一次
    auto p {static_cast<point *>(operator new(sizeof(point)))};     // (1)
    p->x = x;       // (2)
    p->y = y;       // (3)
    data = p;       // (4)
}
void read_data() {
    while(not data);        // (5)
    // ...
}

若有一個執行緒 T1 負責配置 data, 其它執行緒在 T1 結束之後開始呼叫函式 read_data, 由於我們規定 T1 寫入的 data 新值能夠及時同步給其它執行緒, Code 9-1 看似運作得不錯. 從執行緒 T1 的角度來看, 將 data = p 這一指派動作放到 xy 設定之前完全沒有問題, 這並不改變函式 allocate_data 的最終結果, 其它執行緒也是在 T1 結束之後才開始呼叫函式 read_data. 但站在多執行緒程式設計的角度來說, 我們實際上無法假設其它執行緒總是保證在 T1 結束之後才開始呼叫 read_data, 畢竟引入多執行緒本身是為了提高程式效能. T1 的運作時間可能比我們想像的要更久, 從 Code 9-1 來看, 其它執行緒在 data 不為空的情況下便可以開始運作了. 此時, 我們必須規定 data = p 這一動作必須在 xy 設定完成之後才能進行. 這一次序的規定是從全域角度出發的, 不僅僅是針對某一個執行緒而言的. 因此之後可能存在 data 回收之後, 由非 T1 執行緒重新呼叫 allocate_data 重新配置資料這一種情況.

從第一個開始運作的執行緒的第一條指令開始, 直到程式終結, 會產生一個完整的指令集合. 每一個執行緒執行的所有指令集合都是完整指令集合的子集. 例如 A->B->C->D->E, 由執行緒 T1 運作 A->C->D, 由執行緒 T2 運作 B->D->E. 由於各個 CPU 核心同時或者交替工作, 變數又具有原子性, 這樣的指令集合是一定會產生的. 若我們規定, 全域指令集合中某些操作具備先後順序, 要求執行緒在運作的時候也必須遵守這個順序, 結合上一段討論中變數需要將新值在一定情況下同步給其它執行緒, 我們便得到了一種相當嚴格的記憶體順序模型 : 全域一致性次序模型 (global sequential consistency model). 一個多執行緒程式, 我們稱其滿足全域一致性次序若且唯若

  1. 該多執行緒程式有一個全域一致的運作序列 \displaystyle {O_{1} \rightarrow O_{2} \rightarrow ... \rightarrow O_{n}}, 任一執行緒的運作序列 \displaystyle {O_{i_{1}} \rightarrow O_{i_{2}} \rightarrow ... \rightarrow O_{i_{m}}} 滿足 1 \leq i_{1} < i_{2} < ... < i_{m} \leq mm \leq n;
  2. 任意原子變數的新值需及時同步給其它需要存取的執行緒.

顯然, 若有全域一致性次序操作序列 O_{1} \rightarrow O_{2} \rightarrow ... \rightarrow O_{n}, 任一執行緒的運作序列必定是從全域一致性序列中移除若干個操作, 按順序, 不可重排地從前向後連接所得到的序列. Code 9-1 中的全域一致性次序必然滿足

... -> (1) -> ... -> (2) -> ... -> (3) -> ... -> (4) -> ....

現在我們為 Code 9-1 增加計數, 以統計變數 data 被配置的次數 :

struct point {
    int x;
    int y;
} *data {};
unsigned long count {};

void allocate_data(int x, int y) {
    auto p {static_cast<point *>(operator new(sizeof(point)))};     // (1)
    p->x = x;       // (2)
    p->y = y;       // (3)
    data = p;       // (4)
    ++count;        // (5)
}
void read_data() {
    while(not data);        // (5)
    // ...
}
int main(int argc, char *argv[]) {
    // ...
    // 等待所有子執行緒結束
    // 將 count 寫入某個資料流中
}

顯然, 整個程式中使用 count 的僅有函式 main, 且使用時機是在所有子執行緒結束後, 那麼將陳述式 (5) 放在函式 allocate_data 的任意位置看起來都可以. 但是由於執行緒必須遵守一個固定的全域一致性次序, 因此事實上陳述式 (5) 並不能被隨意重排. 但若全域次序中將陳述式 (5) 放在了 (1) 之前, 那麼執行緒內部必須對 (5) 進行重排, 重排到 (1) 之前. Code 9-2 表明, 在全域一致性次序模型下, 重排仍然被允許, 只是重排具有較多約束. 除此之外, 全域一致性次序也不是唯一的. 可以說, 每次程式啟動後, 全域一致性次序都不一定相同, 其主要受到一下因素影響 :

  • CPU 架構 : x86 架構天然地接近全域一致性次序模型 (但不是完全採用全域一致性次序模型), 重排相比於 ARM 更少;
  • 編碼最佳化和連結階段最佳化;
  • 執行緒啟動的順序, 啟動時間和上下文切換的時間點;
  • 作業系統對執行緒的排程及調度策略;
  • CPU 使用的快取或者緩存機制;
  • 系統呼叫或中斷 : 在高併發下, 若一個 CPU 發現自己無法獨佔某個物件, 則會透過系統呼叫來讓出自己給其它執行緒以提高 CPU 利用率;
  • 熱路徑最佳化 : 若在程式運作之前用真實的輸入預運作程式, 可以提高快取命中率, 分支預測的準確率, 優化指令在記憶體中的佈局;
  • 使用者不同的真實輸入;
  • 其它程式中的非決定性因素.

相比於寬鬆次序, 全域一致性次序非常可靠, 我們無需擔心值的新舊問題, 但這是有代價的. 由於每一個變數的值都需要同步給其它執行緒 (至少在其它執行緒存取某個變數具體值的時候必須同步, 可以無需實時主動同步), 因此採用全域一致性次序模型的程式碼便比採用寬鬆次序模型的程式碼需要更多指令用於同步, 程式的總體效能自然更低.

1.4 發布-獲取次序

我們升級一下 Code 9-2 中的需求, 我們要求有一個執行緒可以定時輸出 count 的值, 因此我們可以將程式碼改寫為

#include <iostream>

struct point {
    int x;
    int y;
} *data {};
unsigned long count {};

void allocate_data(int x, int y) {
    auto p {static_cast<point *>(operator new(sizeof(point)))};
    p->x = x;
    p->y = y;
    data = p;
    ++count;
}
void read_data() {
    while(not data);
    // ...
}
void log_count() {
    std::cout << count << std::endl;
}

現在考慮一個問題, Code 9-3 中的函式 log_count 如果被呼叫, 當 count 的具體值出現在螢幕上的時候, 是否可以反應那一時刻的準確數量? 我們可以確保的是, 當 log_count 存取原子變數 count 的那一刻, count 的值是最新且最準確的. 但是 std::cout << countstd::cout << std::endl 的運作都還需要一段時間, 也就是 log_count 存取到 count 的最新值到 count 的值被輸出到螢幕上, 這中間是存在間隔的. 在這個間隔中, count 可能被其它執行緒更新. 因此我們可以確定, 函式 log_count 的輸出只能給出一個大概值, 真實值至少大於等於輸出的那個值. 那 log_count 每次都要求存取 count 的最新值還有意義嗎? 顯然, count 的值不需要進行太多同步, 偶爾同步一下足以滿足. 每次存取 count 都要進行一次同步只會給程式效能帶來下降, 我們希望降低 count 的同步頻率來換取效能上的提升. 此時, 程式便不能完全採用全域一致性次序模型.

為了達到我們的目的, 我們又一次需要改變前面的假設 : 程式碼中的變數要想將最新寫入的值同步給其它執行緒, 必須將對應的寫入操作放入 RELEASE 區塊中; 執行緒要想存取到其它執行緒寫入的最新值, 必須將存取操作放入 ACQUIRE 區塊中. 未放入 RELEASE 區塊或者 ACQUIRE 區塊中的程式碼一律視為使用寬鬆次序, 僅保證變數的原子性. 為此, 我們改寫 Code 9-3 中的程式碼 :

#include <iostream>

struct point {
    int x;
    int y;
} *data {};
unsigned long count {};

void allocate_data(int x, int y) {
    auto p {static_cast<point *>(operator new(sizeof(point)))};
    p->x = x;
    p->y = y;
    RELEASE {
        data = p;
    }
    ++count;
}
void read_data() {
    ACQUIRE {
        while(not data);
        // ...
    }
}
void log_count() {
    std::cout << count << std::endl;
}

Code 9-4 中, RELEASE 區塊之前使用的都是寬鬆次序, 那麼 data = p 是否會將全部資料都同步給呼叫函式 read_data 的執行緒呢? 答案是肯定的. RELEASE 區塊有一個要求 : 為了確保同步給其它執行緒的資料是完整的, 在 RELEASE 區塊之前的所有操作都必須保證已經完成. 這些操作可以被編碼器或者 CPU 重排, 但是必須保證最終的結果與未重排之前保持嚴格一致. 也就是說, 如果指標 p 是全域變數, 在函式 read_data ACQUIRE 區塊中存取 p, 也可以存取到 p 的最新值, 儘管 p 沒有被放入 RELEASE 區塊. 這邊是 RELEASE 區塊具有的隱含同步效果. 與 RELEASE 區塊對應, ACQUIRE 區塊中為了確保存取的資料是正確的, 區塊內所有存取操作都必須在 ACQUIRE 區塊結束之前強制完成, 在 ACQUIRE 區塊下面的程式碼不能躍過 ACQUIRE 區塊提前運作. 這是因為在 ACQUIRE 區塊之後可能存在寫入操作, 導致存取操作存取了本不應該屬於本次獲取的值. 舉個例子, 若執行緒 T1 更新了指標 p, 執行緒 T2 本應該先存取 p 的最新值, 才去更改 p 的值. 但是執行緒 T2 被重排之後, 先更改了 p 的值才去存取, 顯然這會導致錯誤.

上面的討論已經導出了另一種比全域一致性次序模型更寬鬆, 但比寬鬆次序模型更嚴格的新記憶體排序模型 : 發布-獲取模型 (release-acquire model), 也稱為釋放-獲取模型. 這種記憶體模型僅選擇必須要的部分進行同步, 因此程式的總體運作效能要高於採用全域一致性次序模型的相同程式. 發布-獲取次序並不要求每一個執行緒的運作序列必須是全域序列的順序子集, 因為使用發布-獲取模型的程式碼本身可能有一部分採用的是寬鬆次序, 這部分次序的指令順序是無法確定的. 否則, 我們根本就沒必要分析哪些變數需要及時同步, 哪些變數不需要了, 直接採用全域一致性次序模型省事又不容易出錯.

值得注意的是, RELEASE 區塊和 ACQUIRE 區塊必須成對出現才會有同步效果. 如果某個變數的寫入被放入 RELEASE 區塊, 但是存取這個變數值的程式碼沒有被放入 ACQUIRE 區塊, 那麼這個存取操作是不保證一定獲得該變數最新寫入值的. 也就是說, 一旦一個變數的存取在發布-獲取模型中沒有被放入 ACQUIRE 區塊, 那麼我們可以認為在當下程式碼中, 能否獲取該變數最新的值並不重要, 只要能夠獲取到值即可.

儘管我們對 RELEASEACQUIRE 區塊及其上下程式碼的運作有一定約束, 還有一種情況值得我們深入討論 :

int x {}, y {};
void f(int new_x, int (*op)(int)) {
    // ...
    RELEASE {
        y = op(x);
        // ...
        x = new_x;
    }
    // ...
}

Code 10 中, 我們假設執行緒 T1 已經將 RELEASE 區塊中的操作完成, 最終發布 x = 0, y = 0. 另一個執行緒 T2 僅僅完成了 y = op(x), 發布 y = 1. 此時, 若執行緒 T3 透過 ACQUIRE 區塊存取 xy 的值, 可以肯定 x 的值為 0, y 的值如何確定? 事實上, 執行緒之間的原子變數同步並非我們想像得那麼簡單. 當執行緒 T3 存取到 x = 0 的時候, 它還會獲得另一個信息 : x = 0 這個值是由執行緒 T1 寫入的. 接下來當執行緒 T3 想要獲取 y 的值的時候, 存取動作會攜帶下面的信息 :

  1. 我已經存到變數 x 的值為 0;
  2. 我獲得的 x = 0 這個值是由執行緒 T1 寫入的.

最終, 當執行緒 T3 透過上述信息想要獲取變數 y 的值的時候, 這個值必然也由執行緒 T1 寫入, 即執行緒 T3 一定會獲得 y = 0, 即便此時執行緒 T2 已經發布 x = 1y = 1, 甚至還有其它執行緒已經先於執行緒 T3 獲取到了最新的值. 只要執行緒 T3 的存取操作攜帶了上面的信息, 那麼 T3 必然不可能獲知執行緒 T2 寫入的值. 這與發布-獲取次序模型的一個約束有關 : 若執行緒已經獲取了某個序列發布的值, 那麼執行緒在本次獲取中得到的其它值也必然是由這個序列發布的, 無論是否存在其它執行緒寫入了更新的值. 值得一提的是, 這條規定在全域一致性次序模型下也依然成立, 只是由於全域一致性次序模型本身具有高度嚴格性, 這條規定很容易被忽略.

1.5 發布-消費次序

由於發布操作強制要求 RELEASE 區塊在結束之前, 裡面的所有操作已經完成, 且會將這些操作一起同步給其它執行緒, 因此在 Code 10 中, 將 y = op(x) 放到 RELEASE 區塊之前, 同步也能正常進行, 其它執行緒若獲取到變數 x 的新值, 那麼必然也能獲取到 y 的新值. 這隱含了一個信息, 若我們在 ACQUIRE 區塊中同步到了變數 x 的新值, 那麼 y 的值也已經同步好, 只是等待執行緒的獲取. 然而, 若我們根本不需要變數 y 的新值, y 的同步便是多餘的, 這又給程式碼的效能帶來了下降. 現在, 我們增加一個 CONSUME 區塊, 用於標記只有本區塊內存取了的變數才需要進行同步, 其它變數的值可以不同步 :

int x {}, y {}, z {};

void f(int a, int b, int c) {
    RELEASE {
        x = b + c;
        y *= z * a + b;
        z = c;
    }
}
void read_z() {
    CONSUME {
        auto value {z};     // 僅同步 z, 可以無需同步 x 和 y 的最新值
        // ...
    }
}

這個時候, 我們又導出了另一種比寬鬆次序模型嚴格, 但比發布-獲取次序稍放鬆的新記憶體排序模型 : 發布-消費次序 (release-consume ordering). 類似地, 採用發布-消費次序模型的程式在效能上比採用發布-獲取次序的相同程式更好.

我們已經討論了四種記憶體排序模型, 看起來如果可能的情況下, 我們應該儘量選擇發布-消費次序. 然而在 C++ 下, 編碼器想要真正生成完全符合發布-消費次序模型的程式碼極其困難. 考慮下面程式碼 :

void *p {};
union {
    char *c;
    unsigned long y;
} u {};

void allocate_p() {
    RELEASE {
        p = operator new(128);
        u.y = reinterpret_cast<unsigned long>(p);
    }
}
void use_c() {
    CONSUME {
        const char *data {u.c};
        // 接下來僅使用 data, 不出現 p 和 u.y
        // ...
    }
}

Code 12 中, 我們在 CONSUME 區塊僅使用了 u.c, 並沒有使用 pu.y. 從函式 use_c 的上下文來看, 編碼器很難分析函式 allocate_p 中的最新資料是否應該同步過來. 這種變數之間的依賴關係變得非常隱晦且難以分析, 主要是因為 C++ 繼承了 C 的弱型別特點. 為了應對這種情況, 主流的編碼器都選擇將發布-消費次序模型退化為發布-獲取模型, 自動將 CONSUME 區塊視為 ACQUIRE 區塊. 因此, 在 C++ 中使用發布-消費次序基本上等同於使用發布-獲取次序. 發布-消費次序也在 C++ 26 中被廢棄, 在可預計的將來極大可能會被 C++ 從標準中移除. 原則上來說, 我們應該儘量避免使用 std::memory_order_consume.

1.6 因果一致性次序

還有一種和發布-消費次序模型比較像的記憶體排序模型 : 因果一致性次序 (casual consistency ordering). 在因果一致性次序中, 若操作 A 導致了操作 B 的發生, 那麼就需要在操作 B 發生之前先將操作 A 的結果同步過來; 否則, 可以不同步. 例如 if(flag) 條件成立, 導致接下來需要存取變數 a 的值, 那麼同步變數 flaga; 否則, 可以不同步 a 的最新值. 在沒有同步的情況下, 可能因為 CPU 指令重排, 導致 flag 的值先被假設為 true, 而此時 a 的值還未準備好. 這種記憶體次序模型的變數依賴關係比發布-消費次序更難分析, 因此編碼器也不太可能支援這一記憶體排序模型. 事實上, 因果一致性次序模型也沒有被引入 C++ 中, 根據發布-消費次序模型在 C++ 中的處境, 可以推測因果一致性次序模型基本不可能被引入 C++.

1.7 交易性記憶體

在最開始為變數引入原子性的時候, 我們是透過借鑑資料庫交易進行的, 而資料庫交易事實上可以看成一整段帶有同步性質的程式碼. 若交易成功, 則將最新的值同步給其它執行緒; 若交易失敗, 則整體回滾, 其它執行緒不可能存取到因為交易未完成而臨時寫入的值. 為此我們引入 TRANSACTION 區塊, 用於模擬資料庫交易 :

#include <vector>

std::vector<int> v {};

void f(int *values, std::size_t n) {
    TRANSACTION {
        v.insert(v.cbegin(), values, values + static_cast<std::ptrdiff_t>(n));
        v.insert(v.cend(), values, values + static_cast<std::ptrdiff_t>(n));
    }
}

Code 12 中, 若兩個插入操作都完成, 交易結束, 其它執行緒在存取 v 的時候會看到 v 的新狀態. 若因為記憶體不足或者其它原因導致第二個插入操作失敗從而導致整個交易失敗, 即使在頭部插入操作已經完成, 也會因為交易要回滾, 其它執行緒無法獲取第一次插入的任何值. v 的狀態整體回滾到交易開始之前的那個狀態.

顯然, 在 TRANSACTION 區塊中的程式碼是具有全域一致性次序的. 否則, 執行緒可能因為重排, 即使在回滾發生之後也能存取到交易過程中所產生的臨時值, 這並不符合交易的約束.

交易性記憶體還沒有被引入 C++, 不過有一個提案正在被討論 (Transactional Memory TS, 2015), 目前還沒有什麼最新的進展. GCC 有一個針對該提案的實作擴展, 對應編碼旗幟 -fgnu-tm.

2. 原子型別與原子操作

在討論記憶體排序時, 我們假設每個變數都具有原子性, 但 C++ 本身是追求極致效能的程式設計語言, 自然不會給每個變數都主動增加原子性. 在 C++ 標準樣板程式庫標頭檔 <atomic> 中, 提供了 std::atomic 樣板類別, 用於宣告一個原子變數.

2.1 std::atomic

std::atomic 是接受一個樣板引數的樣板類別, 該樣板引數是原始變數對應的型別. 對於內建型別, std::atomic

  • signed char, short, int, longlong long;
  • unsigned char, unsigned short, unsigned int, unsigned longunsigned long long;
  • char, char8_t, char16_t, char32_twchar_t;
  • float, doublelong double;
  • 指標型別 T *,

進行了特製化或者偏特製化, 因為它們還支援一些運算子. 除此之外, C++ 20 還支援了 std::atomic<std::shared_ptr<T>>std::atomic<std::unique_ptr<T>>, 並且為參考型別引入了 std::atomic_ref. 若要宣告一個 int 型別的原子變數, 非常簡單, std::atomic<int> x {};.

對於像 std::int8_t, std::uint8_t, std::int_least8_t, std::uint_least8_t, std::int_fast8_tstd::uint_fast8_t 等這樣的 std::int*_t, std::uint*_t, std::int_least*_t, std::uint_least*_t, std::int_fast*_tstd::uint_fast*_t 型別, std::atomic 也進行了支援. 另外, std::intptr_t, std::uintptr_t, std::size_t, std::ptrdiff_t, std::intmax_tstd::uint_max_t 也是支援的. 當然, 這些特製化的底層實際上大概率還是內建型別, 只是在不同裝置上有不同的底層型別.

std::atomic<T> 物件可以直接像型別 T 發生隱含型別轉化, 回傳的是 T 的副本而不是參考. 它也支援透過成員函式 storeload 來寫入或者存取值, 透過成員函式 exchange 指派新的值同時回傳舊值 :

#include <atomic>

std::atomic<int> i {};

bool f(int value) {
    int old_value {i.load()};       // unused
    i.store(value);
    int current_value {i.exchange(value * 2)};
    return current_value == value;
}

Code 13 中的函式 f 用於判斷原子變數 i 的值在 f 運作的過程中是否被其它執行緒更改. 這也給了我們一些提示, 若在一個區塊內有多個操作依賴原子變數的值, 我們應該先存取後儲存下副本, 而不是每次都使用成員函式 load 去存取. 因為在多執行緒下, 原子變數的值可能在第一次存取完成之後隨時發生改變. 有些操作可能需要基於舊值進行, 每次都存取新值可能會導致錯誤. 我們基於獲取到的值進行運算之後, 想要將結果傳遞給原子變數, 但是如果原子變數的值發生了改變, 這次運算的結果可能需要作廢 :

#include <atomic>

void f(std::atomic<int> *data) {
    int value {data->load()};
    int result {};
    // ...
    if(value == data->load()) {
        data->store(result);
    }else {
        f(data);        // data 的值被其它執行緒改變, 重新計算
    }
}

然而 Code 14-1 並不是正確的. 因為在 value == data->load() 成立之後, 在 data->store(result) 發生前, 仍然可能會存在其它執行緒更改 data 的值, 看似此處需要互斥鎖的幫助? 不過無需擔心, std::atomic 提供了比較-交換 (compare and swap, CAS)操作 : 若執行緒儲存的值與原子變數的值相等, 才會將新值寫入; 否則, 回傳原子變數現在的新值. 比較-交換操作有兩個與之相關的成員函式 : compare_exchange_strongcompare_exchange_weak, 這兩個成員函式的參數相同. 第一個參數接受一個型別為 T & 的引數, 用於表示期望值. 若原子變數現在的值和期望值相等, 那麼會將第二個引數寫入原子變數中, 這是一次成功的比較-交換操作. 因此, 第二個參數接受一個型別為 T 的引數. 若期望值和原子變數現在的值不相等, 那麼就會更新期望值為原子變數現在最新的值, 這是一次失敗的比較-交換操作. 這兩個成員函式的區別在於 compare_exchange_weak 可能會在原子變數和期望值相等的情況下, 比較-交換操作仍然失敗, 而 compare_exchange_strong 則不會. 這是由於部分 CPU 指令可能出現失效或者競爭條件, 例如在超高併發下, x86 的 cmpxchg 可能會出現短暫失敗. 我們稱這種情況為虛假失敗 (spurious failure). 比較-交換操作失敗會使得成員函式 compare_exchange_* 回傳 false. 由於 compare_exchange_strong 並不會發生虛假失敗, 因此其回傳 false 一定代表期望值和原子變數現在的值不相等; 而 compare_exchange_weak 並不一定. 成員函式 compare_exchange_strong 內部使用了一些機制來保障自己不會發生虛假失敗, 故其效能相比於 compare_exchange_weak 而言要差一些. 我們可以使用比較-交換操作改寫 Code 14-1 :

#include <atomic>

void strong(std::atomic<int> *data) {
    int value {data->load()};
    for(int result {};; result = {}) {
        // 計算 result
        // ...
        if(data->compare_exchange_strong(value, result)) {
            break;
        }
    }
}
void weak(std::atomic<int *> *data) {
    auto old_pointer {data->load()};
    auto new_pointer {new int {}};
    while(not data->compare_exchange_weak(old_pointer, new_pointer));
    // use old_pointer and delete it
    // ...
}

在函式 strong 中, 如果發生虛假失敗, 那麼計算需要重新進行一次, 這是沒有必要的, 因為計算可能十分耗時, 所以此處我們使用 compare_exchange_strong. 在函式 weak 中, 沒有任何耗時的計算可能因為虛假失敗反覆進行, 所以這裡我們使用效能更高的 compare_exchange_weak.

我們在使用 std::atomic 物件時, 是無需使用互斥鎖的, std::atomic 會保證原子變數的每一個操作進行時, 沒有其它執行緒可以在同一時刻進行另一個操作, 必須等待上一個原子操作完成. 對於不同的型別, std::atomic 內部的實作可能不相同. 對於 intvoid * 這種內建型別, 絕大多數 CPU 都提供了無鎖指令來避免資料競爭, 但是對於一些稍大一些的標準佈局結構體, std::atomic 可能退化到內部使用互斥鎖來模擬原子性. 然而對於很多自定型別, std::atomic 可能完全不支援. 一般而言, 內建型別可以無需擔心, 但是也存在一些特殊情況 :

  • CPU 指令在遇到大小過大的內建型別 (例如超過 64 位元或 128 位元的型別);
  • 某些 32 位元平台下的指標型別;
  • 記憶體對位不標準 (無法對位至 alignas(8) 的型別),

這些型別 std::atomic 內部可能使用互斥鎖來模擬原子操作. 然而若型別 T 不滿足

  • 複製操作必須時 trivial 的 (也就是可以使用 std::memcpy 按位元進行複製);
  • T 的複製建構子未被刪除;
  • T 的移動建構子未被刪除;
  • T 的複製指派運算子未被刪除;
  • T 的移動指派建構子未被刪除;
  • T 不能帶有參考限定, const 限定及 volatile 限定;
  • 可以使用 std::memcmp 按位元進行比較,

中的任一一條, 那麼 std::atomic<T> 連宣告都無法成功, 編碼器會擲出編碼錯誤. 有這樣嚴格的要求也不奇怪, 稍複雜的自定型別通常有自訂的複製操作和比較操作, 使用這些操作時通常需要將 std::atomic 內部持有的物件向外傳遞, 不正確的使用可能導致資料競爭甚至死鎖.

我們無需記住在哪個平台下哪些型別是透過 CPU 無鎖指令實作的, 也無需查閱外部資料. std::atomic 的成員函式 is_lock_free 為我們提供了這樣的信息. 若 std::atomic<T> 內部使用 CPU 無鎖指令實現無鎖, 那麼 is_lock_free 會回傳 true; 否則, 回傳 false. 在效能敏感的場景, 我們可以考慮透過成員函式 is_lock_free 來判斷 std::atomic<T> 是否適用. 除此之外, C++ 17 還為 std::atomic 提供了另一個成員函式 is_always_lock_free. 它是一個靜態成員函式, 值是一個常數表達式, 即該函式被 constexpr 標記. 這樣我們在編碼期便可以確定型別 std::atomic<T> 內部是否使用 CPU 無鎖指令實現無鎖. 若是, 則 is_always_lock_free 回傳 true; 否則, 回傳 false. 當然, 也必然存在一些型別在編碼期無法判斷, 成員函式 is_always_lock_free 回傳 false, 但是 is_lock_free 回傳 true.

現在我們已經知道 std::atomic 的比較-交換操作是基於 std::memcmp 進行的, 那麼我們要特別注意有些值可能會有多種不同的表達. 例如 float 中的 +0.0f-0.0f, 這兩個值一般來說應該視為相等, 然而其位元表達不相同; NaN 也存在多個位元表達. 在一個等位中, 由於多個成員共享一片記憶體, 所以自然會存在值不同但是位元表達相同的情況. 另外, 我們還需要注意記憶體對位的場景 :

struct alignas(4) aligned {
    char x;
    int y;
} o {};

若使用了類別 alignedx 後面的三個位元組, 即 reinterpret_cast<char *>(&o) + x (x \in [1, 3]), 也會造成表面上兩個 aligned 物件的成員 xy 都相等, 但是使用 std::memcmp 回傳非零的問題.

內建型別及部分多平台型別的 std::atomic<T> 會對應一個型別別名 :

  • std::atomic_boolstd::atomic<bool>;
  • std::atomic_charstd::atomic<char>;
  • std::atomic_scharstd::atomic<signed char>;
  • std::atomic_ucharstd::atomic<unsigned char>;
  • std::atomic_shortstd::atomic<short>;
  • std::atomic_ushortstd::atomic<unsigned short>;
  • std::atomic_intstd::atomic<int>;
  • std::atomic_uintstd::atomic<unsigned>;
  • std::atomic_longstd::atomic<long>;
  • std::atomic_ulongstd::atomic<unsigned long>;
  • std::atomic_llongstd::atomic<long long>;
  • std::atomic_ullongstd::atomic<unsigned long long>;
  • std::atomic_char8_tstd::atomic<char8_t>;
  • std::atomic_char16_tstd::atomic<char16_t>;
  • std::atomic_char32_tstd::atomic<char32_t>;
  • std::atomic_wchar_tstd::atomic<wchar_t>;
  • std::atomic_size_tstd::atomic<std::size_t>;
  • std::atomic_ptrdiff_tstd::atomic<std::ptrdiff_t>;
  • std::atomic_intptr_tstd::atomic<intptr_t>;
  • std::atomic_uintptr_tstd::atomic<uintptr_t>;
  • std::atomic_intmax_tstd::atomic<intmax_t>;
  • std::atomic_uintmax_tstd::atomic<uintmax_t>.

除了上面的型別之外, std::int*_t, std::uint*_t, std::int_least*_t, std::uint_least*_t, std::int_fast*_tstd::uint_fast*_t 這樣的型別也都有對應的 std::atomic_int*_t, std::atomic_uint*_t, std::atomic_uint_least*_t, std::atomic_int_fast*_tstd::atomic_uint_fast*_t. 但我們不能混用 std::atomic<T>std::atomic_T. 因為標準樣板程式庫在實作 std::atomic<T> 時, 一般會讓其繼承自一個基礎類別. std::atomic_T 可能和 std::atomic<T> 不等價, 而是其基礎類別. 由於不同標準樣板程式庫的實作各不相同 (即使是同一個標準樣板程式庫也可能會存在不同的版本, 配合不同的編碼器有不同的平台特定實作), 尤其是在多執行緒這一極度依賴平台的主題下, 混用 std::atomic<T>std::atomic_T 可能導致程式的不可攜性.

部分內建型別支援額外的運算子, 例如 operator+ 等. std::atomic 為這些操作也提供了相同功能的成員函式 :

  • operator+=/operator++/operator++(int) \Leftrightarrow fetch_add;
  • oprator-=/operator--/operator--(int) \Leftrightarrow fetch_sub;
  • operator& \Leftrightarrow fetch_and;
  • operator| \Leftrightarrow fetch_or;
  • operator^ \Leftrightarrow fetch_xor;
  • operator= \Leftrightarrow store/exchange;
  • operator T \Leftrightarrow load.

另外, std::atomic 所有所有成員函式都有對應的非成員函式, 例如 std::atomic::compare_exchange_strong 對應了 std::atomic_compare_exchange_strong. 儘管 std::atomic 本身所持有的物件支援複製操作和移動操作, 但是 std::atomic 是不可複製也不可移動的, 這也不難理解. std::atomic 在複製和移動下語義完全無法成立, 它本身就代表了對物件的獨佔, 只有它自己能操作物件. 如果允許複製, 這就好比把保險箱的鑰匙交給別人.

上面我們已經討論過比較-交換操作, 但是它的本質實際上是讀-改-寫 (read-modify-write, RMW) 操作. 我們首先將原子變數的值取出來儲存到一個臨時變數中, 修改完這個臨時變數之後, 再重新寫入原子變數中. 比較-交換操作 compare_exchange_* 是明顯的讀-改-寫操作. 也許和直覺不太相符, fetch_* 也是讀-改-寫操作, 其內部一般是委託給 compare_exchange_* 來完成的. fetch_* 的本質是

  1. 存取原子變數的值;
  2. 基於原子變數的值進行對應的操作;
  3. 將新值重新寫回原子變數中.

如果不將上面的操作設計為讀-改-寫, 而是設計為簡單的存取-寫入 (load-write) 會怎麼樣呢? 若 x 的值為 1, 兩個執行緒同時存取到 x = 1 這個值, 各自對 x 的值增加一之後重新寫入, 則最終兩個執行緒都可能將 2 寫入原子變數中. 但是我們實際上想要的效果是兩個執行緒各自為 x 的值增加一, 最終得到 x = 3. 因此, fetch_* 必須有比較的動作, 也就是最終設計必須是讀-改-寫. 當然, 並不只有 fetch_* 被設計為讀-改-寫操作, 那些 std::atomic 上的多載運算子也是讀-改-寫.

2.2 std::atomic_flag

C++ 標準樣板程式庫標頭檔 <atomic> 中還存在著一個比較特殊的原子型別 : std::atomic_flag, 它和 std::atomic<bool> 比較類似, 僅具有兩種狀態. 但是相比於 std::atomic<bool>, std::atomic_flag 更加簡單, 而且它保證它內部的實作必定是透過 CPU 無鎖指令實現的, 這點 std::atomic<bool> 並不保證. 從效能角度來說, std::atomic_flag 稍高於 std::atomic<bool>.

std::atomic_flag 的兩種狀態分別為設定 (set) 和清除 (clear), 檢查其狀態的行為被稱為測試 (test). 在 C++ 11 下, std::atomic_flag 必須透過標準樣板程式庫提供的巨集 ATOMIC_FLAG_INIT, 即 std::atomic_flag flag {ATOMIC_FLAG_INIT}; 這種形式. 在 C++ 20 之後, 可以使用 std::atomic_flag flag {}; 這種形式宣告. 初始化後, std::atomic_flag 物件的預設狀態為清除狀態.

std::atomic_flag 的主要成員函式有

  • test_and_set : 檢查狀態是否為清除, 如果是的話更改狀態為設定, 同時回傳原來的狀態. 若原來的狀態為清除, 則回傳 false; 若原來的狀態為設定, 則回傳 true. 這顯然是一個讀-改-寫操作;
  • clear : 將狀態設定為清除;
  • test (since C++ 20) : 用於回傳當前的狀態, 回傳值的意義和 test_and_set 相同.

std::atomic_flag 通常用於實作自旋鎖 :

#include <atomic>

class spin_lock {
private:
    std::atomic_flag flag;
public:
    spin_lock() noexcept : flag {ATOMIC_FLAG_INIT} {}
    ~spin_lock() noexcept = default;
public:
    void lock() noexcept {
        while(this->flag.test_and_set());
    }
    void unlock() noexcept {
        this->flag.clear();
    }
};

2.3 設定記憶體排序

第 1 章中, 我們介紹了六種記憶體排序模型. 其中, 寬鬆次序, 發布-消費次序, 發布-獲取次序和全域一致性次序在 C++ 11 被引入. std::atomic 的成員函式 load, store, exchange, compare_exchange_*fetch_* 都支援自訂記憶體排序, 這些和記憶體排序相關的參數被放到了函式的最後, 並且有預設值.

C++ 目前引入的記憶體排序有 :

  1. std::memory_order_relaxed;
  2. std::memory_order_consume;
  3. std::memory_order_acquire;
  4. std::memory_order_release;
  5. std::memory_order_acq_rel;
  6. std::memory_order_seq_cst.

2.3.1 std::memory_order_relaxed

std::memory_order_relaxed 對應了寬鬆次序, 被其標記的原子變數操作服從寬鬆次序模型, 值在執行緒之間的同步具有不確定性, 對應的操作可以被更自由地重排, 只要重排不改變程式最終的結果.

#include <atomic>
#include <thread>
#include <cassert>

std::atomic<bool> x {}, y {};
std::atomic<int> z {};

void set_x_and_y() {
    x.store(true, std::memory_order_relaxed);
    y.store(true, std::memory_order_relaxed);
}
void read_x_then_y() {
    while(not x.load(std::memory_order_relaxed));
    if(y.load(std::memory_order_relaxed)) {
        ++z;
    }
}
void read_y_then_x() {
    while(not y.load(std::memory_order_relaxed));
    if(x.load(std::memory_order_relaxed)) {
        ++z;
    }
}

int main(int argc, char *argv[]) {
    std::thread t1(set_x_and_y);
    std::thread t2(read_x_then_y), t3(read_y_then_x);
    t1.join();
    t2.join();
    t3.join();
    assert(z not_eq 0);
}

顯然, Code 17 中的斷言可能不成立. 執行緒 t2 進行 if 條件判斷的前提是 t1 寫入的 x 值被同步到了 t2. 此時, y 的值可能還是未同步的狀態, 甚至還有對 y 的寫入還沒開始的可能. 在這種情況下, 若寫入 y 還未開始, t3 肯定還卡在 while 上. 但若 y 的值已經被寫入並且同步給了 t3, 則 t3 進入條件判斷. 儘管 y 的值已經被同步過來了, 但是 x 的值不一定進行了同步, 故 z not_eq 0 的斷言不滿足. 在另一種情況下, set_x_and_yxy 的兩個寫入操作被重排, 導致 y 的寫入先進行. 不過仍然可能因為 xy 的值不能及時同步給執行緒 t2 或者 t3 導致斷言不成立.

2.3.2 std::memory_order_consume

std::memory_order_consume 對應了發布-消費次序中的消費操作, 即之前討論中出現的 CONSUME 區塊, 其必須與 std::memory_order_release, std::memory_order_acq_relstd::memory_order_seq_cst 搭配才能有同步的效果. C++ 對 std::memory_order_consume 的定義為 : 若原子變數 x 的某個操作標記了該記憶體排序, 則在任意一個執行緒內, 在該操作之後的所有和 x 有關的存取或者寫入操作都不能被重排到該操作之前. 例如有操作序列

  1. x.load(std::memory_order_consume);
  2. x.load(std::memory_order_relaxed);
  3. x.store(value, std::memory_order_relaxed).

正在某個執行緒內按順序運作, 在操作 (1) 之後的程式碼不論怎麼重排, 都不可以出現在操作 (1) 之前, 儘管重排可能不改變最終的程式結果. 需要注意的是, 所有和 x 有關的存取和寫入操作並不單獨指原子變數 x, 而是可能依賴 x 的所有操作. 例如 y 依賴了 x 的具體值, 那麼操作 (1) 之後和 y 有關的存取和寫入也不能被重排到操作 (1) 之前.

C++ 要求具有 std::memory_order_consume 記憶體排序操作之後的寫入操作不能被重排到該操作之前比較容易理解, 但是為什麼還要求了存取操作也不能被重排到該操作之前呢, 畢竟存取操作並不修改原子變數的值. 原因在之前我們也曾經討論過, 由於非全域一致性次序並沒有一個固定的全域一致性序列, 故執行緒存取到的序列可能是不同. 例如現在 x = 1y = 1, 程式中 xy 相關. 執行緒 t1 現在以 std::memory_order_consume 存取 x 得到 x = 1, 另一個執行緒在此時寫入了 x = 2y = 2. 接下來, t1 準備以 std::memory_order_consume 次序存取 y. 若此時後面一個對 ystd::memory_order_seq_cst 次序的存取操作被重排到了上面這個操作之前, 接下來不論以什麼次序存取 y, 都會得到 y = 2. 最終執行緒 t1 得到了 x = 1y = 2, 這可能導致錯誤.

2.3.3 std::memory_order_release

由於 std::memory_order_consume 至少要和 std::memory_order_release 搭配使用才能有同步的效果, 故我們先介紹 std::memory_order_release, 也為接下來介紹 std::memory_order_acquire 埋下鋪墊.

std::memory_order_release 對應了發布-消費次序或發布-獲取次序中的發布操作 (釋放操作), 對應了之前討論中的 RELEASE 區塊. C++ 對 std::memory_order_release 的定義為 : 若一個操作是以 std::memory_order_release 進行的, 則要求在執行緒內該操作之前的所有操作都必須已經完成, 不能被重排到該操作之後. 將一個以 std::memory_order_relaxed 為順序的操作重排到發布操作之後, 可能導致其它執行緒想要透過 std::memory_order_consume (或者更加嚴格的記憶體排序) 獲取值的執行緒無法被同步到最新的值. 值得一提的是, std::memory_order_release 要求所有在本操作之前的操作都完成, 並不只約束原子變數的操作, 也同時約束非原子變數的操作 :

#include <atomic>

int *ptr {};
unsigned long address {};
std::atomic<bool> data_ready {};

void read_data(int *p) {
    address = reinterpret_cast<unsigned long>(ptr);
    ptr = p;
    data_ready.store(true, std::memory_order_release);
}

Code 18 中, 單執行緒下將 ptr = p 這一操作重排至 data_ready 的設定之後沒有任何問題, 但在多執行緒下, 其它執行緒若透過 std::memory_order_consume 順序存取 ptr 的值, 重排可能導致另一個執行緒獲取舊值. 但是 Code 18 就不會發生這種情況.

我們來看一個 std::memory_order_releasestd::memory_order_consume 搭配使用的實例 :

#include <atomic>
#include <thread>
#include <cassert>

std::atomic<int> x {}, y {};
bool z {};

void set_x_and_y() {
    x.store(1, std::memory_order_relaxed);
    y.store(1, std::memory_order_release);
}
void set_z() {
    while(y.load(std::memory_order_relaxed) == 0);
    if(x.load(std::memory_order_consume) == 1) {
        z = true;
    }
}

int main(int argc, char *argv[]) {
    std::thread t1(set_x_and_y), t2(set_z);
    t1.join();
    t2.join();
    assert(z);
}

Code 19 中的斷言是可能不成立的, 因為在函式 set_x_and_y 中, xy 沒有任何關係. 在執行緒 t2 中, 我們透過與 t1 的發布-消費次序獲取到了 y 的最新值為 1. 但儘管我們對 x 的存取也使用了消費次序, 但是由於 xy 沒有任何關係, 所以執行緒 t1x 寫入的最新值不一定能被同步到執行緒 t2, 導致此時 t2 獲取到 x 的值為零, 斷言不成立.

2.3.4 std::memory_order_acquire

std::memory_order_acquire 對應了發布-獲取次序中的獲取操作, 即之前討論中的 ACQUIRE 區塊. 和 std::memory_order_consume 類似, std::memory_order_acquire 也必須與 std::memory_order_release, std::memory_order_acq_rel 或者 std::memory_order_seq_cst 其中之一搭配使用才能形成同步效果. C++ 對 std::memory_order_acquire 的嚴格規定是 : 在採用 std::memory_order_acquire 的操作下, 後面所有操作都不能被重排至該操作之前. 相比於 std::memory_order_consume, std::memory_order_acquire 更加嚴格, 畢竟 std::memory_order_consume 值要求與本操作對應原子變數及其相關的變數操作不能被重排到本操作之前.

我們在討論發布-獲取次序的時候已經知道, 當我們使用 std::memory_order_acquire 去同步 std::memory_order_release 發布的資料時, 該同步批次中的所有資料都會同步過來, 不論這個操作對應的記憶體排序是 std::memory_order_relaxed, 甚至這些資料都可以不需要是原子變數. 基於此, 若要讓 Code 19 中的斷言必定成立, 只需要將 y 的存取操作記憶體排序從 std::memory_order_consume 改為 std::memory_order_acquire 即可. 進一步地, 即然 y 同步時, 批次中已經可以取到 x 的最新值, 那麼接下來對 x 的存取操作即便是退化到了 std::memory_order_relaxed, 也仍然可能取到該批次中的最新值. 至少在修改後的 Code 19 中, 不會導致斷言不成立. 這種同步的傳遞性在變數依賴關係的複雜性提高的情況下仍然成立 :

#include <atomic>
#include <thread>
#include <cassert>

std::atomic<int> x {}, y {}, z {};
std::atomic<bool> sync1 {}, sync2 {};

void set_xyz() {
    x.store(1, std::memory_order_relaxed);
    y.store(-1, std::memory_order_relaxed);
    z.store(42, std::memory_order_relaxed);
    sync1.store(true, std::memory_order_release);
}
void set_sync2() {
    while(not sync1.load(std::memory_order_acquire));
    sync2.store(true, std::memory_order_release);
}
void read_xyz() {
    while(not sync2.load(std::memory_order_acquire));
    assert(x == 1);
    assert(y == -1);
    assert(z == 42);
}

int main(int argc, char *argv[]) {
    std::thread t1(set_xyz), t2(set_sync2), t3(read_xyz);
    t1.join();
    t2.join();
    t3.join();
}

Code 20 函式 read_xyz 中, 三個 assert 永遠都成立.

2.3.5 std::memory_order_seq_cst

std::memory_order_seq_cst 對應了全域一致性次序, C++ 對其的定義為 : std::memory_order_seq_cst 同時具有 std::memory_order_releasestd::memory_order_acquire 的效果; 同時, 要求所有執行緒的運作序列都必須是某一程式啟動之後固定 (但不一定唯一) 全域操作序列的子集. 也就是說, 若某一操作以 std::memory_order_seq_cst 進行, 那麼在這個操作之前的所有存取與寫入操作都不能被重排到該操作之後, 在該操作之後的所有存取與寫入操作都不能被重排至該操作之前. 由於全域一致性次序模型並不是完全不允許重排, 因此在以 std::memory_order_seq_cst 為記憶體排序的操作之前或者之後, 那些操作都可能被重排, 但是重排唯獨不能越過操作本身, 也就是只能在本操作之前或者之後進行.

第 2.1 節第 2.2 節中, 我們使用 std::atomic 時並未給出記憶體排序, 這是因為所有 std::atomic 的成員函式為了最大程度上保障使用者程式的正確性, 所有和記憶體排序有關的參數都將 std::memory_order_seq_cst 作為預設引數. 在一般情況下, 如果不是效能極其敏感的場景下, 我們都推薦不改變預設的記憶體排序, 因為由記憶體排序導致的程式錯誤偵錯難度非常高.

我們不推薦在同一程式中混用多種不同的記憶體排序模型, 但有一種情況值得我們分析 :

#include <atomic>
#include <thread>
#include <cassert>

std::atomic<int> x {}, y {};

void write_x_and_y() {
    x.store(1, std::memory_order_seq_cst);      // (1)
    y.store(-1, std::memory_order_relaxed);     // (2)
}
void read_x_then_y() {
    while(not x.load(std::memory_order_seq_cst) not_eq 1);      // (3)
    assert(y.load(std::memory_order_seq_cst) < 0);      // (4)
}

int main(int argc, char *argv[]) {
    std::thread t1(write_x_and_y), t2(read_x_then_y);
    t1.join();
    t2.join();
}

Code 21 在不同的 CPU 下行為會變得無法預料, 我們無法肯定斷言是否滿足. 退一萬步講, 打破 std::memory_order_seq_cst 的定義, 若在實際運作中, 操作 (2) 被重排到了 (1) 之前, 按道理執行緒 t2 應該能夠存取到 y 的最新值 -1, 斷言必然成立. 然而如果這個重排是由 CPU 進行的 (編碼器一般會嚴格遵守記憶體排序), 並且 CPU 架構為 AArch64, 那麼斷言可能並不成立. 這也無法責怪 CPU, 因為 y 的寫入是以 std::memory_order_relaxed 進行的, 所以只要 CPU 不將 y 的最新值同步出去, 只是自己持有, 也沒問題. 因此, 除非某些變數僅用於計數或者完全沒必要將最新值同步給其它執行緒, 否則我們應該盡可能讓程式只採用一種記憶體排序模型.

Code 21 中的全域一致性操作序列應該如何分析? 只有使用 std::memory_order_seq_cst 的操作才會加入到全域一致性序列當中. 在 Code 21 各個函式中, 我們可以得到

  • (1) 先於 (2) 發生;
  • (3) 先於 (4) 發生.

(3) 先於 (1) 發生, 由於 x 的值還未改變, 故迴圈會反覆進行. 不妨我們假定寫入操作先發生, 故 Code 21 有且唯有一種全域一致性操作序列 : (1)->(3)->(4). 這裡有一個隱含的場景 : (1) 先於 (4) 發生. 為 Code 21 分析全域一致性操作序列沒什麼用, 因為只有一種可能. 但是當我們將 (2) 中的寫入操作記憶體排序更換為 std::memory_order_seq_cst, 情況便不同了. 我們仍然假定無用的 (3) 一定不會先發生, 即 (1) 先於 (3) 發生. 此時, 由於 (3) 先於 (4) 發生, 所以 (1) 也先於 (4) 發生. 故必然存在 (1) -> ... -> (3) -> ... -> (4) -> ... 這樣的操作序列. 將每個 ... 都替換成 (2), 我們便得到了三個合法的全域一致性序列 :

  1. (1) -> (2) -> (3) -> (4);
  2. (1) -> (3) -> (2) -> (4);
  3. (1) -> (3) -> (4) -> (2).

一個非法的全域一致性操作序列中至少存在一個環, 例如 (4) \leftrightharpoons (1) -> (2) -> (3) (事實上最後還有 (4) \leftrightharpoons (3)), 這是一個判斷全域一致性序列合法性的重要方法. 因為根據序列 (4) -> (1) -> (2) -> (3), 本身就假定了 (4) 先於 (1) 發生, 這和我們推導得到的 (1) 先於 (4) 結合之後, 這裡便有了一個環. 像 herd7, CppMem 和 CDSCheck 等工具都是透過列舉可能的全域一致性操作序列來檢查多執行緒程式是否合法.

2.3.6 std::memory_order_acq_rel

std::memory_orer_acq_relstd::memory_order_seq_cst 的弱化版, 它同樣是 std::memory_order_releasestd::memory_order_acquire 的結合體, 它只是沒有全域一致性序列的要求. 然而, 在 std::atomic 的成員函式 loadstore 中僅需要其中一種即可, 故 std::memory_order_acq_rel 專門供給讀-改-寫操作使用. 讀-改-寫操作中的讀使用 std::memory_order_acq_rel 中的獲取部分 (std::memory_order_acquire), 寫部分使用 std::memory_order_acq_rel 中的發布部分 (std::memory_order_release). 不過, 這是讀-改-寫成功的情況; 若在比較的時候發現已經不相等, 讀部分也需要一個記憶體排序. 對於 exchange 或者 fetch_* 來說, 讀-改-寫失敗的情況下, 讀使用的記憶體排序和成功時的記憶體排序相同; 對於 compare_exchange_* 來說, 它共有四個參數, 後面兩個和記憶體排序有關的參數, 第一個是用於成功的場景, 第二個用於失敗後僅讀的場景.

若將 std::memory_order_acq_rel 使用在 load 或者 store 上, 那麼記憶體排序將會退化為 std::memory_order_acquire 或者 std::memory_order_release. 因為對於 load 或者 store 來說, 並不需要另一部分排序.

2.4 比較-交換操作的同步

現在我們嘗試使用比較-交換操作結合記憶體排序, 來實作一個連結串列式堆疊 (參考《【資料結構】堆疊》) 的插入 :

#include <atomic>

struct node {
    node *next;
    int value;
};
std::atomic<node *> head {};
void push(int value) {
    auto new_node {new node {head.load(std::memory_order_relaxed), value}};
    while(not head.compare_exchange_weak(new_node->next, new_node,
            std::memory_order_release, std::memory_order_relaxed));
}

Code 22 中, 對 head 存取操作統一使用了寬鬆次序, 不論是初始化 new_node 還是更新 new_node->next 的值. 這是因為在真正的讀-改-寫開始前, head 可能隨時被其它執行緒改變. 即使我們在 new_node 的初始化中使用了 std::memory_order_acquire 甚至 std::memory_order_seq_cst 存取到了最新的值, 但 new_node 初始化完成後到 while 迴圈開始運作之前, 這中間還存在一段空隙. 此時, 其它執行緒可能改變 head. 因此, 我們要求 new_node->next 的指派每次都同步 head 的最新值沒有太大意義, 放鬆記憶體排序反而可能帶來效能的提升.

仔細觀察 Code 22, 我們可以發現, 比較-交換操作的讀-改-寫成功情形下, 只使用了 std::memory_order_release, 並沒有使用 std::memory_order_acq_rel. 這可能會造成一種情況 : 讀-改-寫操作中的讀操作並未存取到最新的 head, 它得到了一個老的 head, 這個老的 head 已經位於最新 head 之後 :

Figure 1-1. 連結串列式堆疊

此時, pushnew_node 指派給 head 後變成了

Figure 1-2. 插入一個新結點

我們發現 head 徹底消失了, 記憶體流失發生了! 但事實上這種情況不會發生, C++ 標準規定了每一個讀-改-寫操作都必須進行一次強制同步, 即使某個寫入操作是以 std::memory_order_relaxed 順序進行的, 只要它是最後一個寫入的, 讀-改-寫操作就必須看到其值. 換句話說, 讀-改-寫操作每次都可以看到最新的值. 因此, Code 22 中的 head.compare_exchange_weak 中成功情況僅使用 std::memory_order_release 並沒有什麼問題.

現在讓我們重新考慮 Code 16 中的 spin_lock. 由於讀-改-寫操作會強制同步最新的值, 我們似乎有理由讓 spin_lock 使用的記憶體排序更加寬鬆 :

#include <atomic>

class spin_lock {
private:
    std::atomic_flag flag;
public:
    spin_lock() noexcept : flag {ATOMIC_FLAG_INIT} {}
    spin_lock(const spin_lock &) = delete;
    spin_lock(spin_lock &&) = delete;
    ~spin_lock() noexcept = default;
public:
    spin_lock &operator=(const spin_lock &) = delete;
    spin_lock &operator=(spin_lock &&) = delete;
public:
    void lock() noexcept {
        while(this->flag.test_and_set(std::memory_order_relaxed));
    }
    void unlock() noexcept {
        this->flag.clear(std::memory_order_relaxed);
    }
};

我們知道, std::atomic_flag::test_and_set 是一個讀-改-寫操作, 因此按照 C++ 標準的規定, 清除操作也並不需要很嚴格的記憶體順序, 反正 lock 中的讀-改-寫操作必定能存取到最新的值. 遺憾的是, Code 23 中的 spin_lock 是錯誤的. spin_lock 作為一個基礎組件, 還有一個隱含的職責 : 它不僅要負責自己成員函式的同步, 還需要發布本次鎖住的臨界區所寫入的所有值, 無論這些寫入操作使用什麼記憶體順序. 在 Code 23 下, 若 spin_lock 鎖住的臨界區全部使用 std::memory_order_relaxed, 在鎖被釋放之後, 下一個持有 spin_lock 的執行緒可能無法看到上一個執行緒所寫入的值, 這可能導致錯誤. 要想避免這種情況, 就需要使用發布-獲取次序, 我們應該對 unlock 使用發布次序, 對 lock 使用獲取次序, 即對 unlock 中的 clear 使用 std::memory_order_release, 對 lock 中的 test_and_set 使用 std::memory_order_acquire.

綜上所述, 我們在考慮讀-改-寫操作的記憶體順序時要尤其小心. 我們不能只考慮到變數本身的同步, 還要考慮我們所寫下的程式碼意味著什麼, 是否還有其它變數的值需要攜帶著一起發布出去的.

最後相信大家都能猜到為什麼 C++ 標準樣板程式庫為每一個多載運算子都提供了一個相同功能的成員函式, 因為多載運算子由於參數的限制, 無法再額外指定記憶體排序, 所以多載運算子使用的都是最嚴格的全域一致性次序.

2.5 記憶體屏障

若某個程式庫提供了一個 relaxed_atomic 類別, 它和 std::atomic 的區別是其變數的操作都服從寬鬆次序, 且不接受自訂記憶體排序. 這種特殊的原子變數可能專門用於非常特殊的場景或者某個 CPU 架構下. 若我們想讓 relaxed_atomic 強制服從某種記憶體排序, 可以使用 std::atomic_thread_fence 建立記憶體屏障 (memory barrier) :

#include <atomic>
#include "relaxed_atomic.hpp"

relaxed_atomic<int> value {};

void set(int v) {
    value = v;
    std::atomic_thread_fence(std::memory_order_release);        // 發布這一次操作
}
int read() {
    std::atomic_thread_fence(std::memory_order_acquire);        // 和 set 中的發布形成同步, 是發布-獲取次序
    return value;
}

relaxed_atomic 除了記憶體順序, 其它方面和 std::atomic 沒有任何不同, 那麼 Code 24-1 相當於

#include <atomic>

std::atomic<int> value {};

void set(int v) {
    value.store(v, std::memory_order_release);
}
int read() {
    return value.load(std::memory_order_acquire);
}

我們在討論發布-獲取次序的時候曾經提到過, 發布操作不僅僅針對原子變數生效, 同時也針對本次寫入的非原子變數生效, 這點在使用 std::atomic_thread_fence 時仍然生效. 不過, std::memory_order_consume 天生和 std::atomic_thread_fence 合不來. 因為 std::atomic_thread_fence 不帶有任何值, 而 std::memory_order_consume 需要分析值的依賴關係.

3. 擺脫鎖的束縛

到目前為止, 我們還沒有見過互斥鎖出現, 這是原子變數的威力之一 : 我們可以在無需借助互斥鎖的幫助, 安全地在多執行緒之間同步資料, 這被稱為無鎖程式設計. 使用互斥鎖的併發程式碼是相對直觀的, 臨界區的資料要麼被鎖住, 要麼無人使用. 我們已經知道, 若加鎖的方式不當, 會給程式帶來死鎖的風險, 這種缺陷在無鎖程式設計上並不存在. 然而無鎖程式設計的門檻非常高, 我們寫下的每一行都必須經過仔細斟酌, 被人稱為黑魔法的樣板超編程在無鎖程式設計面前簡直是小兒科.

使用者在使用一個無鎖資料結構的時候, 無需考慮資料競爭問題, 這是資料結構設計者要考慮的問題. 相比起需要主動加鎖的阻塞式資料結構, 無鎖資料結構的使用非常方便, 基本上和單執行緒下使用容器差不多. 在高併發下, 無鎖資料結構不會因為等待鎖或者因為上下文切換而浪費時間, 響應性極高. 綜合以上特點, 無鎖程式設計看起來相當吸引人, 然而實作一個正確的無鎖資料結構卻非常困難.

3.1 無鎖堆疊

我們從最簡單的資料結構堆疊開始嘗試, 我們並不打算在設計的時候就考慮記憶體排序, 因為它並不影響資料結構的設計, 只影響運作效能, 我們直接使用最嚴格的全域一致性次序, 即保持 std::atomic 操作上的記憶體排序使用預設引數. 當我們確認了無鎖堆疊的正確性後, 我們再開始考慮記憶體順序, 這樣我們最開始就不需要擔心因為記憶體順序導致無鎖堆疊出錯了. 另外, 在最開始我們也不考慮使用樣板支援多型別, 只考慮 int 這種簡單的型別, 避免例外安全問題給我們帶來的困擾.

有兩個合適的資料結構適合作為堆疊的底層資料結構, 一個是陣列 (參考《【資料結構】陣列》), 另一個是前向連結串列. 我們並不打算使用陣列作為無鎖堆疊的底層資料結構, 因為那樣每一個元素都需要原子化, 而堆疊每次保證頂部元素原子化即可. 使用前向連結串列還有一個好處, 就是無需面對陣列擴容這種特殊場景. 在多執行緒下, 原子化擴容可能十分消耗效能, 也可能讓記憶體的峰值升高.

使用前向連結串列的無鎖堆疊僅需要將頭部結點原子化 :

#include <atomic>

struct node {
    node *next;
    int value;
};

class lock_free_stack {
private:
    std::atomic<node *> head {};
public:
    lock_free_stack() noexcept = default;
    lock_free_stack(const lock_free_stack &) = delete;
    lock_free_stack(lock_free_stack &&) = delete;
    ~lock_free_stack() noexcept {
        int _;
        while(this->pop(_));
    }
public:
    lock_free_stack &operator=(const lock_free_stack &) = delete;
    lock_free_stack &operator=(lock_free_stack &&) = delete;
public:
    void push(int);
    bool pop(int &) noexcept;
};

Code 25 中, 我們搭建了一個無鎖堆疊的架子, 它不支援被複製或者移動. 事實上, 在絕大多數無鎖資料結構的應用場景中, 複製或者移動這種需求並不存在. 想像一個交易系統, 我們只需要全域有一個生產者消費者佇列即可, 生產者向佇列插入新的交易, 消費者從佇列中取出新的交易進行處理, 複製這個佇列沒什麼意義. 在解構子中, 我們委託成員函式 pop 將所有剩餘的元素移除. 成員函式 popstd::stack::pop 不太相同, 它接受一個參考, 往外回傳一個移除成功的值, 回傳值用於標記移除是否成功. 在佇列為空的情況, 成員函式 pop 不改變引數的狀態, 直接回傳 false.

成員函式 push 無需考慮記憶體的回收, 我們先實作它. 若多個執行緒同時呼叫 push 或者 pop, 我們需要考慮當前執行緒所看到的頭部結點可能因為其它執行緒的 push 或者 pop 而改變. 因此, 當新配置的結點還未正式成為頭部結點時, 我們必須透過不斷比較來確保當前執行緒看到的頭部結點是最新的, 把最新的結點作為新配置結點的 next. 故我們使用比較-交換操作 :

void lock_free_stack::push(int value) {
    const auto new_node {new node {this->head.load(), value}};
    while(not this->head.compare_exchange_weak(new_node->next, new_node));
}

Code 26 中, 我們在結點配置完成之後, 比較 new_node->nextthis->head 是否相等. 若相等, 則將 new_node 指派給 this->head; 否則, 將 this->head 的最新值指派給 new_node->next, 然後進行下一次比較-交換操作.

類似地, 我們使用比較-交換操作來實作 pop :

bool pop(int &result) noexcept {
    auto node {this->head.load()};
    if(not node) {
        return false;
    }
    while(not this->head.compare_exchange_weak(node->next, node));
    result = node->value;
    delete node;
    return true;
}

Code 27 看起來非常完美. 我們在判斷了結點的有效性之後, 利用比較-交換操作獲取到了最新的結點. 若現在有兩個執行緒共同 pop, 它們拿到了同一個頭部結點. 其中一個執行緒順利地移除了改結點並回收了對應的記憶體, 另一個執行緒正準備比較-交換操作, 此時當這個執行緒存取 node->next 時, 由於 node 已經被第一個執行緒回收, 所以發生了未定行為. 這個問題產生的本質是執行緒並不知道是否還有其它執行緒還在使用結點.

3.1.1 基於 pop 計數的無鎖堆疊

Code 27 中的問題也不是無解的, 有一種方案是將所有將要回收的結點收集起來, 將這些結點的回收任務交給最後一個完成 pop 的執行緒. 為此, 我們必須對 pop 進行計數, 每當有一個執行緒呼叫 pop, 這個計數便加一; 每當一個執行緒完成了 pop, 這個計數減一. 當最後一個執行緒發現計數為一且該執行緒即將退出, 就說明沒有其它執行緒持有待回收列表中的任何結點, 將待回收列表中的所有結點回收即可. 待回收結點也可以使用一個前向連結串列集合到一起 :

#include <atomic>

struct node {
    node *next;
    int value;
};

class lock_free_stack {
private:
    std::atomic<node *> head {};
    std::atomic<std::size_t> pop_count {};
    std::atomic<node *> deprecated_nodes {};
private:
    void deprecate_node_list(node *first, node *last) noexcept {
        last->next = this->deprecated_nodes.load();
        while(not this->deprecated_nodes.compare_exchange_weak(last->next, first));
    }
    template <bool NullNode>
    void try_reclaim(node *old_head) noexcept {
        if(this->pop_count.fetch_sub(1) == 1) {
            auto deprecated_nodes {this->deprecated_nodes.exchange(nullptr)};
            if(this->pop_count.load() == 0) {
                while(deprecated_nodes) {
                    const auto backup {deprecated_nodes};
                    deprecated_nodes = deprecated_nodes->next;
                    delete backup;
                }
            }else if(deprecated_nodes) {
                auto last_node {deprecated_nodes};
                for(; last_node->next; last_node = last_node->next);
                this->deprecate_node_list(deprecated_nodes, last_node);
            }
            if constexpr(not NullNode) {
                delete old_head;
            }
        }else {
            if constexpr(not NullNode) {
                this->deprecate_node_list(old_head, old_head);
            }
        }
    }
public:
    lock_free_stack() noexcept = default;
    lock_free_stack(const lock_free_stack &) = delete;
    lock_free_stack(lock_free_stack &&) = delete;
    ~lock_free_stack() noexcept {
        int _;
        while(this->pop(_));
    }
public:
    lock_free_stack &operator=(const lock_free_stack &) = delete;
    lock_free_stack &operator=(lock_free_stack &&) = delete;
public:
    void push(int value) {
        const auto new_node {new node {this->head.load(), value}};
        while(not this->head.compare_exchange_weak(new_node->next, new_node));
    }
    bool pop(int &value) noexcept {
        this->pop_count.fetch_add(1);
        auto node {this->head.load()};
        while(node and not this->head.compare_exchange_weak(node, node->next));
        bool result {};
        if(node) {
            value = node->value;
            result = true;
            this->try_reclaim<false>(node);
        }else {
            this->try_reclaim<true>(nullptr);
        }
        return result;
    }
};

Code 28 變得相當複雜, 我們逐行解析. 在進入成員函式 pop 後首先對 this->pop_count 增加一, 告訴其它執行緒目前還存在執行緒正在進行 pop 操作. 接下來存取最新的頭部結點, 正式進入移除流程. 首先我們要確保 node 不為空, 比較-交換操作也可能會更新一個空結點出來. 當有兩個執行緒共同 pop 的時候, 另一個執行緒可能已經將 this->deprecated_nodes 清空了 :


時刻 執行緒 T1 執行緒 T2
t1 呼叫成員函式 try_reclaim, 並判斷得到的 this->pop_count 是否為 1 呼叫成員函式 pop, 但是還沒有給 this->pop_count 增加計數
t2 暫停 增加 this->pop_count 的計數後存取最新的 this->head, 此時獲得的頭部結點不為空
t3 透過比較-交換操作將 this->head 設定為空指標 暫停
t4 \vdots 進行比較-交換操作發現剛剛儲存下來的 node 已經改變, 更新 nodenullptr (假設此時沒有新的頭部結點插入進來)

node 有效, 則設定負責傳出的引數和函式回傳值, 最後呼叫成員函式 try_reclaim<false>; 否則, 設定函式回傳值後呼叫成員函式 try_reclaim<true>. 成員函式樣板 try_reclaim 中的樣板參數用於標記其參數 old_head 是否接收了一個空指標, 不然的話我們還要再 try_reclaim 中再次判斷函式引數是否為空指標, 這個信息外面已經知道了.

成員函式 try_reclaim 真正負責回收遺棄的結點. 遺棄結點被回收的前提條件是, 當前有且唯有一個執行緒正在進行 pop, 因此在函式一開始, 我們除了讓 this->pop_count 的值減一之外還判斷了其回傳值. 若 this->pop_count 的原值為 1, 說明 old_head 由本執行緒獨佔, 當其不為空時, 我們可以直接回收; 當 this->pop_count 的原值大於 1, 說明還存在其它執行緒正在進行 pop 操作, old_head 不一定是獨佔的. 此時若 old_head 不為空, 我們必須將其放入 this->deprecated_nodes 中由其它執行緒負責回收, 避免還存在執行緒正在使用相同的結點. 成員函式 deprecate_node_list 接收一個前向連結串列的頭部結點和尾部結點, 尾部結點用於連結 this->deprecated_nodes, 頭部結點將會成為新的 this->deprecated_nodes. 在確保只有本執行緒在這個時刻正在進行 pop 後, 我們透過比較-交換操作將準備回收的所有結點獨佔, 這裡若使用 load 存取, 則後面還要再進行比較-交換操作以避免其它執行緒在 if 之後, 在比較-交換操作之前已經更改了 this->deprecated_nodes. 這樣是不必要的, 不如直接使用交換操作一步到位. 但若此時, 有執行緒更改了 this->deprecated_nodes 會有影響嗎? 這種情況發生在本執行緒運作完 this->pop_count.fetch_sub(1) 之後, 又有兩個執行緒同時進行 pop, 其中有一個執行緒發現 this->pop_count 大於 1, 於是將待回收的結點放入待回收列表中. 這個新放入的結點並不影響. 若本執行緒取得了這個新加入的結點, 那麼回收可能由本執行緒進行. 接下來我們再次判斷 this->pop_count 的值就是為了防止在待回收列表被我們獨佔之後, 還有其它執行緒開始 pop 的情況. 若 this->pop_count 還是為零, 則回收待回收列表中的所有結點 (也包含剛才執行緒新加入的結點); 若不為零, 則將持有的所有待回收結點和 this->deprecated_nodes 連結起來. 此時, 待回收結點中靠前的結點可能還存在執行緒正在使用.


時刻 執行緒 T1 執行緒 T2 執行緒 T3
t1 this->pop_count.fetch_sub(1)
t2 進入成員函式 try_reclaim this->pop_count.fetch_add(1)
t3 放入剛剛移除的結點到 this->deprecated_nodes
t4 取出並更新 this->deprecated_nodes

在時刻 t4, 執行緒 T3 還持有著一個結點, 故執行緒 T1 不能直接進行回收. 有由於中間還可能存在其它執行緒更新 this->deprecated_nodes, 所以必須找到最後一個結點和最新的 this->deprecated_nodes 相連結. 最後, 若 old_head 為空, 無需做任何事情.

Code 28 的實作必須非常仔細, 以避免記憶體流失或者未定行為的發生. 儘管有時候實作沒有任何問題, 但仍可能導致一些潛在的異常行為 :

  • if(this->pop_count_fetch_sub(1) == 1) 和後面的 if(this->pop_count.load() == 1) 相互調換, 程式沒有任何問題, 但可能導致某些時刻的記憶體峰值升高. 想像這樣的場景 : 執行緒 T1 在取得 this->deprecated_nodes 之後還未進行 this->pop_count.fetch_sub(1) 的操作. 執行緒 T2 發現還存在其它執行緒正在運作 pop, 便將自己持有的待回收結點放入 this->deprecated_nodes 中. 但此時, 執行緒 T2 鎖放入的結點可能並不存在其它執行緒使用, 執行緒 T1 明確不使用它, 所以這個新加入的結點在這種情況下其實可以安全回收. 這種情況在 Code 28 中不存在.
  • 在執行緒 T1 取得所有待回收結點後發現 this->pop_count 不為零, 執行緒選擇直接將 old_head 放入待回收列表中而不是直接回收, 程式的實作也沒有問題. 但若短時間內有執行緒反覆插入且有兩個以上的執行緒正在反覆進行 pop, 可能導致記憶體大幅上升. 在 this->pop_count.fetch_sub(1) == 1 判斷成立後, 便可以確定 old_head 必定由本執行緒獨佔. 由於其不存在於待回收列表中, 即使其它執行緒在這後進行 pop 函式, 也不可能取得執行緒 T1 所持有的 old_head. 所以只要 this->pop_count.fetch_sub(1) == 1 成立, old_head 便可以由執行緒 T1 在接下來任何時刻安全地回收, 將其放入待回收列表中沒有任何必要. 此時, 若 this->pop_count 一直大於零, 則沒有任何執行緒可以成功回收任意一個結點. 這一本應該被立即回收的結點便被一直留了下來, 與此同時新的待回收結點又一直加入, 記憶體峰值自然會上升.

然而, Code 28 並不完美 :

  • 首先它只適用於 int 型別. 若要將其改為樣板以接受任意型別, 我們還要考慮例外安全問題. 有兩個地方可能會產生例外安全問題 : 在 push 中的結點配置和將引數 value 指派給結點儲存; 在 pop 中將結點內部持有的值指派給負責傳出的引數, 這個動作是複製或者移動. push 中的操作我們無需擔心, 因為 new 中一旦產生了例外情況, 不會導致堆疊狀態發生改變, 也不會發生記憶體流失. 在 pop 中, 若出現例外情況, 取出的結點比如被重新放入堆疊中, 即
    try {
        value = node->value;
    }catch(...) {
        while(not this->head.compare_exchange_weak(old_head->next, old_head));
        throw;
    }
    但若我們對堆疊中的順序有一定依賴, 這種重新放入的方法便不成立. 而且此時成員函式 pop 也無法被 noexcept 所標識. 業界針對無鎖資料結構的通用做法是將值轉化為 std::share_ptr 物件的形式來儲存, 即
    #include 
    template 
    struct node {
        node *next;
        std::shared_ptr value;
    };
    在這種情況下, 成員函式 pop 可以直接回傳一個 std::shared_ptr<T> 物件. 若 pop 沒有成功, 可以回傳一個空指標表示失敗. 由於 std::shared_ptr 的複製或者交換操作都保證不擲出任何例外情況, 所以成員函式 pop 在上述結點結構下仍然可以被 noexcept 所標識.
  • 若存在執行緒反覆插入新元素, 又有大量執行緒反覆不間斷地進行 pop, 待回收列表中的結點數量就會越來越多. 由於 this->pop_count 始終無法為零, 記憶體在這種情況下快速上漲. 這種特殊意義的記憶體流失在 Code 28 中無法解決, 我們必須尋找基於待回收列表更完善的方案或者尋找其它替代方案.

3.1.2 基於風險指標的無鎖堆疊

Code 28 最大的問題是執行緒只知道待回收列表中的結點在 this->pop_count 為零的時候可以安全回收, 當 this->pop_count 不為零的時候, 事實上待回收列表中可能只有少數靠前的結點還被其它執行緒所持有, 然而執行緒卻無法知道哪些結點的回收是完全安全的. 於是, 我們可以將那些執行緒正在使用的結點標記為風險指標, 這樣, 未被標記的結點自然就可以被安全回收. 我們在此對 Code 28 所使用的原始待回收列表方案進行改進.

若一個執行緒正在使用一個結點, 而另一個執行緒正在準備回收這個結點, 那麼這個結點便成了一個風險指標. 此時, 我們需要讓正在使用結點的執行緒通知另一個準備回收結點的執行緒 : 你正在回收的結點是風險指標, 需要延後回收. 若執行緒使用完畢, 則風險指標不再有風險, 此時可以安全地回收結點.

一個執行緒若要回收待回收列表中的結點, 必須檢查其中每一個指標是否為風險指標, 即是否正在被其它執行緒使用. 若一個結點不是風險指標, 那麼回收即可; 否則, 這個結點仍然要留在待回收列表中, 等待下一次回收檢測.

要想一個結點成為風險指標, 只有一種可能 : 某個或者某幾個執行緒透過存取 this->head (不論是透過 load 還是比較-交換操作) 得到了一個結點, 那麼這個結點立即成為風險指標. 當然, 這樣的風險只在 pop 中存在. 只要堆疊不為空, 一個進入成員函式 pop 的執行緒, 到 pop 運作結束這段時間, 必然有一段時間持有一個風險指標. 為此, 我們可以為每一個正在進行 pop 的執行緒設定一個風險指標儲存區, 它是公有的, 可以被其它執行緒所看見.

#include <memory>
#include <atomic>
#include <thread>

#ifndef MAX_HAZARD_POINTER_NUMBER
#define MAX_HAZARD_POINTER_NUMBER 256
#endif

template <typename T>
struct node {
    node *next;
    std::shared_ptr<T> value;
};
struct hazard_pointer {
    std::atomic<void *> hazard_node;
    std::atomic<std::thread::id> tid;
} hazard_pointers[MAX_HAZARD_POINTER_NUMBER];
class hazard_pointer_owner {
private:
    hazard_pointer *hp;
public:
    hazard_pointer_owner() : hp {} {
        for(auto &hp : hazard_pointers) {
            std::thread::id tid {};
            if(hp.tid.compare_exchange_strong(tid, std::this_thread::get_id())) {
                this->hp = &hp;
                break;
            }
        }
        if(not this->hp) {
            throw std::runtime_error("No hazard pointers available!");
        }
    }
    hazard_pointer_owner(const hazard_pointer_owner &) = delete;
    hazard_pointer_owner(hazard_pointer_owner &&) = delete;
    ~hazard_pointer_owner() noexcept {
        this->hp->hazard_node.store(nullptr);
        this->hp->tid.store({});
    }
public:
    hazard_pointer_owner &operator=(const hazard_pointer_owner &) = delete;
    hazard_pointer_owner &operator=(hazard_pointer_owner &&) = delete;
    operator hazard_pointer *() noexcept {
        return this->hp;
    }
public:
    static std::atomic<void *> &get_for_this_thread() noexcept {
        thread_local static hazard_pointer_owner owner {};
        return owner.hp->hazard_node;
    }
};

template <typename T>
struct deprecated_node {
    deprecated_node *next;
    node<T> *hazard_node;
};
template <typename T>
class lock_free_stack {
private:
    std::atomic<node<T> *> head {};
    std::atomic<deprecated_node<T> *> deprecated_nodes {};
public:
    lock_free_stack() noexcept = default;
    lock_free_stack(const lock_free_stack &) = delete;
    lock_free_stack(lock_free_stack &&) = delete;
    ~lock_free_stack() noexcept {
        while(this->pop());
    }
public:
    lock_free_stack &operator=(const lock_free_stack &) = delete;
    lock_free_stack &operator=(lock_free_stack &&) = delete;
public:
    void push(const T &value) {
        const auto new_node {new node<T> {this->head.load(), std::make_shared<T>(value)}};
        while(not this->head.compare_exchange_weak(new_node->next, new_node));
    }
    std::shared_ptr<T> pop() {
        auto &hazard_pointer_for_this_thread {hazard_pointer_owner::get_for_this_thread()};
        // start block (1)
        auto old_head {this->head.load()};
        do {
            node<T> *temp {};
            do {
                temp = old_head;
                hazard_pointer_for_this_thread.store(old_head);
                old_head = this->head.load();
            }while(temp not_eq old_head);
        }while(old_head and not this->head.compare_exchange_strong(old_head, old_head->next));
        hazard_pointer_for_this_thread.store(nullptr);
        // end block (1)
        const auto can_delete_node {[this](node<T> *old_head) -> bool {
            for(auto &hazard_pointer : hazard_pointers) {
                if(hazard_pointer.hazard_node.load() == old_head) {
                    return false;
                }
            }
            delete old_head;
            return true;
        }};
        std::shared_ptr<T> result {};
        if(old_head) {
            result.swap(old_head->value);
            if(not can_delete_node(old_head)) {
                auto new_deprecated_node {new deprecated_node<T> {this->deprecated_nodes.load(), old_head}};
                while(not this->deprecated_nodes.compare_exchange_weak(new_deprecated_node->next,
                        new_deprecated_node));
            }
        }
        auto deprecated_nodes {this->deprecated_nodes.exchange(nullptr)};
        while(deprecated_nodes) {
            const auto backup {deprecated_nodes};
            deprecated_nodes = deprecated_nodes->next;
            if(can_delete_node(backup->hazard_node)) {
                delete backup;
            }else {
                backup->next = this->deprecated_nodes.load();
                while(not this->deprecated_nodes.compare_exchange_weak(backup->next, backup));
            }
        }
        return result;
    }
};

程式預設至多有 256 個風險指標 (當然數量可以自訂). 當我們嘗試獲得為當前執行緒申請風險指標的時候, hazard_pointer 的建構子會尋訪 hazard_pointers 陣列中所有成員變數 id 為空的元素, 將執行緒自己的 ID 透過比較-交換指派給該元素, 這樣就將這個元素獨佔. 當 hazard_pointers 中所有風險指標都有歸屬的執行緒, 那麼就擲出例外情況. hazard_pointer 的靜態成員函式 get_for_this_thread 負責為當前執行緒申請風險指標, 每一個執行緒都有自己的風險指標, 所以我們對靜態局域變數 owner 標識 thread_local. 當執行緒結束, 對應的靜態局域變數被回收, 呼叫 hazard_pointer 的解構子, 將指標和執行緒 ID 清空, 這個風險指標便空了出來.

我們主要討論 Code 29 無鎖堆疊的成員函式 pop. 在本執行緒申請了風險指標之後, 首先存取了頭部結點的值, 接著要做的就是將這個結點標記為風險指標. 由於從存取到標記這中間存在間隙, 所以要透過迴圈不斷重試, 直到使用結點的 next 更新 this->head 完成為止. 這裡我們使用了兩個 do-while 迴圈. 內層 do-while 迴圈用於設定風險指標, 外層 do-while 迴圈用於更新 this->head. 外層 do-while 迴圈使用了 compare_exchange_strong, 這是因為如果在虛假失敗的情況下, 風險指標會被重複設定為同一個結點, 這是沒有必要的. 相比於重複設定, 使用 compare_exchange_strong 在效能上具有更好的表現. 這裡的雙層迴圈不能用外層迴圈直接替代, 因為設定風險指標和重設 this->head 是兩個動作. 如果將兩個迴圈合併成一個就無法處理這種情況 : 在 old_head 初始化後, 在風險指標設定之前, 其它執行緒發現 old_head 沒有在風險指標列表中, 已經回收了它. 這種情況下, 單層迴圈會直接將一個已經回收的指標放入風險指標列表中, 這必然會在之後造成未定行為. 這裡的內層循環實際上就是在風險指標的設定上模仿了比較-交換操作, 若且唯若 old_headthis->head 相等的時候才設定風險指標. 當然, 在設定之後 this->head 還可能改變, 那麼就整體重新來一遍, 這個由外層迴圈保證.

接下來, 若 old_head 不是空指標, 那麼我們就尋訪所有風險指標列表, 看看 old_head 是否還被其它執行緒持有. 如果沒有的話, 就將 old_head 回收; 否則, 配置一個風險指標的結點, 放入風險指標列表中稍後回收. 最後, 我們獨佔風險指標列表, 逐個檢查不再有風險的指標, 回收對應的結點.

注意成員函式 pop 沒有被 noexcept 所標識, 但是解構子仍然被 noexcept 所標識. 這是因為建構子和解構子一般不允許併發, 所以我們可以肯定只有一個執行緒呼叫成員函式 pop. 如果到了解構階段, 一般呼叫解構子的都是主執行緒, 子執行緒的任務已經全部完成並且終結. 這個時候不存在競爭問題, 自然不需要風險指標列表, 當然也就不需要再配置任何風險指標. 此處如果擲出例外情況, 必定屬於異常情況, 直接讓程式終結暴露問題反而有助於幫助我們提前知曉問題出處.

若兩個執行緒同時取到了同一個頭部結點, 是否可能存在兩個執行緒共同將同一個結點分別放入風險指標列表的情況, 也就是風險指標列表是否重複存在同一個結點? 關鍵還是在雙層 do-while 迴圈中, 兩個執行緒一定有一個能獨佔結點. 若執行緒 T1 獨佔了結點, 執行緒 T2 將該結點標記為風險指標, 然後執行緒 T2 暫停. 此時執行緒 T1 在尋訪風險指標列表之後, 發現這個結點是風險指標, 於是將其放入風險指標列表稍後回收. 接下來執行緒 T2 的外層 do-while 迴圈中的比較-交換操作一定會失敗, 根本就等不到將結點放入風險指標列表這種操作. 所以這種情況下, 結點不可能被多次放入風險指標列表.

Code 29 中的實作, 在每一次 pop 都要尋訪整個風險指標列表, 效能稍低. 我們可以增加一個針對風險結點列表的計數 deprecated_node_count, 在計數達到閾值的時候, 才進行一次風險指標列表中的結點回收. 在 Code 29 中, 若 deprecated_node_count 小於閾值的時候, 可能每一次尋訪都無法回收任何一個結點. 但這種方案有一個缺點, 就是風險指標列表會變得更長. 當然, 也可以讓每個執行緒都維護一個自己的風險指標列表, 這樣不必進行計數, 也不存在全域的風險指標列表的競爭, 但是這樣會使用更多空間. 不過, 若一個執行緒維護的風險指標列表還有結點, 而執行緒準備終結, 可以將其轉移到全域風險指標列表或者加入到其它執行緒的風險指標列表中. 這樣雖然仍然有競爭, 但是相比所有執行緒都使用一個, 這種方式的競爭會少一些.

即然結點被回收後, 又需要重新配置, 這種所消耗的時間也可以被節省下來. 我們可以參考記憶池 (參考《記憶池》) 的做法, 找個地方把無風險的待回收結點儲存下來, 每次要配置的時候先去檢查是否有可重複利用的結點. 結點池的做法也非常簡單, 同樣使用前向連結串列即可, 它和風險指標列表幾乎沒什麼不同. 然而, 重複利用結點可能導致 ABA 問題. ABA 問題指的是執行緒 T1 取得了某個原子變數的值 A, 其它執行緒在 T1 進行比較-交換之前改變原子變數的值為 B, 同時又有執行緒 (可能和上面的是同一個) 在 T1 進行比較-交換之前再次將原子變數的值改回了 A. 此時, 執行緒 T1 的比較-交換操作會因為值和之前相等而成功, 不能察覺到原子變數的值已經變過, 甚至變了多次 (ABA 問題還可以無限拓展, 只要首尾值相等即可). 這可能導致未定行為甚至錯誤. 從本質上來說, ABA 問題時因為比較-交換操作只判斷了原子變數的值是否發生改變, 而忽略了原子變數值的歷史. 例如, 某個結點被執行緒 T1 存取到還未標記為風險指標, 另一個執行緒直接回收了這個結點. 在執行緒 T1 標記風險指標之前, 又有一個執行緒配置結點的時候復用了這個結點, 又把它放回了堆疊中成為 this->head. 執行緒 T1 這個時候發現儲存下來的 old_head 沒有變化, 便將 old_head 標記為了風險指標. 按道理執行緒應該移除的是原來排在 old_head 之後的結點 (當然在無鎖堆疊下, 這可能不會造成什麼問題). 再比如, 某片記憶體原來儲存著實時匯率. 執行緒 T1 在那一刻取到了最新的匯率, 但是還未開始計算. 執行緒 T2 回收了這篇記憶體, 又重新進行了配置, 記憶體位址保持不變, 但是實時匯率被執行緒 T2 更新了. 此時, 執行緒 T1 進行比較-交換操作後發現記憶體位址沒有被更新過, 自然認為匯率還是最新的, 從而計算出錯誤的金額導致客戶或者機構產生損失. 又比如, 這篇記憶體儲存的是一個動態配置的記憶體位址 (型別為 T **), 當其它執行緒回收了對應的 T *, ABA 問題在此處就可能再次回收那一片已經回收過的 T *, 造成未定行為.

為了解決 ABA 問題, 若要在基於風險指標的堆疊中引入結點復用, 可以為結點增加一個 ABA 標記. 每次配置的結點都更新這個標記, 保證每次配置後, 這個標記是獨一無二的. 接下來, 比較-交換操作比較到這個標記不一樣, 就會知道這個結點已經被回收過一次, 現在的和原來儲存下來的結點本質上不是同一個結點, 最終比較-交換操作失敗.

除了效能問題和 ABA 問題, 基於風險指標的無鎖堆疊在應用的時候還可能面臨法律風險, 因為 IBM 公司為這項技術申請了專利. 雖然專利可能已經過期, 但是在不同國家或者地區, 針對專利的法律規定可能不同. 若要在實際項目中使用基於風險指標的記憶體回收方案 (可不止基於風險指標的無鎖堆疊), 都需要進行法律風險的評估.

3.1.3 基於結點計數的無鎖堆疊

第 3.1.3 小節, 我們對成員函式 pop 的呼叫次數進行了計數. 自然地, 我們想到可以學習 std::shared_ptr 為每一個結點進行計數以解決無鎖堆疊下的記憶體問題. 我們首先改造結點 :

#include <memory>

template <typename T>
struct node {
    node *next;
    std::size_t count;
    std::shared_ptr<T> value;
};

此時, 無鎖堆疊成員函式 pop 的實作思路為

  1. 存取 this->head, 得到 old_head;
  2. 增加 old_head->count;
  3. 透過比較-交換操作將 old_head->next 指派給 this->head;
  4. 檢查 old_head->count 的值 : 若為一, 則進行回收; 否則, 讓 old_head->count 減一即可退出.

首先, 多個執行緒可能共同修改成員 count, 所以其必須是一個原子變數, 這個問題並不算大. 其次, 成員 countpop 操作是強綁定的, 若無鎖堆疊有能力向外提供結點, 則計數不僅與 pop 操作有關, 還和結點的複製有關, 這時的 count 就有點類似於 std::shared_ptr 的強計數 (參考《【C++】智慧指標》); 最後, 步驟 1步驟 2 之間存在間隙, 可能在執行緒剛取得 old_head 後, 另一個執行緒已經將 old_head 回收了. 此時, 再存取 old_head->next 進行比較-交換操作或為其增加計數都會導致未定行為. 前面兩個問題都有解, 最後一個問題看似無解.

要避免未定行為的核心是在獲得結點前就必須告訴其它執行緒還存在執行緒正在使用結點. 即然無法直接操作 node 中的資料, 那麼我們就將計數從結點中獨立出來 :

#include <memory>

template <typename T> struct node;
template <typename T>
struct counted_node {
    node<T> *data;
    std::size_t count;
};
template <typename T>
struct node {
    counted_node<T> next;
    std::shared_ptr<T> value;
};

我們將無鎖堆疊中的 this->head 型別更改為 std::atomic<counted_node<T>>, 在每次取出結點前, 我們先將 this->head 對應的計數增加一 :

#include <memory>
#include <atomic>

template <typename T>
std::shared_ptr<T> lock_free_stack<T>::pop() noexcept {
    auto old_head {this->head.load()}, counted_old_head {old_head};
    do {
        ++counted_old_head.count;
    }while(not this->head.compare_exchange_weak(old_head, counted_old_head));
    // ...
}

透過 this->head 作為中介, 結合比較-交換操作, 本執行緒要想取得 this->head, 必須先增加計數讓其它執行緒知曉. 若兩個執行緒同時取得了同一個結點, 第一個執行緒先增加了計數, 另一個執行緒的比較-交換操作便會失敗, 再次取出的時候只有兩種情況 :

  1. this->head 變成了一個新的結點;
  2. this->head 的計數被增加一.

在第二種情況下, 執行緒 T1 會知道還存在其它執行緒正在使用同一個結點. 若執行緒 T2 的速度較快, 在還存在執行緒 T1 使用結點的情況下, T2 是不能直接回收結點的, 只能讓計數減一之後退出. 現在讓我們回到第一個執行緒 T1. 假設兩個執行緒都增加了計數, 執行緒 T1 還要再更新一次 old_head 以獲取最新的計數, 避免其它執行緒更快地更新了 old_head :

#include <memory>
#include <atomic>

template <typename T>
std::shared_ptr<T> lock_free_stack<T>::pop() noexcept {
    auto old_head {this->head.load()};
    do {
        auto counted_old_head {old_head};
        do {
            ++counted_old_head.count;
        }while(not this->head.compare_exchange_weak(old_head, counted_old_head));
        ++old_head.count;
    }while(not this->head.compare_exchange_strong(old_head, old_head->next));
    // ...
}

Code 31-2 很好地保證了若執行緒想要使用 this->head 中的資料, 同一時間持有相同結點的執行緒一定能獲知. 然而, 我們又遇到了新的問題 :

  1. 若執行緒準備退出 pop, 發現計數不為零, 無法通知其它執行緒. 因為 count 被獨立出去了, 並不存在於結點的資料之中;
  2. 若無鎖堆疊有能力向外部提供結點, 對於外部結點的更新, 其它執行緒也無法獲知.

上面兩個問題的本質是沒有一個執行緒之間可共享的一片記憶體去儲存計數. 當計數從結點資料中被轉移到外部之後, 若結點不存在於堆疊之中, 就沒有任何中介來傳遞更新後的計數. 顯然, 僅靠 counted_node 上的外部計數無法滿足結點脫離 this->head 後再去更新計數的需求. 外部計數僅能用來保證結點不會因為還存在執行緒想要更新結點內部資料, 結點便被另一執行緒回收. 為此, 我們還要在 node 上再增加一個內部計數, 以支撐結點脫離 this->head 後, 還能正確更新計數. 在實作正確的結點結構之前, 我們還需要注意一種情形 : 假設兩個執行緒同時持有一個結點, 其中一個執行緒先檢查結點是否滿足回收條件時, 從外部計數發現還存在另一個執行緒正在使用結點. 這時為了通知另一個執行緒, 其減少的計數必須操作於結點的內部計數上. 此時, 內部計數可能小於零, 故內部計數不能使用無號數型別 :

#include <memory>
#include <atomic>

template <typename T> struct node;
template <typename T>
struct counted_node {
    node<T> *data;
    long external_count;
};
template <typename T>
struct node {
    counted_node<T> next;
    long internal_count;
    std::shared_ptr<T> value;
};

這種計數方式被稱為雙計數. 多重計數是 C++ 無所程式設計中常用的方法. 有了正確的計數後, 我們可以開始實作無鎖堆疊 :

#include <memory>
#include <atomic>

template <typename T> struct node;
template <typename T>
struct counted_node {
    node<T> *data;
    long external_count;
};
template <typename T>
struct node {
    counted_node<T> next;
    long internal_count;
    std::shared_ptr<T> value;
};
template <typename T>
class lock_free_stack {
private:
    std::atomic<counted_node<T>> head {};
public:
    lock_free_stack() noexcept = default;
    lock_free_stack(const lock_free_stack &) = delete;
    lock_free_stack(lock_free_stack &&) = delete;
    ~lock_free_stack() noexcept {
        while(this->pop());
    }
public:
    lock_free_stack &operator=(const lock_free_stack &) = delete;
    lock_free_stack &operator=(lock_free_stack &&) = delete;
public:
    void push(const T &value) {
        const auto new_node {new node<T> {this->head.load(), 0, std::make_shared<T>(value)}};
        while(not this->head.compare_exchange_weak(new_node->next, {new_node, 1}));
    }
    std::shared_ptr<T> pop() noexcept {
        auto old_head {this->head.load()};
        while(true) {
            for(auto counted_old_head {old_head};; counted_old_head = old_head) {
                ++counted_old_head.external_count;
                if(this->head.compare_exchange_strong(old_head, counted_old_head)) {
                    old_head.external_count = counted_old_head.external_count;
                    break;
                }
            }
            const auto data {old_head.data};
            if(not data) {
                break;
            }
            if(this->head.compare_exchange_strong(old_head, data->next)) {
                std::shared_ptr<T> result {};
                result.swap(data->value);
                const auto count_transfer {old_head.external_count - 2};
                if(data->internal_count.fetch_add(count_transfer) + count_transfer == 0) {
                    delete data;
                }
                return result;
            }
            if(data->internal_count.fetch_sub(1) == 1) {
                delete data;
            }
        }
        return {};
    }
};

Code 32-1 中, 成員函式 pushpop 都發生了變化, 我們從 push 開始討論. 成員函式 push 比較簡單, 其做的工作仍然是配置新的結點並且將新結點放入前向連結串列頭部. 由於現在結點接受任何型別, 我們還要考慮例外安全問題. 在 new_node 建構的過程中, new_node->value 使用了 std::make_shared, 而並非直接使用 new (參考《【C++】智慧指標》第 3 節). 另一方面, 新結點的內部計數一開始為零, 外部計數被初始化為一. 這是因為對於任意一個結點, 要麼它是頭部結點, 要麼它不是頭部結點. 如果一個結點是頭部結點, 那麼 this->head 指向它, 其外部計數體現在這裡; 如果一個結點不是頭部結點, 那麼必然存在另一個結點的 next 指向它.

成員函式 pop 則複雜很多. 首先, 執行緒必須將獲得結點的外部計數增加一以通知其它執行緒. 不同於 Code 31-2, 我們使用了 counted_old_head 的外部計數來更新 old_head 的外部計數, 兩者本質上沒什麼差別. 當比較-交換成功後, 必然有

old_head.external_count + 1 = counted_old_head.external_count

成立. 若上式不成立, 說明中間又存在其它執行緒持有了相同結點, 且先於本執行緒完成計數更新. 此時, 比較-交換操作不會成功, old_head 會被更新. Code 32-1 中所使用的寫法只是語義上更加明確 : old_head 的外部計數必須和 counted_old_head 的外部計數保持一致. 在比較-交換時, 我們使用了 compare_exchange_strong, 此處應該避免由於虛假失敗所造成的多餘迴圈. 接下來我們儲存了 old_head.data, 這是因為下面的比較-交換操作可能改變 old_head, 我們不能對其它結點中的 data 進行操作, 因為在下面的過程中我們不再更新外部計數. 接著再次透過比較-交換操作, 我們使用 data->next 更新 this->head. 若比較-交換操作成功, 則後續不可能再有執行緒可以從堆疊上獲取到該結點, old_headpop 操作由本執行緒完成 (注意結點回收操作不一定由本執行緒完成). 相當於執行緒會在此處將 old_head 獨佔. 我們使用交換操作, 交換空指標與 old_head.data->value 來避免複製增加一次計數. 最後, 我們進行了計數轉移. 當結點透過比較-交換操作被從堆疊上移除之後, 執行緒改變結點外部計數這一動作將變得 “私密”, 沒有任何渠道可以發布這一改動讓其它執行緒知曉. 能讓其它執行緒感知到本執行緒不再持有結點的動作僅能是透過內部計數這唯一一個信息中介, 故我們要將外部計數的值轉移到內部計數上. 在計數轉移之前, 若其它執行緒的比較-交換操作失敗, 則會令內部計數減去一. 當內部計數回傳值小於零的時候, 便表明計數的轉移還未發生, 一定還存在執行緒持有相同的結點 (至少還有負責轉移計數的那個執行緒), 且計數轉移操作即將發生; 若內部計數回傳的值大於一, 那麼說明計數的轉移已經完成, 但還存在執行緒持有結點, 此時也不能進行結點回收; 若內部計數的值為一 (即 fetch_sub 回傳的舊值), 則說明當前有且唯有一個執行緒持有結點, 即本執行緒, 可以安全地回收, 這也是最後一個 if 判斷所做的事情. 現在讓我們回到計數轉移這件事上來. 在執行緒從堆疊上移除結點之後, 結點便被本執行緒所獨佔, 此時外部計數應該減去一; 當執行緒不再持有結點後, 結點的外部計數應該再減去一. 故最終要轉移的計數為 old_head.external_count - 2. 其它執行緒雖然無法直接修改轉移的計數值, 但是它們的退出會令內部計數減去一, 這與轉移之後的計數相抵銷. 最終, 內部計數的值便是剩餘持有結點的執行緒數量. 但是本執行緒的 pop 呼叫還有一些未運作的程式碼, 提前將計數轉移安全嗎? 仔細觀察接下里的程式碼可以發現, 我們不再存取結點內部的任何資料. 若此時還存在其它執行緒, 我們只要保證不提前回收結點即可. 因此對於其它執行緒來說, 在這一刻, 本執行緒相當於已經退出 pop, 不再持有結點, 轉移計數的動作是安全的. 若此刻不存在其它執行緒持有相同的結點, 則必然滿足一個條件 : 結點的內部計數和外部計數之和為零, 即

old_head.external_count - 2 + old_head.data->internal_count = 0.

若上述條件滿足, 則本執行緒除了負責從堆疊上移除頭部結點並回傳頭部結點中的值之外, 也負責回收這一結點.

使用雙重計數的無鎖堆疊天然免疫 ABA 問題. 因為當 B 操作和第二個 A 操作發生時, 外部計數必然增加, 比較-交換操作會在此時失敗. 不僅僅是無鎖堆疊, 絕大多數基於計數的記憶體回收方案, 都對 ABA 問題自帶防禦.

到目前為止, 我們還沒有討論無鎖堆疊的記憶體排序問題, 之前所使用的也都是預設且最嚴格的全域一致性次序模型. 為了確保程式效能 (即本資料結構通常作為程式庫的一部分, 向程式設計師提供, 因此本身必須極其注重每一處的能效問題), 我們使用較為寬鬆的獲取-發布次序模型.

成員函式 push 的記憶體排序分析比較簡單. 在配置 new_node 的時候, 存取了一次 this->head 的值. 在 new_node 建構完成到其稱為新的頭部結點, 這中間存在一段空隙, 即使建構 new_node 的時候存取到了 this->head 的最新值, 在空隙中也可能因為其它執行緒更新 this->head 導致 new_node.data->next 不再是最新的. 故在 new_node 建構時, 以 std::memory_order_relaxed 次序存取 this->head 的值即可. 在比較-交換中, 比較-交換失敗再次存取 this->head 的情況和 new_node 建構的情況類似, 故比較-交換操作失敗的場景也可以以 std::memory_order_relaxed 存取 this->head 的最新值. 對於比較-交換成功的場景, 其必須保證取得 this->head 的最新值用於比較, 寫入的值能夠同步給其它執行緒. 因此, 此處理應使用 std::memory_order_acq_rel. 但是因為比較-交換操作本身能夠對原子變數的值進行一次強制同步, 能夠保證取得 this->head 的最新值, 成員函式 push 接下來又沒必要知道其它攜帶的信息, 所以使用 std::memory_order_release 即可.

在成員函式 pop 中, old_head 的獲取與成員函式 push 中的 new_node 建構類似, 以 std::memory_order_relaxed 進行即可. 在外部計數更新的比較-交換操作中失敗的場景也是類似的, 以 std::memory_order_relaxed 進行即可. 在外部計數更新的比較-交換操作成功的場景下, 為了保證接下來的計數正確性, 此處應該使用 std::memory_order_acq_rel 將所有該同步的值都同步過來. 但是由於在成員函式 pushpop 中, 對 this->head 的操作僅限於存取和比較-交換, 而比較-交換操作本身強制同步, 存取操作都以 std::memory_order_relaxed 進行, 所以此處使用 std::memory_order_acquire 即可. 當然, 這裡甚至可以使用 std::memory_order_relaxed, 這種情況我們稍後再討論. 接下來對於獨佔結點的比較-交換操作中, 失敗情況下的存取仍然可以以 std::memory_order_relaxed 進行. 在成功獨佔結點的情況下, 由於要更新 this->head, 它類似於成員函式 push 中更新頭部結點, 因此至少以 std::memory_order_release 進行. 由於在更新外部計數的時候, 已經對 old_head.data 中資料的操作進行了同步, 所以此處可以不需要使用 std::memory_order_acq_rel. 最終, 對於比較-交換成功的場景以 std::memory_order_release 進行就可以了. 剛才我們提到, 更新外部計數的比較-交換操作成功時也可以使用 std::memory_order_relaxed, 這就要求獨佔結點中比較-交換操作成功的場景使用 std::memory_order_acq_rel. 在計數轉移的過程中, 我們希望這個操作可以同步給其它執行緒, 且 fetch_add 本身是基於比較-交換操作強制保障了操作內部可以取到結點內部計數的最新值, 故使用 std::memory_order_release 即可. 除此之外, 此處的 std::memory_order_release 還可以將 data->value 的更新發布給其它執行緒. 最後是獨佔不成功內部計數減少的場景. 從獲取-發布次序模型來說, 其必須使用 std::memory_order_acquire 與計數轉移中的 std::memory_order_release 形成同步, 這幾乎可以說是毫無懸念的. 另一方面, 此處要同步的有 :

  1. data->internal_count 的值, 但它的同步由比較-交換操作本身可提供保障;
  2. data->value 可能已經被其它執行緒 swap 了, 若 delete data 存取到未同步的 data->value, 可能使得 data->valuestd::shared_ptr 強計數不正確, 從而導致未定行為.

然而對於一個結點來說, 任何時刻都只可能存在一個執行緒對其進行回收操作, 故若且唯若條件判斷式被滿足, 才有同步的需求. 因此, 我們可以將內部計數減一這一動作的記憶體排序改為 std::memory_order_release, 在 if 內部增加一個看似無用實則不可缺少的 data->internal_count.load(std::memory_order_acquire). 最終, 我們有

#include <memory>
#include <atomic>

template <typename T> struct node;
template <typename T>
struct counted_node {
    node<T> *data;
    long external_count;
};
template <typename T>
struct node {
    counted_node<T> next;
    long internal_count;
    std::shared_ptr<T> value;
};
template <typename T>
class lock_free_stack {
private:
    std::atomic<counted_node<T>> head {};
public:
    lock_free_stack() noexcept = default;
    lock_free_stack(const lock_free_stack &) = delete;
    lock_free_stack(lock_free_stack &&) = delete;
    ~lock_free_stack() noexcept {
        while(this->pop());
    }
public:
    lock_free_stack &operator=(const lock_free_stack &) = delete;
    lock_free_stack &operator=(lock_free_stack &&) = delete;
public:
    void push(const T &value) {
        const auto new_node {new node<T> {this->head.load(std::memory_order_relaxed),
                0, std::make_shared<T>(value)}};
        while(not this->head.compare_exchange_weak(new_node->next, {new_node, 1},
                std::memory_order_release, std::memory_order_relaxed));
    }
    std::shared_ptr<T> pop() noexcept {
        auto old_head {this->head.load(std::memory_order_relaxed)};
        while(true) {
            for(auto counted_old_head {old_head};; counted_old_head = old_head) {
                ++counted_old_head.external_count;
                if(this->head.compare_exchange_strong(old_head, counted_old_head,
                        std::memory_order_release, std::memory_order_relaxed)) {
                        // 或者 std::memory_order_relaxed, std::memory_order_relaxed)) {
                    old_head.external_count = counted_old_head.external_count;
                    break;
                }
            }
            const auto data {old_head.data};
            if(not data) {
                break;
            }
            if(this->head.compare_exchange_strong(old_head, data->next,
                    std::memory_order_release, std::memory_order_relaxed)) {
                    // 或者 std::memory_order_acq_rel, std::memory_order_relaxed)) {
                std::shared_ptr<T> result {};
                result.swap(data->value);
                const auto count_transfer {old_head.external_count - 2};
                if(data->internal_count.fetch_add(count_transfer, std::memory_order_release) + count_transfer == 0) {
                    delete data;
                }
                return result;
            }
            if(data->internal_count.fetch_sub(1, std::memory_order_relaxed) == 1) {
                data->internal_count.load(std::memory_order_acquire);
                delete data;
            }
        }
        return {};
    }
};

可以看到, 編寫 Code 32-2 相當不容易, 也相當考驗人. 我們反覆提到無鎖程式設計之難在 Code 32-2 中被體現得淋漓盡致. C++ 的無鎖程式設計之所以困難, 核心原因就在於記憶體的管理.

3.1.4 基於紀元的無鎖堆疊

一般來說, 一個程式從被運作到終結這段生命週期可以被分成若干個紀元, 每個紀元中的變數相互獨立, 毫無關係. 這有點像和過去告別. 例如搬家的時候, 如果新家什麼都有了, 而且比老房子裡的家具更先進, 那麼我們就可以毫無顧忌地賣掉老房子中的二手家具. 若程式有一個全域紀元, 每個執行緒有自己的紀元標記. 當所有歸屬於某個紀元的執行緒都已經終結, 那麼我們就可以安全地回收這個紀元下配置出來的所有記憶體. 核心流程如下 :

  1. 程式有一個只能遞增的全域紀元 global_epoch;
  2. 每個執行緒都有自己的紀元標記 local_epoch 和臨界標記 (用於標記是否在該紀元內使用或者配置了動態記憶體);
  3. 當一個執行緒進入臨界區後, 令 local_epoch = global_epoch;
  4. 當一個執行緒退出臨界區後, 清除臨界標記, 將要回收的記憶體統一放入一個退休列表中, 並且標記這些記憶體的 retire_epochgloabl_epoch;
  5. 週期性地檢查所有執行緒都在某個 global_epoch 中. 若是, 則推進 global_epoch; 否則, 還不能推進. 在 global_epoch 被推進的時候, 便意味著所有小於 global_epoch 的紀元下, 其所屬的執行緒已經全部退出. 這個時候, 我們可以安全地回收這些紀元下配置出來的記憶體. 換句話說, 若紀元被推進到 e, 則第 e_{i} (e_{i} \in \left \{ 1, 2, ..., e - 2 \right \}) 紀元下的配置出來的記憶體都可以被安全回收.

有時候為了保守起見, 我們可能將紀元回收的範圍縮小到 \left \{ 1, 2, ..., e - 3 \right \} 甚至更小.

在無鎖堆疊下, 我們並不需要將紀元寫入結點資料中, 因為紀元在配置之後直到結點被回收, 都不會改變. 因此我們可以將結點設計為類似於外部計數的形式 :

#include <atomic>

template <typename T>
struct node {
    std::atomic<node *> next;
    T value;
};
template <typename T>
struct epoch_based_head {
    node<T> *data;
    unsigned long epoch;
};

可以想像, 每次 push 配置的新頭部結點 epoch 都不相同, 所以上述結點結構可以遠離 ABA 問題的困擾.

3.2 無鎖佇列

無鎖堆疊在實際中應用得並不多, 其作用基本是作為 C++ 無鎖程式設計的入門教學資料結構. 這是因為堆疊就好像一個僅能容納一輛車的車道, 一旦兩輛車迎面駛來 (一個執行緒 push, 另一個執行緒 pop), 必然有一輛車要退回車道之外等待另一輛車通過. 當車輛較多的時候, 可能出現經過長時間排隊等待都無法進入車道的情況. 而若再增加一個車道, 來回兩個不同方向的車輛各自走自己的車道, 則情況會緩解很多. 佇列便是這樣一個 “雙車道” 的資料結構. 對於事件消費型應用程式來說, 若對資料的順序沒有要求, 即本都會以佇列作為事件消費的底層資料結構.

為了增加一個 "車道", 佇列除了頭部結點之外, 還要再增加一個尾部結點. 我們假設佇列一開始有一個空值結點, 頭部結點和尾部結點都指向它. 當頭部結點和尾部結點指向同一個結點的時候, 我們判定佇列為空. 當我們想要向佇列中插入一個新值的時候, 首先填充尾部結點中的那個空值, 再配置一個新的空值結點, 讓尾部結點的 next 指向它, 最後把尾部結點更新為剛剛配置的那個新的空值結點即可. 按照我們的設計, 佇列的基本結構為

#include <memory>
#include <atomic>

template <typename T> struct node;
template <typename T>
struct counted_node {
    node<T> *data;
    long external_count;
};
template <typename T>
struct node {
    counted_node<T> next;
    long internal_count;
    std::shared_ptr<T> value;
};

template <typename T>
class lock_free_queue {
private:
    std::atomic<counted_node<T>> head;
    std::atomic<counted_node<T>> tail;
public:
    lock_free_queue() : head {new node<T> {}, 2}, tail {this->head} {}
    lock_free_queue(const lock_free_queue &) = delete;
    lock_free_queue(lock_free_queue &&) = delete;
    ~lock_free_queue() noexcept {
        while(this->pop());
    }
public:
    lock_free_queue &operator=(const lock_free_queue &) = delete;
    lock_free_queue &operator=(lock_free_queue &&) = delete;
public:
    void push(const T &value);
    std::shared_ptr<T> pop() noexcept;
};

由於頭部結點和尾部結點一開始同時指向建構子中配置的空值結點, 因此其外部計數一開始為 2. 根據無鎖堆疊中的設計以及佇列的特點, 成員函式 push 基本可以由以下步驟組成 :

  1. 為新插入的值配置智慧指標;
  2. 配置新的空值結點, 準備作為新的 this->tail;
  3. 取得 this->tail 並且增加其外部計數, 作為 old_tail;
  4. old_tail 中的值對應的指標為空, 則將步驟 1 中的智慧指派指派給它; 若指標不為空, 則等待為空的新尾部結點;
  5. old_head 中的 next 為空, 則將新配置的結點指派給它, 更新 this->tail 指向新的空值結點 (按照目前的設計, 其它運作 push 的執行緒會等待新結點, 故不存在 old_tail 中的 next 為空的場景);
  6. 釋放 old_head 一個外部計數, 相加一次內部計數判斷結點是否已經被移除, 是否可以回收 (注意此處外部計數的釋放僅包含 push 本身增加的外部計數, 而不包含 this->head 的外部計數. 因為前一個結點可能指向它, this->head 也可能指向它).

步驟 4 中, 顯然 node<T>::value 可能被多個執行緒共同更改, 所以應該將 node<T>::value 改為原子變數. 然而, std::atomic<std::shared_ptr<T>> 或者 std::atomic<std::unique_ptr<T>> 是 C++ 20 之後才引入的, 我們需將 node<T>::value 的型別從 std::shared_ptr<T> 退化為 std::atomic<T *>. 步驟 5 中的問題和步驟 4 中類似, 我們要將 node<T>::next 的型別從 counted_node<T> 改為 std::atomic<counted_node<T>>. 然而, 步驟 4 中還存在另一個問題. 若一個執行緒一直無法等到一個空值結點, 那麼這個執行緒就會反覆進行比較-交換操作, 並且一直失敗, 形成了實質上的忙等. 因此, 若一個執行緒發現 old_tail 中的值的指標非空, 可以無需等待別的執行緒新增 next, 該執行緒可以直接幫其它執行緒配置新的空值結點作為新的 this->tail. 反正最終結點都是空值的, 誰來配置都一樣. 然而, 這個會導致計數不清 : 一開始佇列為空, 佇列中有且唯有一個空值結點 N_{1} :

Figure 2-1. 佇列的初始狀態

執行緒 T1 準備 push, 增加了 this->tail 的外部計數, T1 為 N_{1} 填充了值之後並配置了一個新的空值結點作為 N_{1}next, 但是還未改動 this->tail 的指向 :

Figure 2-2. 執行緒 T1 進行 push

接著執行緒 T2 準備 push, 增加了 this->tail 的外部計數, 但還未進行其它操作 :

Figure 2-3. 執行緒 T2 增加外部計數

此時, 我們其實已經看到 this->headthis->tail 的計數嚴重不同步, 但目前還不至於造成什麼問題. 接著執行緒 T1 更新 this->tailN_{2} 準備退出, 按道理 T1 應該減去的是 T1 本身因為 push 所增加的外部計數和 this->tail 所持有的外部計數. 然而由於佇列目前非空, 且有 N_{1} \to N_{2}, 因此 T1 隻應該為 this->tail 減少一個計數. 因為在 this->tail 被更新為 N_{2} 之後, N_{1}next 指向它, this->tail 也指向它. 但是顯然這也不正確, 因為 this->tail 減去一個計數的時候, 計數還是三, 新的 this->tail 在 T1 退出之後並沒有任何執行緒持有它, 正確的計數應該是直接更新為 2. 另一個計數應該減在 N_{1} 上 :

Figure 2-4. 執行緒 T1 退出

我們沒有讓 N_{1} 的計數直接減二, 而是丟棄了一個計數. 但是 Figure 2-4 已經展現了一些不合理 : 對於 this->head 來說, 由於沒有人持有它, 所以內外計數只和為一是正確的; 對於執行緒 T2 所持有的 N_{1}, 它多了一個外計數. 但是如果讓內部計數減二, this->head 的計數便不正確, 根據我們的假設, 此時的 this->head 會因為內外計數之和為零被回收. 不止這一個問題. 此時若另一個執行緒開始運作 pop, 要移除 this->head, 即 N_{1}, 此時 this->head 的外部計數為 3, 內部計數為 -1, 當計數轉移並減去二之後, N_{1} 被回收. 最終, 執行緒 T2 嘗試在內部計數上進行減一操作的時候, 便發生了未定行為. 這一系列問題的核心來源是針對同一個結點, 可能會存在多個獨立的外部計數, 它們互不干擾, 本應該各自運作, 然而上面的討論將它們混在了一起. 不論一個結點有多少個獨立的外部計數, 其是否可以回收的原則只有一個 : 所有外部計數完成轉移, 並且內部計數減為零. 因此, 我們有必要統計有多少個外部計數. 由於外部計數的數量要在執行緒之間共享, 所以只能將其放入結點的資料中. 幸運的是, 基於上述設計的無鎖佇列僅存在以下幾種獨立的外部計數 :

  • this->head 中的外部計數;
  • this->tail 中的外部計數;
  • previous_node->next 中的外部計數.

執行緒運作成員函式 pushpop, 離不開上述三者, 不是一個獨立的外部計數. 當佇列大小為空的時候, this->headthis->tail 共同指向一個空值結點, 不存在一個結點的 next 指向這個空值結點. 所以當佇列為空的時候, 最開始的那個空值結點有兩個獨立外部計數. 當佇列非空的時候, 頭部結點僅有 this->head 一個外部計數, this->tail 並不指向它, 也不存在任意結點的 next 指向頭部結點; 由於必定存在一個結點的 next 指向尾部結點, 且 this->tail 也指向尾部結點, 所以尾部結點有兩個獨立的外部計數; 除了頭尾結點之外的其它結點都可以視為某個結點的 next, this->headthis->tail 並不指向它們, 所以它們只有一個外部計數. 綜上所述, 我們設計的佇列結點中, 一個結點執多只有兩個獨立的外部計數, 我們可以從內部計數中分享 2 位元來儲存一個結點的外部計數數量. 這樣保證了計數相關的原子操作在絕大多數裝置上仍然是無鎖的, 且可以節約空間 :

#include <atomic>
struct counters {
    unsigned long internal_count : sizeof(unsigned long) * 8 - 2;
    unsigned long external_counters : 2;
};
template <typename> struct node;
template <typename T>
struct counted_node {
    node *data;
    unsigned long external_count;
};
template <typename T>
struct node {
    std::atomic<counted_node<T>> next;
    std::atomic<counters> internal_count_and_external_counters;
    std::atomic<T *> value;
};

我們之前的討論都將 unsigned long 或者 long 視為可以使用無鎖 CPU 指令實現無鎖化, 但是部分裝置可能因為 unsigned long 或者 long 太大而無法支援. 這類裝置的併發數量通常也不會太大, 所以可以將 unsigned long 或者 long 退化為 unsigned int 或者 int.

有了結點的基本結構後, 成員函式 pop 的邏輯相對簡單 (成員函式 push 基本也不需要大改) :

  1. 取得 this->headold_head 並且增加其外部計數;
  2. this->head.datathis->tail.data 相同, 則佇列為空, 減少外部計數後退出; 否則,
  3. 透過比較-交換操作, 使得 old_head.next 稱為新的頭部結點, 並且增加其外部計數器的數量;
  4. old_head 中的值取出並且配置 std::unique_ptr<T> 作為成員函式 pop 的回傳值, 方便外部管理記憶體;
  5. 轉移 old_head 的外部計數, 減少由於成員函式 pop 增加的外部計數和 this->head 指向所需要的一個外部計數器的數量. 最終由結點自己判斷自己是否可以回收.

步驟 5 中, 我們讓結點成為了一個智慧結點, 類似於智慧指標, 自己判斷自己是否可以回收. 這是因為由於外部計數器不止一個, 沒有一個執行緒可以清除得知結點的全域計數情況, 讓執行緒回收結點已經變得不可行. 結點判斷自己可回收的標準是 : 不存在任何外部計數器且內部計數為零.

在實際運用中, 這種三計數的無鎖佇列並不是主流的. 從結點的構造也可以看出, 基於三計數的無鎖佇列存在太多原子變數, 需要進行大量比較-交換操作. 同時, 由於結點不存在重用機制, 故這種設計也可能導致大量的記憶體配置之後又回收. 總的來說, 基於三計數的無鎖佇列效能並不高.

若佇列大小的上限已知, 則底層資料結構可以使用環狀陣列; 若陣列的大小未知或者明確需要佇列高度可擴展, 那麼可以將多個固定大小的陣列透過前向連結串列的方式串連起來, 又或者由一個可擴展的大佇列管理多個固定大小且經過優化的小佇列. 這幾種方式是許多主流程式庫實作無鎖佇列的方式.

4. 無鎖程式設計下的原則

第 3.1 節可以看到, 無鎖程式設計不能一步登天. 我們在設計無鎖堆疊的時候, 遵循著一些無鎖程式設計原則, 這些原則幫助我們遠離錯誤, 一步一步優化我們的程式碼.

4.1 選擇合適的無鎖記憶體管理方案

第 3.1 節中, 我們介紹了四種無鎖堆疊的設計, 這些設計主要為了管理無鎖堆疊的記憶體. 我們可以將這些設計歸納為三大類 C++ 無鎖程式設計下的記憶體管理方案 :

  • 基於計數的記憶體管理;
  • 基於風險指標的記憶體管理;
  • 基於紀元的記憶體管理.

儘管真正用於生產的無鎖容器可能並不直接使用這三種方案, 但是事實上幾乎所有 C++ 無鎖程式設計的記憶體管理方案都是基於這三種方案的改進版或者結合版. 每一個不同的程式, 每一種不同的架構, 都會有適應於自己的無鎖記憶體管理方案. 因此在設計時, 我們應該為自己的程式選擇合適的記憶體管理方案.

4.2 防範 ABA 問題

我們已經討論過 ABA 問題可能給我們帶來的困擾, ABA 問題在回收復用的場景下出現得尤其多. 在我們選擇無鎖記憶體管理方案的時候, 我們必須同時考慮這個記憶體管理方案會不會存在 ABA 問題. 另外, 我們在進行比較-交換操作的時候, 也必須考慮一下, 此處的比較-交換操作會不會因為 ABA 問題產生問題.

4.3 最後考慮記憶體排序

對於任何無鎖程式碼來說, 在設計一開始不應該考慮記憶體排序, 先使用最嚴格的全域一致性次序模型. 在程式碼編寫的時候, 記憶體排序是難以分析的, 因為它隨時可能因為一行程式碼的變化而發生變化. 如果在一開始便考慮記憶體排序, 不但會增加設計難度, 還可能因為後面遺漏一些記憶體排序的改動, 導致同步出現錯誤.

在無鎖程式碼編寫和測試完成之前, 我們應該保持使用預設的 std::memory_order_seq_cst. 在程式碼經過嚴格的測試驗證正確之後, 我們才開始著手考慮記憶體排序, 使用更寬鬆的記憶體排序. 甚至很多時候, 我們根本不需要考慮記憶體排序. 絕大多數程式的優化點通常在於程式碼的寫法, 演算法的使用以及整體架構的設計, 程式的效能熱點通常不會由記憶體排序造成. 如果閣下編寫的程式效能熱點真的在記憶體排序, 那麼閣下自然有足夠的能力優化它.

4.4 找出忙等的場景

無鎖並不意味著遠離等待, 就像在第 3.2 節中看到的那樣, 比較-交換操作也會因為競爭而反覆進行, 這就是實際意義上的等待. 假設有一個原子變數, 執行緒 T1 和 T2 首先修改這個值, 然後進行計算, 最後寫入新值. 若執行緒 T1 在修改之後, 執行緒 T2 再去修改, 反覆交替, 就會形成忙等. 執行緒 T1 和 T2 都無法將自己計算的值寫入進去, 因為大家的比較-交換操作總會因為對方更改原子變數的值而失敗. 最終沒有一個執行緒寫入成功, 一直卡在比較-交換迴圈中. 在高度衝突下, 反覆重試可能造成 cache line 反覆存取然後又被移除, 形成 cache line ping-pong.

在這種情況下, 我們可以參考基於三重計數的無鎖佇列的解決方案, 當執行緒發現自己的比較-交換操作失敗, 看看自己是否可以幫助其它執行緒完成一些任務, 以推進其它執行緒的運作. 我們應該在設計時就考慮避免因為比較-交換操作的反覆重試, 讓執行緒卡在這裡.

4.5 不要總是堅持無鎖

無鎖程式設計具有諸多優點, 但是並不是基於無鎖的程式碼就一定比基於鎖的程式碼慢 :

  • 在低併發或者互斥鎖的競爭機率比較低的情況下, 直接使用互斥鎖的開銷更低. 因為原子變數的原子操作, 記憶體屏障, ABA 防範以及特殊的記憶體回收方案都會帶來額外的開銷;
  • 在需要高度可擴展的場景下, 無鎖資料結構更具優勢;
  • 在交易等需要極低延遲的場景下, 無鎖資料結構吞吐量遠高於基於鎖的資料結構, 這種場景下通常有著驚人的併發量;
  • 若一個大操作伴隨著許多子操作, 例如維護平衡樹 (參考《【資料結構】樹》定義 13) 的平衡性, 使用互斥鎖來獨佔某部分資料通常更快. 這種情況下, 對應的無鎖資料結構的實作可能極其複雜, 效能不如基於鎖的版本, 程式碼不具有可維護性. 對於部分資料結構來說, 若不改變其原本的定義, 可能根本難以實作無鎖版本;
  • 在高度衝突下, 無鎖資料結構可能因為重試超多次 (不只是比較-交換操作, 隨便一個操作都可能被另一個正在進行的無鎖操作阻擋), 造成 cache line 反覆存取後又被移除. 這個時候基於鎖的實作能夠保證至少有一個執行緒能夠順利完成操作.

綜上所述, 在超高併發下或者低延遲場景下, 我們應該果斷選擇無鎖資料結構. 在一般場景下, 基於鎖的資料結構已經足夠應付了. 當然, 基於鎖的設計和無鎖設計也並不是排斥的. 若一個大型資料結構由若干小型資料結構組成, 我們可以在底層一些比較小的資料結構上使用無鎖, 在上層使用互斥鎖來保護. 當然, 也可以在底層使用互斥鎖, 在上層進行無鎖化. 這取決於具體的場景.

4.6 儘量使用開源無鎖資料結構

我們已經體會過實作一個無鎖資料結構有多困難. 無鎖資料結構作為軟體設計基礎程式庫的一部分, 其正確性和效能至關重要. 從另一個角度來說, 無鎖資料結構的維護也比較困難. 一般來說, 很多場景下所需要使用的資料結構都已經有前輩撰寫了開源版本. 這些開源的無鎖資料結構經過嚴格的測試, 並且可能已經被大範圍工業級應用, 其正確性和效能都有極佳的表現. 例如對於無鎖佇列, Boost, Intel 的 oneTBB 和 Facebook 的 Folly, 都有成熟的實作. 它們基本都基於固定大小的環狀陣列或者前向連結串列式進行改進. 不但有雙執行緒單讀單寫版本, 也有多執行緒多讀多寫版本, 還有佇列大小固定的版本. 另外, moodycamel::ConcurrentQueue 所使用的大佇列中包含若干固定大小的小佇列也是一個非常有意思的實作.

對於有序資料結構, Boost, oneTBB 和 Folly 都提供了 set 或者 map 這種封裝好的資料結構; 對於搜尋友好型資料結構, 它們也都提供了對應雜湊表版本的 unordered_setunordered_map. 這些足以覆蓋絕大多數併發程式設計場景.

5. 實作

我自己用 C++ 實作了兩個版本的佇列, 都支援多讀多寫 :

  • 固定大小的環狀無鎖佇列 :
  • 基於固定大小陣列的連結串列式無鎖佇列 :

閣下可以參考後自行嘗試實作.