摘要訊息 : 在 C++ 中馴服併發.
0. 前言
隨著電腦硬體的發展, CPU 從最早的低頻率單核心到高頻率單核心, 再到多核心. 在 C++ 11 之前, 在 C++ 中進行同時利用多個核心通常需要借助作業系統提供的 API. 在 C++ 11 引入多執行緒的支援後, 我們可以在任意平台下只透過 C++ 標準樣板程式庫便可以進行併發程式設計. Jonny'Blog 在此之前從未介紹和 C++ 有關的併發內容, 因為這並不屬於 C++ 中的基礎內容. 要想進入 C++ 的併發世界, 除了需要 C++ 的基礎知識之外, 還需要閣下了解作業系統的相關知識, 尤其是和 CPU 有關的相關知識.
1. 併發
併發 (concurrency) 指的是兩個或者多個獨立的活動在同一時刻一起發生, 例如在我們洗衣服時, 突然有人打了個電話. 與之對應的還有平行 (parallelism), 指的是同一時刻多個任務同時進行, 例如我們可以做到一邊開車一邊說話, 一邊走路一邊說話. 併發和平行的概念容易混淆. 併發主要是針對某個物件, 例如洗衣服時突然有人打了個電話過來, 主要針對的是人; 平行主要指的是任務, 例如一邊開車一邊說話, 著重點在於開車和說話. 在電腦科學中, 併發的對象一般都是資料, 平行的對象一般都是 CPU 所要負責的任務. 以前, 大多數電腦的 CPU 只有一個核心 (例如 Windows 98 時代), 因此在某個時刻僅能處理一個任務. 但是因為 CPU 運作任務的速度足夠快, 在多任務作業系統優秀的調度策略的幫助下, 電腦看起來可以多個任務併發處理. 一個支援多任務的作業系統會在短時間內在多個任務之間進行切換, 最終確保全部任務完成, 讓人看起來像是並行發生的. 廣義上, 我們仍然將這種情況稱為平行. 這些年, 由於 CPU 核心數量不斷增加, 使得多任務作業系統可以真正做到在同一時刻下處理多個任務.
單核心與多核心在處理任務上有所不同 . 為了看起來像併發, 理想情況下, 單核心 CPU 透過不停進行上下文切換 (context switch), 在指定的時刻下運作不同任務的不同切片. 而多核心 CPU 在任務足夠少的情況下可以無需進行上下文切換, 一個核心分配一個固定任務, 運作完再分配另一個任務即可. 在不同任務之間切換是需要消耗額外時間和空間的. 作業系統在切換任務時通常必須將當前任務的一些中間狀態 (例如當前任務的 CPU 狀態和指令位址等等) 儲存下來, 方便下一次可以直接從狀態中恢復運作. 上下文切換所需要的時間可能進一步造成任務的延遲.
實際情況下, 硬體所能夠提供的核心數量是有限的. 可以說, 即便是一個 CPU 有很多個核心數量, 但是總任務數量總是遠遠超過核心數量的. 只是絕大多數任務足夠快, 以至於人類無法感知到中間的停頓或者切換, 誤以為非常流暢. 在同一個時刻下, 一個作業系統下可能有成百上千個任務正在進行, 這些任務有些具有較高的優先級, 有些需要在這個時刻下等待某些資料, 因此並不是多核心 CPU 就不需要進行上下文切換. 一個應用程式的運作效能很大程度上取決於 CPU 有多少個核心. 只要作業系統足夠先進, CPU 的核心數量總是越多越好.
近些年來由於遊戲行業, 電腦視覺和人工智慧的發展, GPU 和 NPU 逐漸進入大眾的視野. 相比於 CPU 這樣的專家, GPU 更像是流水線工人, 在資料攤開且資料規模巨大的情況下, GPU 中的每個小核心都負責自己的一部分, 併發運算. 在人工智慧中, 模型資料精度通常比較低, 使用 GPU 可能因為精度問題功耗過大, 而 NPU 更像是專門為人工智慧任務優化過的 GPU, 只去處理低精度下的任務. 可以看到, GPU 和 NPU 內部本身就是並行設計的, 內部大量小核心將大任務進行拆分, 獨立運算.
想像一下兩個程式設計師在兩個辦公室內進行各自的工作, 他們之間的交流並沒有坐在一起方便, 但是他們不會因為同一個資源有衝突, 也不會被外界所打擾. 所以這種情況下, 工作效率能夠提高. 然而老闆要付出的代價就是每個程式設計師都要配備一樣的資源, 即便某些資源可以共享 (例如飲水機), 同時也需要裝修兩間辦公室. 若將他們放在同一個辦公室內, 老闆可以解約一些裝修成本和耗材成本, 但是兩個人的工作效率可能因為對方莫名的打擾存在一定降低, 同時也有機率在同一時間對某個辦公資源存在競爭. 這樣做另一個好處就是兩個程式設計師的交流會方便一些, 可以直接對話, 無需走到另一個人的辦公室敲門. 將辦公室進行影射, 我們便可以得到兩種基本的併發方式 : 多進程與單進程下多執行緒.
在多進程下, 各個進程獨享資源, 獨立分配到一個 CPU 核心處理任務, 若要進行進程間通信, 要透過中介 (例如信號, 資料介面, 檔案或者管道), 這比直接共享資料更慢且更複雜. 然而, 作業系統會為不同進程設定保護屏障, 防止資料被其它進程所修改, 因此多處理器單執行緒下的併發會更安全.
在單進程下, 進程可以向作業系統申請執行緒. 執行緒 (thread) 是作業系統內的最小調度單位, 它可以共享其它執行緒的資源, 並且佔用 CPU 一個核心. 所以並不是單進程下就不可以併發, 我們可以透過單進程多執行緒的方式進行併發. 單進程下多執行緒有點像多個輕量級的子進程, 每個執行緒相互獨立, 運作不同的指令. 儘管每個執行緒的局域變數是相互獨立的, 但是堆疊或者堆積上的資料是可以共享且直接可存取的. 直接存取資料方便了不同執行緒之間的交流, 但是作業系統不會因為執行緒之間相互獨立就為這些共享資料加以防護, 所以多執行緒下的併發安全需要程式設計師手動管理.
如何選擇併發的方式取決於具體的業務場景, 具體的軟體架構和具體的平台. 多進程的優勢是安全, 單進程多執行緒的優勢是效能更高. C++ 11 中引入的併發主要是針對單進程多執行緒的, 直到本文撰寫為止的 C++ 26, 都沒有引入多進程, 所以多進程一般要透過作業系統提供的 API 或者引入額外的程式庫才行.
1.1 任務的可劃分性
若一個大任務可以被拆分成多個子任務, 各個子任務之間可以相互獨立地運作, 這種場景便可以使用併發. 一般來說, 使用併發能夠提升程式的運作效能. 因此在使用併發之前, 首先要確定的便是任務能否被劃分. 例如一個 GUI 程式, 一邊處理使用者鍵盤的輸入, 一邊計算資料, 那麼我們可以使用兩個執行緒分別處理兩個不同的任務. 一個執行緒專門用於渲染 UI, 另一個執行緒專門進行資料運算. 這樣, 計算執行緒看起來便更像是一個後台任務.
一般而言, 執行緒的數量和電腦的 CPU 核心數量無關. 在最新版本的 Windows 作業系統上, 打開任務管理器, 我們可以看到幾千甚至上萬個執行緒 (這還是在 PC 上, 在伺服器上會更多), 然而現代電腦中的作業系統並沒有因為執行緒數量增多表現出明顯的效能下降, 這是因為絕大多數執行緒並不是每時每刻都在搶佔 CPU. 在絕大多數情況下, 絕大多數執行緒都處於休眠狀態.
一些演算法可能受益於並行計算. 例如分而治之演算法 (參考《分而治之演算法》), 其特點便是各個小問題可以被獨立運算, 且小問題中所使用的資料也是相互獨立的. 一個尤其經典的例子便是合併排序法 (參考 《【演算法】合併排序法 (Merge Sort)》), 這個演算法的運作效能能夠隨著並行程度的提高而提高. 我們稱這類演算法為過易並行 (embarrassingly parallel).
還要討論的一個問題是什麼時候不應該使用併發. 一般來說, 併發的程式碼複雜於非併發的程式碼, 且可能產生更多錯誤, 這些錯誤會更加隱晦. 除非效能提升足夠明顯, 在抵銷編寫複雜程式碼所使用的時間和維護所產生的額外成本之外, 還有明顯提升, 否則都不建議優先考慮併發. 因為執行緒或者進程的啟動都需要額外的時間, CPU 調度也需要付出額外的消耗, 並不是所有情況下都是併發程度越大越好. 另一方面, 電腦硬體所提供的資源也是有限的, 讓太多執行緒同時高強度工作可能因為 CPU 調度頻繁而造成程式甚至作業系統運作緩慢. 這裡有一種極端的可能 : 絕大多數時間都消耗在因為 CPU 調度產生的副作用上, 作業系統頻繁地在不同任務之間切換 CPU 的分配, 程式本身被分配到 CPU 極少.
綜合上面的討論, 增加執行緒的數量並不會讓程式的效能嚴格地提升為兩倍. 若程式運作的過程中, 存在太多的執行緒之間的切換, 使用了併發的程式很可能還不如不使用併發的程式.
1.2 C++ 98/03 中的併發
在最早的 C++ 98/03, 並不存在執行緒這種概念的存在, 也沒有定義記憶體排序 (memory order). 像 Windows 平台我們可能透過 Windows API 來實現併發, 在 Linux 系列作業系統中, 我們可能透過 POSIX 程式庫或者更高層的程式庫 (例如 Boost 或者 Qt) 來實現併發. 這些程式碼通常依賴於特定的編碼器或者平台, 沒有足夠的可攜性. 一旦更換編碼器或者作業系統, 一旦將程式放在另一台電腦上, 可能因為某些部分不相容造成程式無法運作.
C++ 11 引入多執行緒後, C++ 標準樣板程式庫統一了多執行緒程式的程式碼編寫方式, 透過統一的設施在不同平台上實現併發. 在切換平台後, 我們可能只需要重新對程式碼進行編碼即可. 另外, C++ 11 嚴格定義了記憶體排序. 這些特性的引入都大大提高了 C++ 併發程式碼的可攜性, 且編碼器有了更大的空間對程式碼進行最佳化. 於作業系統直接提供的多執行緒 API 相比, C++ 標準樣板程式庫引入的工具效能幾乎可以和它們保持持平. 在此基礎上, 伴隨著記憶體排序的函式定義, C++ 標準樣板程式庫引入的新設施可以直接控制位元或者位元組, 讓執行緒之間的同步以及變化的可見性更加明確. 這些在以前是需要借助組合語言才能做到的.
絕大多數場景中, 我們都應該使用來自 C++ 標準樣板程式庫的多執行緒設施, 即使有證據證明部分經過封裝的高級工具為了程式碼的簡潔性而犧牲了效能. 只要不是效能極其敏感處, 都不應該繞過 C++ 標準樣板程式庫. 完全手工編寫多執行緒程式碼帶來的額外時間, 維護成本和複雜度, 以及可能帶來的錯誤, 相比於 C++ 標準樣板程式庫那一點點效能損耗, 其實並不划算. 大多數情況下, 由 C++ 標準樣板程式庫造成的程式效能降低, 首先應該考慮的是程式碼的設計是否合理.
統一設施就意味著要犧牲一些東西. 除了可能犧牲效能之外, 還可能犧牲一些專有的平台特性. 不過幸運的是, C++ 標準樣板程式庫為部分類別提供了成員函式 native_handle
, 用於支援平台提供的專有特性.
1.3 來自執行緒的 Hello World!
我們最開始學習 C++ 的時候, 一般透過 std::cout
和 std::endl
輸出一個 "Hello World!". 事實上這也是由執行緒輸出的, 只不過這個執行緒是主執行緒, 和進程相互混淆, 因此我們並不會過多關注它. 現在我們開始學習 C++ 中的併發程式設計, 因此我們希望配置一個新的執行緒, 讓子執行緒來輸出 "Hello World!". 這個時候, 我們可以使用來自 C++ 標準樣板程式庫標頭檔 <thread>
中的 std::thread
, 傳遞一個要運作的函式給它即可 :
#include <iostream>
#include <thread>
// 將任務包裝為一個運作區塊
void print() {
std::cout << "Hello World!" << std::endl;
}
int main(int argc, char *argv[]) {
std::thread t(print); // 委託任務後立即啟動執行緒
t.join(); // 通知主執行緒等待子執行緒 t 運作結束
}
需要注意的是, 如果不呼叫 std::thread
的成員函式 join
, 主執行緒會在 main
函式終結後直接退出. 這可能導致執行緒 t
還沒開始它的任務就隨著主執行緒的退出而終結, 最終沒有輸出 "Hello World!". 沒這麼難吧~
2. 執行緒
在第 1.3 節中我們提到, 不論是否使用執行緒, 每個程式本身自帶一個主執行緒 (main thread), 由作業系統分配管理. 這個執行緒在程式啟動之後負責運作 main
函式. 在 main
函式中, 我們可以啟動負責其它任務的子執行緒, 這些執行緒在宣告完成後, 會隨著主執行緒一起併發地運作. 當 main
函式終結後, 主執行緒結束. 類似地, 當子執行緒負責的函式運作完成後, 子執行緒也會結束.
2.1 std::thread
std::thread
接受一個可呼叫物件, 用於指定該執行緒後續要進行的任務. 在最簡單的情況下, 其所接受的可呼叫物件沒有任何引數, 就像 Code 1 那樣. 需要注意的是, std::thread
會對可呼叫物件進行複製, 因此我們必須保證可呼叫物件的副本可以和原物件保持一致, 否則執行緒可能產生不符合我們預期的結果. 下面是 std::thread
用於接受可呼叫物件的建構子 :
template <typename F, typename ...Args>
thread(F, Args &&...);
根據宣告我們可以看出, 其所接受的可呼叫物件支援任意數量的引數. 需要注意的是, 若 F
是某一類別的名稱, 那麼 std::thread t(F())
這種宣告會被解釋為參數為函式型別 F ()
, 回傳型別為 std::thread
的函式宣告, 函式名稱為 t
. 因此這裡我們優先推薦列表初始化, 當然也有其它寫法 :
std::thread t(F {});
;std::thread t {F()};
;std::thread t {F {}};
;std::thread t((F()))
.
如果執行緒中有例外情況被擲出, 在單執行緒程式下, 程式會被 std::terminate
終結; 在多執行緒程式下也是類似, 不是擲出例外情況的執行緒被終結, 同樣是整個程式被 std::terminate
終結. 這樣設計也不無道理, 如果一個執行緒擲出例外情況, 那麼就說明該執行緒下可能有資料已經遭到破壞, 後續程式在被破壞的資料上運作可能會產生更加嚴重的錯誤. 與其這樣, 不如讓 std::terminate
終結程式, 方便偵錯.
2.2 join
與 detach
一旦一個執行緒宣告成功, 則其立即啟動. 若需要主執行緒等待子執行緒運作完成, 則我們需要像 Code 1 那樣呼叫 std::thread
的成員函式 join
. 若子執行緒運作沒有完成, 主執行緒又沒有呼叫成員函式 join
, 那麼在主執行緒結束之後, 會透過 std::terminate
強行終結子執行緒的運作. 因此即使有例外情況, 我們也應該保證主執行緒與子執行緒之間有正確的結合或者分離. 幸運的是, 這種結合或者分離的決定只要在 std::thread
物件被解構之前作出即可. 如果我們呼叫成員函式 join
, 那麼主執行緒的 main
函式在做完全部事情很久之後可能還在等待子執行緒運作完成, 它不能隨意退出. 若主執行緒無需等待, 但是我們又希望子執行緒在主執行緒結束之後仍然繼續保持運作, 直到任務完成為止, 我們可以呼叫 std::thread
的成員函式 detach
來分離主執行緒和子執行緒.
若我們已經決定不將主執行緒和子執行緒繫結到一起, 我們必須保證子執行緒中所存取的所有資料都是有效的, 直到子執行緒結束為止. 例如子執行緒持有了一個來自主執行緒的局域變數的參考或者指標 :
#include <iostream>
#include <thread>
void f(int &i) {
std::cout << i << std::endl;
}
int main(int argc, char *argv[]) {
int i {};
std::thread t(f, i);
t.detach(); // 分離主執行緒和子執行緒 t
}
主執行緒在分離之後, 局域變數 i
便會被回收. 若在 i
回收之後, 子執行緒存取它, 便會產生未定行為. 這就好比把一個可視範圍內的局域變數透過參考或者指標的方式傳遞到外面來使用, 這是一個 C++ 新手常犯的錯誤. 針對這種情況一般有以下解決方案 :
- 改變函式
f
的參數型別, 複製i
給函式f
, 讓 f 接受i
的副本而不是參考; - 將
i
變為靜態局域變數, 保證進程結束之前i
始終有效; - 透過動態記憶體配置, 為
i
配置專門的記憶體 (由於主執行緒的退出, 這個記憶體的回收需要子執行緒進行); - 讓主執行緒等待子執行緒
t
結束之後再結束, 也就是呼叫std::thread
的成員函式join
, 而不是detach
.
不同的場景選擇的方案也不盡相同. 第三種方案需要我們自行管理記憶體, 因此我們既要保證在 i
被使用之前記憶體不會被錯誤回收, 也要保證在 i
被使用完成之後記憶體被正確回收 (這句話是很輕飄飄吧, 看來是一句廢話是吧? 閣下將在之後的無鎖程式設計中見證它的威力, 參考《【併發 C++】無鎖程式設計》).
在實際應用中, 一般主執行緒不需要等待執行緒完成任務才退出, 因為子執行緒可能不止一個, 且子執行緒也可能因為等待使用者的輸入或者網路回傳的資料等原因一直在等待. 甚至, 子執行緒也可能等待其子子執行緒的結束. 之後, 我們也將看到有條件的等待或者只在特定時間內進行等待.
一個 std::thread
物件的成員函式 join
被呼叫之後, 其將不能再控制原本持有的執行緒, 其內部原本和執行緒有關的狀態都會被清空. 執行緒在運作完成後, 回收會委托給作業系統來完成. 這就好比一個 std::thread
的成員函式 join
被呼叫之後, 相當於呼叫了 std::unique_ptr
的 release
. 因此一個 std::thread
物件的成員函式 join
只在第一次有效, 另一個成員函式 joinable
也將在第一次呼叫 join
之後而持續回傳 false
(除非手動強行更改 std::thread
的狀態, 不過這目前不在我們的討論範圍內). 如果一個 std::thread
物件在被解構時, 成員函式 joinable
的回傳值仍然是 true
(也就是呼叫其成員函式 join
或者 detach
), 那麼 std::thread
的解構子將呼叫 std::terminate
主動終結程式.
例外情況下, 記憶體流失容易發生. 類似地, 在例外情況下, 一個 std::thread
物件的成員函式 join
或者 detach
也可能沒有被呼叫. 這時, 我們一般在捕獲例外情況之後, 在 catch
區塊中顯式地決定子執行緒的狀態 :
#include <thread>
void f();
void g() {
std::thread t {f};
try {
// ...
}catch(...) {
t.join();
throw;
}
// ...
}
當然, 我們也可以借助 RAII 來完成上面的動作 :
#include <thread>
#include <utility>
class thread_guard {
private:
std::thread &t; // std::thread 不可複製, 所以 thread_guard 的複製操作會被編碼器自動宣告為被刪除的函式
public:
explicit thread_guard(std::thread &t) noexcept : t {t} {}
~thread_guard() noexcept {
if(this->t.joinable()) {
this->t.join();
}
}
};
void f();
void g() {
std::thread t {f};
thread_guard guard {t};
// ...
}
在解構子中, 我們使用了 std::thread
的成員函式 joinable
來判斷, 這是因為一個 std::thread
物件的成員函式 join
只能被呼叫一次, 防止外面已經呼叫過 join
. 如果同一個執行緒呼叫了成員函式 join
兩次, join
會擲出 std::system_error
例外情況; 如果不同執行緒各自呼叫成員函式 join
超過一次, 那麼會產生未定行為.
一部分任務擁有即用即棄的特點, 例如將得到的資料進行處理後傳遞給伺服器, 執行緒不再保留這些資料; 有些任務擁有常駐程式的特點, 隨著程式的生命週期結束而結束, 例如專門負責監控資料是否出現異常數據的執行緒. 這些任務通常不需要 join
, 在保證資料安全的情況下直接呼叫成員函式 detach
即可. 與成員函式 join
類似, 一個 std::thread
物件的成員函式 detach
被成功呼叫後, 該物件將不再持有原執行緒, 我們也只能在成員函式 joinable
回傳 true
時才能對一個 std::thread
物件呼叫成員函式 detach
.
2.3 傳遞引數給 std::thread
前面的討論中, 我們傳遞給 std::thread
建構子的只有一個函式, 因為這些函式沒有參數. 如果傳遞給 std::thread
的函式有參數, 那麼函式的引數怎麼給, std::thread
的引數也是相同的給法, 緊接著傳遞的函式即可 :
#include <thread>
void f(int, char &, void *);
int main(int argc, char *argv[]) {
std::thread t(f, 42, **argv, argv);
t.detach();
}
若 std::thread
的任務是運作一個成員函式, 那麼第一個可呼叫物件傳遞的應該是成員函式指標, 第二個傳遞的應該是成員函式對應型別的物件, 第三個引數才開始傳遞對應成員函式的引數 :
#include <thread>
#include <iostream>
struct A {
int v;
A() = default;
A(const A &) {
std::cout << "copy constructor" << std::endl;
}
void f(int a, int b, int c) {
std::cout << this->v + a + b + c << std::endl;
}
};
int main(int argc, char *argv[]) {
A a {};
std::thread t(&A::f, a, 1, 2, 3);
t.join();
t = std::thread(&A::f, &a, 1, 2, 3);
t.join();
}
/*
* 輸出結果 :
* copy constructor
* 6
* 6
*/
在 Code 4-2 中, 我們看到如果直接將類別 A
的物件傳遞給 std::thread
, 會發生一次複製, 只有在傳遞指標的情況下才能避免複製. 如果一定需要傳遞參考, 可以改為 std::thread t(&A::f, std::ref(a), 1, 2, 3);
即可.
另外我們還需要注意, 如果可呼叫物件存在回傳值, 我們是無法從 std::thread
獲得這個回傳值的. 在這種情況下, 通常我們要以參考作為函式參數, 以此來做到模擬函式回傳值 :
#include <thread>
#include <cassert>
void f(int, char, float &result);
int main(int argc, char *argv[]) {
float f_result {};
std::thread t(f, argc, 'a', f_result);
t.join();
assert(f_result > 0.0f);
}
2.4 轉移執行緒所有權
std::thread
和 std::unique_ptr
擁有類似的所有權, 一個物件僅持有一個執行緒或者不持有一個執行緒. 與 std::unique_ptr
不同的是, 若一個 std::thread
已經持有一個執行緒, 其成員函式 joinable
回傳 true
, 我們再透過移動指派的方式指派一個執行緒給它, 那麼會導致程式直接呼叫 std::terminate
終止. 這麼設計是因為一個執行緒會獨佔一些資源或者正在共享一些資源, 若僅終結對應的執行緒可能導致資源無法正確被回收或者影響到其它執行緒, 甚至完全無法保證整個程式的邏輯正確性和完整性. 在第 2.2 節我們提到, 一個 std::thread
物件的成員函式 join
或者 detach
被呼叫, 才意味著這個物件不再與任何執行緒關聯. 因此, 我們要麼等待執行緒運作結束, 要麼直接呼叫其成員函式 join
或者 detach
, 之後就可以安全地對這個物件進行移動指派.
std::thread
支援移動操作的一個好處是我們可以讓 Code 3-2 中的 thread_guard
獲得一個執行緒的所有權. Code 3-2 中所持有的 std::thread
參考可能會在外部物件被回收之後產生未定行為, 導致我們付出一些額外時間排查, 現在我們透過移動來改進 Code 3-2 :
#include <thread>
class scoped_thread {
private:
std::thread t;
public:
explicit scoped_thread(std::thread &&t) : t {std::move(t)} {
if(not this->t.joinable()) {
throw std::logic_error("The thread is not joinable");
}
}
~scoped_thread() noexcept(false) {
this->t.join();
}
};
std::thread
可以移動但是不可以複製, 複製一個執行緒本身就是一個不合理的需求. 如果一個執行緒正在運作, 那麼複製執行緒是複製它現在的狀態繼續運作, 還是說重新開始運作? 另外, 記憶體共享也會因為執行緒的複製發生一些變化, 可能導致一些錯誤. 因此, 基於 std::thread
的輔助類別都不應該支援複製.
2.5 選擇執行緒的數量
std::thread
有一個靜態成員函式 hardware_concurrency
, 用於回傳至多可以在同一時刻運作多少執行緒, 一般來說這個值等同於 CPU 的核心數量. 不過這個值僅可用於參考. 如果電腦硬體的相關信息無法獲得, 那麼這個函式總是回傳 0
.
#include <thread>
#include <numeric>
#include <vector>
#include <iostream>
void sum(int *begin, int *end, long long &result) {
result = std::accumulate(begin, end, result);
}
int main(int argc, char *argv[]) {
int *arr {};
auto arr_size {256000}; // at least
// input from user
// fill the arr array
// ...
const auto thread_size {std::thread::hardware_concurrency()};
if(thread_size <= 1) {
long long result {};
sum(arr, arr + arr_size, result);
std::cout << result << std::endl;
}else {
const auto step {arr_size / thread_size};
std::vector<std::thread> threads {};
threads.reserve(thread_size);
std::vector<long long> results(step);
const auto block_size {arr_size / step};
for(auto i {0}; i < step; ++i) {
threads.emplace_back(sum, arr + i * block_size, arr + (i + 1) * block_size, std::ref(results[i]));
}
long long last_result {};
if(const auto calculated_size {block_size * step}; calculated_size not_eq arr_size) {
last_result = std::accumulate(arr + calculated_size, arr + arr_size, 0);
}
for(auto &thread : threads) {
thread.join();
}
std::cout << std::accumulate(results.cbegin(), results.cend(), 0) + last_result << std::endl;
}
}
Code 6 描述了一個使用多執行緒加速累加計算的實例. 在執行緒數量至少有兩個的情況下, 我們將陣列 arr
分塊, 將每一個區塊交給一個執行緒進行累加, 計算完成後將結果儲存在結果集合的某個變數上. 我們可以注意到三點 :
- 我們對每個執行緒都呼叫了
join
, 主執行緒會等待所有執行緒計算結束才會進行接下來的std::cout
輸出. 不過每一次for
迴圈呼叫一次join
, 都不會立即進入下一個迴圈, 而是等待當前執行緒結束. 不過幸好各個執行緒宣告後立即運作, 而且join
的呼叫不會影響其它執行緒運作. 所以基本上來說, 這裡的迴圈不會過度影響程式效能; - 各個執行緒有自己的儲存區域, 互不干擾;
- 若有多餘的數字, 我們直接交給了主執行緒累加, 而且這個動作在呼叫
std::thread::join
之前, 給足其它執行緒時間, 避免主執行緒在接下來的迴圈中過度等待.
我們並沒有隨意設定執行緒的數量, 而是嚴格根據 std::thread::hardware_concurrency
的回傳結果編寫多執行緒程式碼. 若我們設定的執行緒數量超過了 CPU 的核心數量, 就會帶來不必要的上下文切換, 這會降低程式效能. 如果我們設定的執行緒數量遠遠超過了 CPU 的核心數量, 那麼程式的主要時間都會花在執行緒之間的切換. 不過, 並不是我們嚴格遵循 std::thread::hardware_concurrency
的回傳值, 上下文切換就不會發生. 因為一個多任務作業系統中, 不會只運作一個進程, 其它進程也可能在同一時刻需要 CPU.
2.6 執行緒標識
每一個執行緒都會有自己的獨立標識, 稱為 thread ID, 即 TID. 在 C++ 中, 我們可以透過成員函式 std::thread::get_id
獲取當前執行緒的 ID. 不過回傳值型別並不是內建的數字型別, 而是 std::thread::id
型別. 若一個 std::thread
物件沒有與任何執行緒繫結, 那麼成員函式 get_id
的回傳值是預設建構的 std::thread::id
物件.
std::thread::id
型別的主要用處是用於比較. 若相等, 則說明兩個 std::thread::id
物件來源於同一個執行緒或者代表其繫結到了的 std::thread
都沒有持有任何執行緒. 對執行緒的排序也是借助 std::thread::id
進行的.
C++ 標準樣板程式庫中有一個名稱空間 std::this_thread
, 裡面的所有輔助函式可以幫助我們獲取當前執行緒的信息. 因為一個 std::thread
內部持有的執行緒是無法存取到自己對應的 std::thread
的物件的, 另外主執行緒一開始也沒有任何與其繫結的 std::thread
物件. 所以要獲取執行緒自己的信息, 必須透過 std::this_thread
中的函式. 要獲取當前執行緒的 ID, 那麼可以透過 std::this_thread::get_id
獲取.
執行緒的 ID 主要有以下作用 :
- 在主要任務相同的某幾個執行緒中, 在任務中為特定的 ID 設定一些其它執行緒沒有的特殊任務;
- 將特定的資料與指定的執行緒 ID 相關聯, 方便統一管理.
從本質上來說, 一個 std::thread::id
確實和數字沒什麼關係, 而且具體數字是什麼也沒有什麼意義. 儘管如此, 我們仍然可以透過 std::cout
來輸出一個 std::thread::id
物件的值, 這個輸出是一個具體的數值. 這個數值嚴格對應了活動監視器 (Linux 系) 或者任務管理器 (Windows) 中執行緒的 TID. 一般來說, 輸出這個值僅用於偵錯和日誌. 需要注意的是, 一個 std::thread::id
物件是無法透過強制型別轉化來轉換成一個內建數字型別的.
2.7 主動讓執行緒睡眠
有一部分執行緒可能會做一些等待的事情. 例如定時器, 每間隔 10 秒便啟動, 檢查一下是否收到了消息; 又或者在指定的時間啟動, 檢查一下網站是否有更新. 名稱空間 std::this_thread
中提供了函式 sleep_for
用於讓執行緒阻塞一段時間, 函式 sleep_until
用於讓執行緒一直阻塞到某個指定時間為止.
#include <thread>
#include <string>
#include <chrono>
std::string fetch_message();
std::string download_website(const char *URL);
bool stop();
void check_message() {
while(true) {
std::this_thread::sleep_for(std::chrono::seconds(1));
const auto message {fetch_message()};
// ...
if(stop()) {
break;
}
}
}
void check_website() {
constexpr auto URL {"https://www.abc.com/some_article.html"};
const auto old_content {download_website(URL)};
std::this_thread::sleep_until(std::chrono::system_clock::now() + std::chrono::days(1));
const auto new_content {download_website(URL)};
if(old_content not_eq new_content) {
// ...
}
}
如果一個執行緒在忙等, 那麼主動讓執行緒讓出 CPU 睡眠一段時間是合理的 :
#include <thread>
bool is_ready();
void f() {
while(not is_ready()) {
std::this_thread::yield();
}
// ...
}
和 sleep_for
或者 sleep_until
不同, yield
的睡眠時間是不固定的. 在絕大多數作業系統中, 透過 sleep_for
或者 sleep_until
的睡眠, 作業系統一般會在睡眠時間到達指定要求之後才將執行緒放入就緒佇列中, 等待喚醒. 然而透過 yield
睡眠的執行緒, 作業系統會將該執行緒立即放入就緒佇列中. 極端情況下, 這個執行緒可能剛被放入就緒佇列, 就又被喚醒了, 因為就緒佇列可能根本沒什麼執行緒在等待.
3. 在執行緒間共享資料
當我們生活在共享空間中, 有時候會遇到物品的共享衝突問題. 例如一個人想用烤箱製作蛋糕, 但是另一個人想在同一時間烤牛肉. 除非有一個人願意等待另一個人完成, 否則兩人都無法皆大歡喜. 又例如, 一個人正在為自己的手機充電, 另一個人問都不問直接將插電器拔掉之後自己使用.
使用執行緒的好處是各個獨立的執行緒之間可以直接共享資料, 上面的問題自然也會在多執行緒下遇到. 不正確的資料共享方式不僅會導致程式運作結果不符合預期, 甚至可能導致程式根本無法運作.
若有一些僅讀資料, 任何執行緒都無法更改它, 這樣的資料在執行緒之間共享就不會有任何問題. 因為所有執行緒對這些資料只有存取的權限, 沒有修改的權限. 即使多個執行緒在同一時刻存取同一資料, 或者在不同時刻多次存取同一資料, 其得到的結果永遠是固定不變的, 因而這個資料不會因為存取改變接下來的運作結果.
當允許執行緒修改資料, 我們就會面臨以下問題 :
- 多個執行緒在同一時刻修改同一資料, 誰先修改是無法由程式直接保證的, 因此我們無法預估最終會有什麼結果, 這種情況稱為競爭條件 (race condition);
- 當一個執行緒正在修改資料, 另一個執行緒恰好在同時需要存取該資料, 的到的資料可能是部分被修改, 另一部分還沒來得及修改的錯亂資料. 這種情況稱為資料不一致 (data inconsistency); 另一種稍好的情況是, 兩個執行緒同時修改一個資料, 其中一個執行緒修改的資料完全覆蓋了另一個執行緒所寫入的資料, 導致另一個執行緒寫入的資料完全沒有被任何執行緒看到. 這種情況稱為資料覆蓋 (data overwriting);
- 為了避免競爭, 執行緒可能會等待其它執行緒完成某些操作. 但是如果多個執行緒相互等待, 等待圖行程了一個環, 那麼整個程式都會陷入無限等待. 這種情況被稱為死鎖 (deadlock);
- 當多個執行緒嘗試解決當前存在的共享資料衝突, 而不斷嘗試讓步或者不斷改變自身狀態 (例如針對同一個變數, 兩個執行緒同時發現對方都想修改, 於是兩個執行緒同時等待, 然而明明有一個執行緒可以不用等待; 接下來, 兩個執行緒又準備修改, 又同時等待...), 從而導致真正的任務無法完成. 這種情況被稱為活鎖 (livelock);
- 當某個執行緒因長時間無法獲得運作所需要的資源而陷入無限等待, 從而導致該執行緒的任務無法完成. 這種情況被稱為資源饑餓 (resource starvation);
- 編碼器在對程式碼最佳化後, 可以導致部分變數的順序和程式碼運作順序發生變化, 從而導致執行緒存取資料的順序或者中間結果和預期不一致 (但是編碼器會保證程式最終結果和最佳化之前保持嚴格一致). 這種情況並非只在多執行緒下存在, 單執行緒下也有, 只是多執行緒表現得更明顯, 被稱為指令重排問題 (instruction reordering issue);
- 某些簡單的操作實際上可能分成多個步驟, 例如 ++x 需要先存取 x 的值, 將 x + 1 寫入, 並且回傳新值. 在執行緒之間交叉運作的時候, 可能會停在中間某一步. 若其它執行緒同時存取或者修改, 可能存取到不符合預期的結果. 這種情況被稱為非原子操作問題 (non-atomic operation issue);
- 某些變數可能在多個存儲裝置上存在, 例如某個變數在記憶體, 快取和暫存器中同時存在. 若某個執行緒修改了這個變數在暫存器上的值, 另一個執行緒從快取或者記憶體中存取到該變數的值, 那麼另一個執行緒所得到的值便不是最新的. 這種問題被稱為記憶體可見性問題 (memory visibility issue);
- 由於執行緒之間的運作順序完全由作業系統決定, 某些錯誤可能僅在特定條件下才出現, 復現極其困難, 偵錯難度大大提高.
上面列出了不少情況. 值得慶幸的是, 除了最後一條之外, 都可以可避免的, 不過代價就是程式碼會比單執行緒的程式複雜. 針對最後一條, 現代 IDE 也有很多方便的偵錯方式協助我們排查, ChatGPT 也可以幫助我們一起偵錯.
當一個執行緒正在修改資料, 另一個執行緒需要存取資料的時候, 我們需要確保另一執行緒的到的資料是修改前或者修改後的, 而不是修改進行到一半的值. 這一點可以從資料庫的交易 (transaction) 中獲得一些啟示. 資料庫所使用的交易也是解決多執行緒併發下, 保護共享資料的一種思想. 我們要求一個操作不能被中斷, 且不能直接修改原始值. 若一個執行緒正在進行交易, 沒有任何執行緒可以打斷它. 如果另一個執行緒也在進行交易, 這個交易涉及到共享變數的修改, 那麼另一個執行緒必須先轉返 (rollback), 等待第一個執行緒完成後重新提交 (commit). 在一個執行緒針對某些共享變數進行交易的時候, 其它執行緒也可以獲取這些共享變數的值, 但是獲取的必定是交易之前的值, 而不是交易修改到一半的值. 遺憾的是, C++ 目前還沒有引入交易, 必須編寫大量程式碼去模擬.
3.1 互斥鎖
C++ 標準樣板程式庫提供了一個用於保護資料的最基本設施, 互斥鎖 (mutual exclusion), 即 std::mutex
. 互斥鎖的意義在於告訴另一個執行緒當前執行緒已經進入一個臨界區 (critical section), 可能修改某些共享資料, 你必須等我完成. 在執行緒進入臨界區之前, 必須持有並獨佔互斥鎖, 以此鎖定臨界區的共享資料. 在執行緒完成任務後, 應該釋放互斥鎖, 以解鎖臨界區資料. 若執行緒嘗試獨佔互斥鎖失敗, 那麼說明由其它執行緒在當前正在使用共享資料, 需要等待.
互斥鎖的核心邏輯是, 當臨界區可以鎖定, 那麼立即鎖定, 並且緊接著運作接下來的任務; 如果臨界區不可鎖定, 那麼就開始等待其它執行緒解鎖臨界區, 一旦發現臨界區可以鎖定, 那麼立即鎖定並開始任務.
3.1.1 std::mutex
與 std::lock_guard
std::mutex
提供成員函式 lock
用於獲得互斥鎖的所有權, unlock
用於釋放互斥鎖的所有權, 即鎖定與解鎖. lock
與 unlock
必須成對出現, 否則可能造成死鎖. 為此, C++ 標準樣板程式庫為 std::mutex
提供了樣板類別 std::lock_guard
配套使用. 當建構一個 std::lock_guard<std::mutex>
物件後, 會自動嘗試鎖定, 當該物件解構之後, 會自動解鎖. std::mutex
和 std::lock_guard
都在標頭檔 <mutex>
中.
#include <mutex>
#include <iostream>
#include <thread>
int i {};
std::mutex i_mutex;
void f(int value) {
std::lock_guard<std::mutex> guard {i_mutex};
std::cout << "old value of i is " << i << "." << std::endl;
i = i * value + 1;
}
int main(int argc, char *argv[]) {
std::thread t1(f, 42), t2(f, -42), t3(f, 0);
t1.join();
t2.join();
t3.join();
}
我們也可以對 int
進行封裝, 讓它成為一個併發安全的 int
:
#include <mutex>
class thread_safe_int {
private:
int value {};
std::mutex mutex;
public:
void set(int value) {
std::lock_guard<std::mutex> guard(this->mutex);
this->value = value;
}
int get() {
std::lock_guard<std::mutex> guard(this->mutex);
return this->value;
}
};
對於 thread_safe_int::value
, 我們應該盡可能保護它不能被外界直接獲取. 一旦外界可以透過參考或者指標的方式指涉它, 那麼外界就可以直接繞過互斥鎖對 value
的值進行直接存取或者修改. 如非必要, 我們不應該為 thread_safe_int::value
的存取留有後門. 不過有時候, 我們也不是有意的 :
#include <mutex>
#include <iostream>
class thread_safe_int {
private:
int value {};
std::mutex mutex;
public:
void set(int value) {
std::lock_guard<std::mutex> guard(this->mutex);
this->value = value;
}
int get() {
std::lock_guard<std::mutex> guard(this->mutex);
return this->value;
}
template <typename F>
auto process(F f) -> decltype(f(this->value)) {
std::lock_guard<std::mutex> guard(this->mutex);
return f(this->value);
}
};
int &hook(int &value) noexcept {
return value;
}
int main(int argc, const char *argv[]) {
thread_safe_int i {};
auto &naked_i {i.process(hook)};
naked_i = 42;
std::cout << i.get() << std::endl; // 輸出結果 : 42
}
不過一般來說, 封裝屬於君子協定, 要想拿到受保護的臨界區變數, 總有一些非法的辦法 (例如對類別的記憶體位址直接進行偏移). 這也展現了 std::mutex
並非萬能. 某些即使是合法的情況下, 出乎意料的錯誤也不會因為 std::mutex
的引入而消失 :
#include <mutex>
class stack {
private:
int *elements;
std::size_t element_size;
std::mutex mutex;
public:
// ...
bool empty() {
std::lock_guard<std::mutex> guard(this->mutex);
return this->element_size == 0;
}
int &top() {
std::lock_guard<std::mutex> guard(this->mutex);
return this->elements[this->element_size - 1];
}
// ...
};
void f(stack &s) {
if(not s.empty()) {
auto top {s.top()};
// ...
}
}
Code 11 中的函式 f
看起來在多執行緒下沒有問題, 因為 stack
的成員函式 empty
和 pop
都有互斥鎖的保護. 但是考慮一種情況, 一個執行緒在運作函式 f
中的 if
之後, 發現堆疊不為空, 這個執行緒在此時讓出了 CPU; 另一個執行緒恰好在這個間隙將堆疊清空, 此時第一個執行緒再呼叫 top
, 就會發生未定行為. 有一種方案是在 Code 11 的成員函式 pop
中增加對堆疊是否為空的判斷. 若堆疊為空, 擲出例外情況; 或者將回傳值修改為智慧指標 std::shared_ptr
, 回傳空指標. 當然, 我們也可以合併 pop
和 top
的操作, 在 pop
完成後回傳堆疊頂部的元素. 一般來說, 想要在多執行緒下兼顧記憶體安全和例外安全, 一些額外的效能是不得不付出的. 因此絕大多數開源的 C++ 併發容器在這種情況下都會選擇合併 pop
和 top
, 同時將回傳值設定為 std::shared_ptr<T>
, 並且根本不提供 top
(理由是當時回傳的 top
可能在回傳之後便不再是 top
了, 這個 top
不能準確反映堆疊的情況). 這和講求絕對效能的 C++ 標準樣板程式庫容器完全是兩條路, 多執行緒的基礎程式庫設計就是不同於 C++ 標準樣板程式庫. 我們不推薦直接回傳未經過包裝的指標, 至少也要使用 std::unique_ptr<T>
.
若要宣告兩個 stack
物件, 那麼我們就需要管理兩個互斥鎖. 兩個及以上數量的互斥鎖會帶來另一個問題 : 死鎖. 死鎖指的是每一個執行緒都持有至少一個互斥鎖, 但是由於一個任務完成需要持有另外一些互斥鎖, 所以每個執行緒都在等待其它執行緒釋放所需的互斥鎖, 最終導致每一個執行緒都在空等, 無法繼續完成任務. 若每個執行緒都使用相同的順序持有互斥鎖, 那麼死鎖永遠不會發生. 對於互斥鎖 m_{1}, m_{2}, ..., m_{n}, 設執行緒 T1 鎖住了 m_{1}, m_{2}, ..., m_{i}, 執行緒 T2 想要持有 m_{1}, m_{5} 和 m_{n}. T2 想持有 m_{1} 時便被阻斷了, 根本沒有機會鎖住後面的互斥鎖. 因此 m_{i + 1}, m_{i + 2}, ..., m_{n} 都是未鎖定的. 若此時 m_{1}, m_{2}, ..., m_{j} 被解鎖, 但是 m_{j + 1}, m_{j + 2}, ..., m_{i} 未被解鎖, 且 j < 5, 那麼執行緒 T2 也沒有機會鎖定互斥鎖 m_{n}. 其中, i = 1, 2, ..., n, j = 1, 2, ..., i - 1.
實際情況會比我們想像的要複雜很多. 考慮交換兩個 std::stack
物件 :
#include <mutex>
#include <stack>
struct stack_with_mutex {
std::stack<int> stack;
std::mutex mutex;
};
void swap(stack_with_mutex &lhs, stack_with_mutex &rhs) {
std::lock_guard<std::mutex> g1(lhs.mutex), g2(rhs.mutex);
std::swap(lhs.stack, rhs.stack);
}
對於型別為 std::stack<int>
的物件 s1
和 s2
, 若執行緒 T1 想要呼叫 std::swap(s1, s2)
, 執行緒 T2 想要呼叫 std::swap(s2, s1)
, 現在就有了一種情況 : 執行緒 T1 的 lhs
為 s1
, 執行緒 T2 的 lhs
為 s2
. 那麼執行緒 T1 的呼叫的函式 swap
中鎖住了 s1
的互斥鎖, T2 呼叫的函式 swap
中鎖住了 s2
的互斥鎖, 接著執行緒 T1 在等待 T2 釋放 s2
的互斥鎖, 執行緒 T2 又在等待 T1 釋放 s1
的互斥鎖, 死鎖發生了!
3.1.2 std::lock
為了降低死鎖發生的風險, C++ 標準樣板程式庫標頭檔 <mutex>
中提供了一個函式 std::lock
, 它基於「若每個執行緒都使用相同的順序持有互斥鎖, 那麼死鎖永遠不會發生.」這一原則, 首先對接收到的互斥鎖按照記憶體位址進行排序, 然後按順序逐個鎖定. 若其中某個互斥鎖已經被鎖定, 那麼 std::lock
立即釋放所有剛剛被自己鎖定的互斥鎖, 並且嘗試暫停當前執行緒的運作, 讓出 CPU 資源給其它執行緒.
#include <mutex>
#include <stack>
struct stack_with_mutex {
std::stack<int> stack;
std::mutex mutex;
};
void swap(stack_with_mutex &lhs, stack_with_mutex &rhs) {
if(&lhs == &rhs) {
return;
}
std::lock(lhs.mutex, rhs.mutex);
std::lock_guard<std::mutex> g1(lhs.mutex, std::adopt_lock), g2(rhs.mutex, std::adopt_lock);
std::swap(lhs.stack, rhs.stack);
}
在 Code 12-2 中, 我們首先嘗試判斷了 lhs
和 rhs
是否為同一個物件, 完善了 Code 12-1 中的行為. 因為同一個對一個 std::mutex
物件在本執行緒未釋放的情況下再次呼叫成員函式 lock
屬於未定行為. 接著我們使用 std::lock
嘗試鎖定 lhs
和 rhs
的互斥鎖, 並且使用 std::lock_guard
幫助我們自動解鎖. 不過, g1
和 g2
相比於 Code 12-1 中多了一個引數, 即 std::adopt_lock
. 這個引數告訴 std::lock_guard
, 對應的 std::mutex
物件已經被鎖定過了, 之後只需要解鎖即可.
基於 std::lock
的屬性, 使用它鎖定的互斥鎖解鎖順序可以是任意的. 因為其它執行緒中的 std::lock
只要發現一個互斥鎖被其它執行緒持有, 那麼就會立即停止鎖定, 並且釋放已經鎖定的互斥鎖.
std::lock
不止可以處理兩個鎖, 它可以接收任意數量的互斥鎖, 其宣告為
template <typename Lockable1, typename Lockable2, typename ...Lockables>
void lock(Lockable1 &mutex1, Lockable2 &mutex2, Lockables &...mutexes);
有時候, std::lock
也不是萬能的. 如果我們沒有辦法一次性獲取到所有互斥鎖, 那麼就有可能需要分多次呼叫 std::lock
, 死鎖在這種情況還是無法完全避免.
3.1.3 基於互斥鎖的程式碼設計規範
死鎖並不一定來源於等待互斥鎖的釋放. 若有兩個 std::thread
物件 t1
和 t2
, t1
呼叫 t2
的成員函式 join
, t2
又呼叫 t1
的成員函式 join
, 兩個執行緒互相等待對方運作完成, 那麼這種等待永遠不會停止. 總的來說, 要避免死鎖, 核心就是要避免等待. 不過, 如果可以規範程式碼的設計, 也可以一定程度上避免等待 :
- 儘量避免一個執行緒持有多個互斥鎖, 若一定要持有多個互斥鎖, 儘量一次性使用
std::lock
降低死鎖的風險. 遵守這條雖然無法完全避免死鎖, 但是絕大多數情況下死鎖的發生和互斥鎖有關; - 在持有互斥鎖的時候, 呼叫其它函式應該儘量搞懂這個函式內部的實作是否可能和當前持有的互斥鎖衝突而產生死鎖. 若函式內部的實作無法看到, 最好的方式就是不要把任何互斥鎖或者可能和互斥鎖關聯的物件傳遞給該函式;
- 儘量為每個執行緒約定, 以某種固定的順序獲取互斥鎖. 例如在連結串列 (參考《【資料結構】連結串列》) A \leftrightarrows B \leftrightarrows C, 儘量以連結串列中的串連順序 A, B 與 C 逐個獲取, 當然前提是每個結點都需要提供一個互斥鎖;
- 為程式進行分層. 處於某一層次的程式碼不允許鎖定甚至獲取來自其它層次的互斥鎖. 一個簡單的程式通常有介面層, 邏輯處理層和資料層. 一般資料層處於最底層, 它不允許鎖定甚至獲取來自邏輯處理層和介面層的互斥鎖. 當然, 我們也可以對層次優先度進一步分類 : 資料層通常處於低層, 邏輯處理層一般處於中層, 介面層可能和使用者直接交互, 屬於高層. 一個符合一般邏輯的程式經常是高一些的層次可以獲取來自低一些層次的資料, 為此我們有時候可能需要放寬鬆一些, 讓高一些層次可以持有來自低一些層次的互斥鎖, 但是不允許低一些層次持有甚至獲取高一些層次的互斥鎖. 這樣, 互斥鎖之間便也有了非常鮮明的層次之分. 我們只要在同一層級之間再約定必須以某一些順序獲取互斥鎖, 就可以最大程度上防範因為互斥鎖帶來的死鎖. C++ 標準樣板程式庫中並沒有提供相應的設施, 但是我們可以自己實作. 比如利用
std::mutex
結合內建的數字型別用來表示當前的互斥鎖來源於哪個層級, 在嘗試鎖定的時候, 必須先檢查層級符不符合層級要求. 若層級不符合, 可以透過拋擲例外情況來提醒.
上面一些準則不僅僅在互斥鎖中適用, 針對執行緒也是適用的. 不過最好的方法是避免子執行緒等待另一個子執行緒, 只允許主執行緒等待子執行緒. 若實在無法做到, 也可以將上面的準則復用到執行緒上.
3.1.4 std::unique_lock
std::lock_guard
可以幫助我們自動處理互斥鎖的解鎖, 避免忘記或者例外情況, 然而它還是不夠靈活 :
#include <mutex>
void f(std::mutex &mutex) {
std::lock_guard<std::mutex> guard(mutex);
// 處理資料
// ...
// 進行一些與資料, 互斥鎖或者執行緒無關的耗時操作
// ...
// 依賴於上面的結果繼續處理資料
// ...
}
在 Code 13-1 的中間步驟中, 我們希望 guard
可以解鎖 mutex
, 讓出資源給其它執行緒避免其它執行緒繼續無效等待, 畢竟中間的耗時操作和資料無關. 也就是說, 中間那一段耗時操作將本來一個大臨界區分成了兩個小臨界區, 分別是耗時操作之前的臨界區和耗時操作之後的臨界區. 為此, C++ 標準樣板程式庫標頭檔 <mutex>
引入了一個更靈活的 std::unique_lock
, 其用法與 std::lock_guard
高度類似, 但是提供了成員函式 lock
與 unlock
用於臨時鎖定和解鎖. 這樣, Code 13-1 可以被改造為
#include <mutex>
void f(std::mutex &mutex) {
std::unique_lock<std::mutex> lock(mutex, std::defer_lock);
// 準備工作
// ...
lock.lock();
// 處理資料
// ...
lock.unlock();
// 進行一些與資料, 互斥鎖或者執行緒無關的耗時操作
// ...
lock.lock();
// 依賴於上面的結果繼續處理資料
// ...
}
Code 13-2 展示了 std::unique_lock
的靈活性. 在一開始, 我們透過傳遞引數 std::defer_lock
延後加鎖, 避免準備工作就獨佔互斥鎖. 當真正開始處理資料的時候, 我們才透過呼叫成員函式 lock
持有互斥鎖. 當資料處理完成進行其它無關操作時, 釋放互斥鎖讓其它執行緒在這個間隙有機會運作. 最後, 在等待完成後繼續加鎖, 完成剩餘處理, 解鎖工作交給解構子完成.
一般情況下, 我們都需要控制持有互斥鎖的時間, 僅對有必要的操作加鎖進行保護, 以最少的時間持有互斥鎖. 持有互斥鎖的時間越長, 就意味著死鎖的風險增加.
std::unique_lock
擁有的靈活性不止於此, 它類似於 std::unique_ptr
, 可被移動, 也就是我們可以在某個可視範圍內持有互斥鎖後, 再轉移互斥鎖的所有權. 若某個可視範圍內加鎖處理完臨界區資料後, 不知道其它地方會如何處理資料, 最簡單的方法就是將互斥鎖的所有權轉移到另一個地方, 讓另一個地方自己處理這個互斥鎖的所有權 :
#include <mutex>
std::unique_lock<std::mutex> preprocess(std::mutex &mutex) {
std::unique_lock<std::mutex> lock(mutex);
// ...
return lock;
}
void process(std::mutex &mutex) {
auto lock {preprocess(mutex)};
// ...
}
std::unique_lock
的靈活性是用空間和效能換來的. std::unique_lock
的解構子並不直接釋放互斥鎖, 而是檢查一個和互斥鎖狀態有關的變數之後, 才決定是否解鎖. 因此, std::unique_lock
的效能並不如 std::lock_guard
. 當使用 std::lock_guard
就可以滿足需求的時候, 儘量避免使用 std::unique_lock
.
3.1.5 一次性互斥鎖
如果一個物件需要進行一次初始化, 初始化之後就保證值不會再發生改變, 之後的存取操作都無需對其加以保護, 但是在多執行緒下初始化還需要保護 :
#include <mutex>
#include <memory>
std::shared_ptr<int> p {};
std::mutex p_init_mutex {};
int read_value(int value_for_first_init_only) {
std::unique_lock<std::mutex> lock(p_init_mutex);
if(not p) {
p = std::make_shared<int>(value_for_first_init_only);
}
lock.unlock();
return *p;
}
在 Code 15-1 中, 雖然 p
在之後使用的時候無需加鎖, 我們使用了 std::unique_lock
的靈活性避免了這點. 但是由於我們並不知道 p
什麼時候會被初始化, 初始化的時候有沒有可能存在競爭, 所以我們仍然使用 std::mutex
保護它. 然而初始化完成之後, 這個互斥鎖就不再有用, 但是它仍然在消耗一些時間. 對這種情況, C++ 標準樣板程式庫在標頭檔 <mutex>
中提供了配套的設施 std::call_once
和 std::one_flag
, 以替代這種場景下的互斥鎖 :
#include <mutex>
#include <memory>
std::shared_ptr<int> p {};
std::once_flag init_flag_for_p {};
void init_p(int value_for_first_init_only) {
p = std::make_shared<int>(value_for_first_init_only);
}
int read_value(int value_for_first_init_only) {
std::call_once(init_flag_for_p, init_p);
return *p;
}
std::call_once
的用法和 std::thread
很類似, 不過其第一個參數是 std::once_flag
型別, 接下來接受一個可呼叫物件, 後面的引數就是要傳遞給可呼叫物件的引數. std::once_flag
的效能優於 std::mutex
, 且和 std::mutex
一樣無法複製.
在 C++ 11 之前, 全域變數在多執行緒下的初始化屬於未定行為, 變數在不同執行緒下可能會被初始化多次. 為了避免這種情況, C++ 98/03 時代通常使用靜態局域變數的單例模式解決這個問題. 但是在 C++ 11 後, 編碼器會為每個局域變數生成類似於 std::call_once
的程式碼, 保證在不同執行緒下也只初始化一次.
3.1.6 可共享的互斥鎖
Code 15 基於變數初始化後直到程式終結都不會再改變, 但是也有這種情況 : 變數初始化之後極少改變, 絕大多數操作都是存取. 那麼為了相容極少改變那部分操作, 我們仍然需要引入互斥鎖. C++ 標準樣板程式庫標頭檔 <shared_mutex>
為這種情況專門提供了 std::shared_mutex
和 std::shared_lock
:
#include <ctime>
#include <shared_mutex>
#include <mutex>
class basic_time {
private:
std::time_t program_start_time;
mutable std::shared_mutex mutex;
public:
basic_time() : program_start_time {std::time(nullptr)} {}
basic_time(const basic_time &) = delete;
basic_time(basic_time &&) = delete;
~basic_time() = default;
public:
std::time_t read() const {
std::shared_lock<std::shared_mutex> lock(this->mutex);
return this->program_start_time;
}
void reset(std::time_t new_time) {
std::lock_guard<std::shared_mutex> guard(this->mutex);
this->program_start_time = new_time;
}
};
在成員函式 reset
中, 我們仍然使用 std::lock_guard
保證寫入的時候獨佔互斥鎖, 避免其它執行緒存取到錯誤的值. std::shared_mutex
中存在計數, std::lock_guard
會一直等待計數清零, 然後獨佔互斥鎖. 在 std::lock_guard
獨佔互斥鎖之後, std::shared_lock
和其它寫入操作都會等待本次寫入操作完成. std::shared_mutex
的獨佔操作和 std::mutex
類似, 使用成員函式 lock
和 unlock
, 這正好對應了 std::lock_guard
使用的部分; 其可共享的屬性由成員函式 lock_shared
和 unlock_shared
完成, 這也是 std::shared_lock
所使用的部分. 和 std::mutex
相同, 若一個 std::shared_mutex
已經被本執行緒透過呼叫成員函式 lock
持有, 在未釋放的情況下, 若本執行緒再次呼叫成員函式 lock
會產生未定行為.
要注意 std::mutex
, std::lock_guard
和 std::unique_lock
都是 C++ 11 引入的, 但是 std::shared_lock
是 C++ 14 引入的, std::shared_mutex
是 C++ 17 引入的.
3.1.7 嘗試持有互斥鎖
std::mutex
和 std::shared_mutex
除了成員函式 lock
和 unlock
之外, 還有一個名稱為 try_lock
的成員函式, 它只是嘗試持有互斥鎖, 如果嘗試不成功會回傳 false
, 不會像 lock
那樣一直等待. std::unique_lock
和 std::shared_lock
都有對應的建構子嘗試持有互斥鎖, 像傳遞 std::defer_lock
或者 std::adopt_lock
那樣傳遞 std::try_to_lock
即可. 另外, std::unique_lock
和 std::shared_lock
也有對應的成員函式 try_lock
, 呼叫繫結到的互斥鎖的成員函式 try_lock
.
try_lock
一般用在等待敏感的地方, 如果發現持有互斥鎖不成功, 那麼就先去做一些別的事情. 一般我們會透過其回傳值來判斷, 若回傳 false
, 那麼運作另一段程式碼. 在運作其它程式碼之後, 接下來可以重新嘗試持有互斥鎖, 也可以徹底不管它, 這取決於具體的程式邏輯. 它不像 lock
那樣, 當發現持有鎖不成功, 那麼就一直等待, 什麼都做不了.
不要被 try_lock
的名字所欺騙, 同一個執行緒在已經持有互斥鎖物件的情況下, 再次呼叫 try_lock
也會產生未定行為.
3.1.8 可遞迴的互斥鎖
我們已經知道, 同一個執行緒在持有 std::mutex
或者 std::shared_mutex
互斥鎖物件後, 再次呼叫其成員函式 lock
或者 try_lock
會產生未定行為. 但是確實會存在一些遞迴的情況, 我們沒有辦法獲知本次遞迴之前的函式呼叫中, 是否已經持有某個互斥鎖. C++ 標準樣板程式庫標頭檔 <mutex>
為這種情況提供了 std::recursive_mutex
(沒有 std::recursive_shared_mutex
). 同一個執行緒在獨佔 std::recursive_mutex
互斥鎖後, 再次呼叫其成員函式 lock
或者 unlock
是合法的, 不會產生未定行為. std::recursive_mutex
內部會對成員函式 lock
的呼叫進行計數. lock
呼叫多少次, 對應地, unlock
也必須呼叫多少次. 如果同一執行緒對 lock
和 unlock
的呼叫次數不對稱, 那麼會產生未定行為.
std::recursive_mutex
的成員函式 try_lock
行為稍有不同. 在呼叫成員函式 try_lock
後成功持有互斥鎖的情況下, 計數增加一; 在 try_lock
持有互斥鎖失敗回傳 false
的情況下, 計數不會增加.
除非必要, 我們並不推薦使用 std::recursive_mutex
. 如果程式中出現了該型別的物件, 通常可以直接考慮程式碼的設計是否存在問題. 一個 std::recursive_mutex
的使用場景是一個成員函式將某些工作交給了另一個成員函式來做 :
#include <mutex>
class A {
private:
int i;
std::recursive_mutex mutex;
public:
void f() {
std::lock_guard<std::recursive_mutex> lock(this->mutex);
// ...
}
void g() {
std::lock_guard<std::recursive_mutex> lock(this->mutex);
// ...
this->f();
// ...
}
};
然而 Code 17 的設計是可以改變的. 例如我們借助 std::unique_lock
的靈活性, 先將所有權暫時轉移給成員函式 f
, 然後由 f
轉移回到 g
上來, 這樣便可以避免使用 std::recursive_mutex
.
3.2 等待某一事件
若要讓一個執行緒等待另一個執行緒完成某個任務, 我們前面說過, 可以由一個執行緒呼叫另一個執行緒 std::thread
物件的 join
. 同時, 我們也說過, 這種方式容易造成執行緒級別的死鎖, 不建議隨意使用. 不過有時候確實存在這樣的需求, 例如執行緒 T1 要完成 A, B, C 和 D 四個任務, 執行緒 T2 需要等待 T1 完成 A 和 B 便可以繼續自己的任務, 剩下的任務和執行緒 T1 另外兩個任務 C 和 D 無關. 如果執行緒 T2 呼叫執行緒 T1 的 join
, 那麼執行緒 T2 必須等待 C 和 D 這兩個任務也完成, 才能繼續自己的任務, 這種等待是沒必要的.
3.2.1 條件變數
要讓一個執行緒等待另一個執行緒的部分任務, 我們可以為這些子任務設定 bool
變數, 若完成某個任務, 則將某個任務的 bool
變數設定為 true
. 執行緒必須定時地檢查 bool
變數, 以判斷接下來的任務是否可以繼續, 否則就繼續等待. 執行緒的不斷暫停是需要不斷進行上下文切換的, 如果讓執行緒不斷檢測, 也會浪費大量 CPU 資源. 最好的方式就是當想要的任務完成後, 一個執行緒通知另一個執行緒, 另一個執行緒理想情況下只進行一次上下文切換, 喚起後立即可以開始接下來的任務. C++ 標準樣板程式庫在標頭檔 <condition_variable>
中提供了這樣的條件變數 std::condition_variable
. 不像 std::lock_guard
或者 std::unique_lock
可以配合任何互斥鎖使用, 只要互斥鎖提供成員函式 lock
和 unlock
就可以了, std::condition_variable
只能配合 std::mutex
使用, 它會在恰當的時候喚起執行緒 :
#include <mutex>
#include <condition_variable>
#include <queue>
std::queue<int> queue;
std::mutex mutex;
std::condition_variable cv;
void produce(int value) {
{
std::lock_guard<std::mutex> guard(mutex);
queue.push(value);
}
cv.notify_one();
}
void produce(int *value, std::size_t count) {
{
std::lock_guard<std::mutex> guard(mutex);
for(std::size_t i {0}; i < count; ++i) {
queue.push(value[i]);
}
}
if(count > 1) {
cv.notify_all();
}else {
cv.notify_one();
}
}
void consume() {
std::unique_lock<std::mutex> lock(mutex);
cv.wait(lock, []() -> bool {
return not queue.empty();
});
auto value {queue.front()};
queue.pop();
lock.unlock();
// ...
}
函式 produce
負責生產資料並且插入佇列中, 如果 produce
只收到一個 int
變數, 那麼條件變數只通知一個暫停的執行緒, 如果 produce
一次性收到多個 int 變數, 那麼就會通知所有暫停的執行緒. std::condition_variable
的成員函式 wait
就是往下運作的條件, 如果條件不滿足, 那麼執行緒將被暫停, 而且互斥鎖將被釋放; 如果條件滿足, 那麼仍然繼續持有互斥鎖往下運作. 收到通知被喚起的執行緒首先由條件變數負責嘗試獨佔互斥鎖, 如果加鎖失敗, 那麼仍然繼續等待. 因此, 條件變數的條件實際上由自訂條件是否滿足以及是否能持有互斥鎖兩者共同構成.
從 std:condition_variable
的使用來看, 互斥鎖的使用需要有靈活性, std::lock_guard
無法滿足 std::condition_variable
的使用要求, 所以 Code 18 中使用了 std::unique_lock
.
若作業系統的資源非常充足, 那麼執行緒的喚醒可能不是由條件變數發起的, 作業系統可能自行喚醒了被暫停的執行緒. 不過我們無需擔心這種情況, 作業系統的虛假喚醒 (spurious wake) 也收到了條件變數的保護, 條件變數仍然會去檢測條件是否滿足, 不滿足就繼續暫停.
有了條件變數, 併發堆疊或者佇列的成員函式 pop
便有了另一種實作方式 : 若容器為空, 則等待成員函式 push
透過條件變數喚起自己繼續 pop
. 具體選擇條件變數還是選擇回傳空指標, 這取決於具體的場景.
std::condition_variable
只能搭配 std::mutex
, 有時候我們可能需要其它型別的互斥鎖. C++ 標準樣板程式庫標頭檔 <condition_variable>
中提供了另一個能接受任意互斥鎖型別的 std::condition_variable_any
. 其宣告方法和 std::lock_guard
類似, 要求這個互斥鎖具有成員函式 lock
與 unlock
, 用法和 std::condition_variable
相同. 然而由於 std::condition_variable_any
具有靈活性, 所以效能上不如 std::condition_variable
.
一般來說, 條件變數除了等待條件成立之外, 還需要等待互斥鎖. 不過, std::condition_variable
和 std::condition_variable_any
支援只等待互斥鎖, 這是成員函式 wait
的另一個多載版本, 其只接受一個 std::unique_lock<std::mutex> &
引數, 不需要也不能傳入其它條件.
3.2.2 停止執行緒
std::thread
沒有接入停止機制, 所以我們只能編寫更多程式碼, 告訴一個執行緒是否需要停止 :
#include <thread>
#include <chrono>
enum stop_token {
task_stop, task_continue
};
stop_token get_stop_token();
void send_stop_token(stop_token);
void save_statistical_data_to_database_after_stop();
void f();
int main(int argc, char *argv[]) {
for(auto i {0}; i < 7; ++i) {
std::thread timer([]() {
std::this_thread::sleep_for(std::chrono::hours(24));
send_stop_token(task_stop);
});
std::thread call_f([]() {
send_stop_token(task_continue);
do {
f();
}while(get_stop_token() == task_continue);
save_statistical_data_to_database_after_stop();
});
timer.join();
call_f.join();
}
}
在 Code 19-1 中, 停止來源 (stop source) 是透過 send_stop_token 發出的, 停止權杖 (stop token) 是函式 get_stop_token
. 綜合來說, 有一個執行緒正在執行任務, 另一個執行緒向它發出了停止請求, 執行緒透過請求停止權杖發現另一個執行緒要求自己正在執行的任務停止後. C++ 20 為簡化停止執行緒的流程, 在標頭檔 <thread>
中引入了 std::jthread
.
std::jthread
和 std::thread
幾乎有著一樣的用法, 但是 std::jthread
有一個成員函式 request_stop
來要求執行緒停止作業. 因此 std::jthread
的用法和 std::thread
的用法稍有不同, 如果要支援停止, 那麼其接受的函式物件的第一個參數必須是 std::stop_token
, 它告訴執行緒目前的停止權杖是什麼狀態 :
#include <thread>
#include <chrono>
void save_statistical_data_to_database_after_stop();
void f();
int main(int argc, char *argv[]) {
for(auto i {0}; i < 7; ++i) {
std::jthread call_f([](std::stop_token token) {
do {
f();
}while(not token.stop_requested());
save_statistical_data_to_database_after_stop();
});
std::thread timer([&call_f]() {
std::this_thread::sleep_for(std::chrono::hours(24));
call_f.request_stop();
});
timer.join();
}
}
在 Code 19-2 中, 我們借助了 std::jthread
和 std::stop_token
來優化 Code 19-1 中的程式碼. timer.join()
會讓當前的迴圈等待二十小時後, 向負責 call_f
的執行緒發出停止請求, 儲存分析資料到資料庫之後才進行下一個迴圈, 相當於每天進行一次分析資料的儲存. 接下來, 我們詳細分析一下 std::jthread
的停止來源和停止權杖.
3.2.2.1 std::stop_source
每一個 std::jthread
物件自身都帶有一個停止源, 其成員函式 get_stop_source
會獲得 std::jthread
自己自帶的停止來源, 其型別為 std::stop_source
. 我們也可以獲得一個 std::jthread
物件的停止源後, 透過該停止源發出停止請求, 並不一定非要呼叫 std::jthread::request_stop
. 因此, std::stop_source
類別也有一個成員函式 request_stop
. 事實上, std::jthread
的成員函式 request_stop
就是呼叫自身停止源 std::stop_source
物件的成員函式 request_stop
. 每一個 std::stop_source
物件內部都有可能存在一個停止狀態 (stop state), 這個停止狀態基本上包含了 :
- 停止旗幟;
- 參考計數;
- 已註冊的一系列停止回呼 (stop callback);
- 其它用於執行緒之間同步的內部信息.
有趣的是, std::stop_source
和 std::shared_ptr
類似, 其停止狀態是可共享的, 所以內部才需要參考計數. 當一個 std::stop_source
被移動時, 其內部停止狀態也會被移動, 那麼這個 std::stop_source
就不再持有任何停止狀態, 其成員函式 stop_possible
針對這種情況會回傳 false
. 當然, 一個預設建構和使用 std::nostopstate
作為建構子引數建構的 std::stop_source
也會讓成員函式 stop_possible
回傳 false
. 根據 std::stop_source
的內部結構, 呼叫 std::stop_source
的成員函式 request_stop
會讓停止狀態中的停止旗幟置為成立. std::stop_source
的另一個成員函式 stop_requested
可以查詢這個停止旗幟的狀態. std::stop_source
還支援交換操作.
3.2.2.2 std::stop_token
其實我們已經看到了用於 std::jthread
和 std::stop_source
停止的停止權杖, 即停止狀態中的停止旗幟. 停止旗幟並不是 bool
變數, 而是類別 std::stop_token
. std::stop_token
有兩個成員函式 stop_requested
和 stop_possible
. 也就是說, std::stop_token
其實也可以查詢停止源 std::stop_source
物件中的停止狀態. 因此, 停止狀態中的參考計數不僅包含了針對 std::stop_source
物件數量的計數, 也包含了對 std::stop_token
物件數量的計數.
我們可以透過 std::jthread
的成員函式 get_stop_token
或者 std::stop_source
的成員函式 get_token
來獲得其繫結的停止狀態對應的停止權杖. std::stop_source
和 std::stop_token
共享同一個停止狀態, 但是 std::stop_token
不能發起停止, 只能用於查詢停止旗幟.
3.2.2.3 std::stop_callback
在停止狀態中, 除了已經介紹過的停止旗幟, 參考計數還有內部私用的信息之外, 還有一個是停止回呼, 它用於註冊停止時應該做的一些事情. 例如我們可以要求在停止時輸出一些日誌 :
#include <thread>
#include <chrono>
enum print_level {
only_error, error_and_warning, debug, all
};
print_level get_log_level();
void save_statistical_data_to_database_after_stop();
void f();
int main(int argc, char *argv[]) {
for(auto i {0}; i < 7; ++i) {
std::jthread call_f([](std::stop_token token) {
do {
f();
}while(not token.stop_requested());
save_statistical_data_to_database_after_stop();
});
std::thread timer([&call_f]() {
std::this_thread::sleep_for(std::chrono::hours(24));
std::stop_callback callback(call_f.get_stop_token(), []() {
if(get_log_level() > error_and_warning) {
// 打印時間
// 打印 stop requested!
}
});
call_f.request_stop();
});
timer.join();
}
}
假設執行緒已經被暫停 (例如條件變數不滿足), 那麼只靠 std::jthread::request_stop
或者 std::stop_source::request_stop
可能並不能讓執行緒立即被停止, 執行緒可能仍然處於等待狀態. 此時, 我們可以借助 std::stop_callback
來喚醒執行緒 :
#include <thread>
#include <condition_variable>
#include <iostream>
std::mutex m;
std::condition_variable cv;
bool ready {};
int main() {
std::jthread t([](std::stop_token token) {
std::stop_callback callback(token, []() {
cv.notify_all();
});
std::unique_lock lock(m);
cv.wait(m, []() {
return ready;
});
if(token.stop_requested()) {
std::clog << "task cancelled.";
return;
}
// ...
});
}
也就是說, 當停止區塊中的停止旗幟變為成立, 那麼接下來就會呼叫所有停止回呼. 借助這個機制, 我們就可以喚醒正在等待的執行緒, 讓它們停止任務, 直接終結.
3.2.2.4 非共享的停止
std::stop_source
, std::stop_token
和 std::stop_callback
都被繫結到了一個共享的停止狀態上, 類似於 std::shared_ptr
的效能不如 std::unique_ptr
, 共享也會導致停止的效能在一定程度上降低. 如果我們明確了停止控制的來源只有一個, 那麼 C++ 26 提供了非共享的 std::inplace_stop_source
, std::inplace_stop_token
和 std::inplace_stop_callback
:
#include <thread>
#include <chrono>
enum print_level {
only_error, error_and_warning, debug, all
};
print_level get_log_level();
void save_statistical_data_to_database_after_stop();
void f();
int main(int argc, char *argv[]) {
std::inplace_stop_source unique_stop_source {};
for(auto i {0}; i < 7; ++i) {
std::thread call_f([token = unique_stop_source.get_token()]() {
do {
f();
}while(not token.stop_requested());
save_statistical_data_to_database_after_stop();
});
std::thread timer([&unique_stop_source]() {
std::this_thread::sleep_for(std::chrono::hours(24));
std::inplce_stop_callback callback(unique_stop_source.get_token(), []() {
if(get_log_level() > error_and_warning) {
// 打印時間
// 打印 stop requested!
}
});
unique_stop_source.request_stop();
});
timer.join();
call_f.join();
}
}
在 Code 22 中, 我們用回了 std::thread
, 它比 std::jthread
更加輕量級, 因而效能比 Code 20 稍好一些.
由於 std::inplace_stop_token
的引入, 一個範型程式可能需要透過型別萃取來判斷該使用哪一個停止回呼型別. 為此, C++ 26 同時引入了 std::stop_callback_for_t
, 自動幫我們匹配到對應的停止回呼型別. 那麼, std::stop_callback_for_t<std::stop_token>
對應的就是 std::stop_callback
, std::stop_callback_for_t<std::inplace_stop_token>
對應的便是 std::inplace_stop_callback
. 當然, 我們也可以自己打開名稱空間 std
, 定義自己的停止回呼型別.
3.2.2.5 Stop Concepts
事實上, 我們也可以自行實作停止來源, 停止權杖和停止回呼, 只要它們擁有和 std::stop_source
, std::stop_token
和 std::stop_callback
相同的介面即可, 這樣一個函式樣板便可以接受我們自己實作的物件 :
class my_stop_token;
template <typename StopToken>
void f(StopToken token) {
while(not token.stop_requested()) {
// ...
}
}
然而, 有時候我們並不希望函式 f
停止, 我們希望它永遠不能被停止, 那麼我們可以傳入 std::nerver_stop_token
: f(std::never_stop_token {});
. std::nerver_stop_token
是 C++ 23 引入的一個新的類別, 其成員函式 stop_requested
和 stop_possible
永遠回傳 false
, 而且被 constexpr
所標識.
為了加強型別約束, C++ 26 引入了兩個契約 : std::stoppable_token
和 std::unstoppable_token
, 這兩個契約可以幫助我們檢查一個類別是否具有和 std::stop_token
和 std::nerver_stop_token
類似的能力 :
#include <thread>
template <typename Token>
concept StoppableOrUnstoppableToken = std::stoppable_token<Token> or std::unstoppable_token<Token>;
template <StoppableOrUnstoppableToken Token>
void f(StopToken token) {
while(not token.stop_requested()) {
// ...
}
}
3.2.2.6 <stop_token>
上面我們在使用停止執行緒相關的 C++ 標準樣板程式庫設施的時候, 都是直接透過 #include <thread>
引入的. 但是實際上停止執行緒相關的設施有單獨的標頭檔 <stop_token>
, 包含停止執行緒相關的契約設施在內的所有設施被定義在標頭檔 <stop_token>
中. 因此 Code 24 中的 #include <jthread>
我們可以替換為 #include <stop_token>
.
另一方面, 有些標準樣板程式庫配合有些編碼器之後可能有一些奇怪的現象 : 當我們想要使用停止執行緒相關設施的時候, 僅僅引入標頭檔 <thread>
還不行, 編碼器可能提示和停止執行緒相關的設施只有宣告, 沒有真正定義. 此時, 我們只需要同時引入 <thread>
和 <stop_token>
這兩個標頭檔即可.
3.2.2.7 std::jthread
與 std::thread
std::jthread
除了上面那種接受 std::stop_token
的用法之外, 其實也可以像 std::thread
那樣去使用. 也就是說, 除了支援了停止機制之外, std::jthread
和 std::thread
幾乎完全相同. 相比於 std::thread
, std::jthread
更加安全. std::jthread
在解構的時候除了會呼叫成員函式 request_stop
, 還會檢查成員函式 joinable
的回傳值, 如果是 true
, 那麼 std::jthread
就會幫我們在解構子中呼叫成員函式 join
. 這樣, 我們即使是忘記了呼叫 std::jthread
物件的成員函式 join
或者 detach
, 也可以放心地交給 std::jthread
的解構子來完成, 不會因為忘記而導致程式呼叫 std::terminate
. 事實上, std::jthread
的全稱是 joining thread, 和停止機制沒什麼關係.
3.2.3 一次性事件
和 std::call_once
類似, 事件也有可能是一次性的, 這個任務在這次運作完成之後便不再觸及. 針對一次性的事件使用 std::condition_variable
雖然沒問題, 但是有些小題大作. 一個典型的例子是資料庫連線和 WebSocket 連線, 在網絡條件良好的情況下, 連線建立後便不需要再連線. 所以連線這個動作就是一次性的. C++ 標準樣板程式庫標頭檔 <future>
為這種情況引入了 std::async
, 其回傳一個 std::future<T>
物件. 其中, std::future
的樣板參數表示 std::async
所運作任務的回傳值, 可以是 void
. 要想獲得 std::async
任務的回傳值, 可以透過 std::future<T>
的成員函式 get
獲得 (需要注意這個函式沒有被 const
所標識) :
#include <future>
int operation(int &);
int f(int value) {
std::future<int> event {std::async(operation, std::ref(value))};
// ...
return event.get();
}
和條件變數不同, 若執行緒 T1 透過 std::async
啟動了另一個任務, T1 並不會因為這個任務沒有完成就一直等待, T1 可以接著運作接下來的程式碼, 直到需要任務的回傳值. 當執行緒 T1 透過 std::future
的成員函式 get
想要獲得回傳值的時候, 如果任務還沒有完成, 執行緒 T1 才會等待. 當然, 我們也可以透過呼叫 std::future
的另一個成員函式 wait
立即開始等待另一個任務完成, 暫停接下里的程式碼運作.
std::async
的調度策略取決於具體的實作. 預設情況下, std::async
有可能會啟動一個新的執行緒, 也有可能在當前執行緒里直接運作, 但是 std::async
並不會干擾當前執行緒的任務. 絕大多數情況下, 我們可以信任 std::async
的預設調度策略.
std::async
有兩個版本, 一個版本是 Code 25 中使用的版本, 直接像 std::thread
那樣接受一個可呼叫物件, 然後接收可呼叫物件對應的引數. 另一個版本可以由我們自行選擇是否啟動一個新的執行緒 :
#include <future>
int operation(int &);
int f(int value) {
auto event1 {std::async(std::launch::async, operation, std::ref(value))};
// ...
++value;
auto event2 {std::async(std::launch::deferred, operation, std::ref(value))};
// ...
++value;
auto event3 {std::async(std::launch::async | std::launch::deferred, operation, std::ref(value))};
return event1.get() bitand event2.get() bitor event3.get() + value;
}
在 Code 26 中, event1
將會運作在呼叫函式 f
的執行緒上, 保證不會另外開啟執行緒; event2
將會被延遲啟動, 直到 event1
對應的 std::future
成員函式 get
或者 wait
被呼叫才開始正式運作; event3
不但會運作在呼叫函式 f
的執行緒上, 而且會被延遲啟動.
std::future
確實是為一次性事件設計的, 其成員函式 get
只能被呼叫一次, 第二次呼叫屬於未定行為. 其成員函式 valid
可以用於檢測. 當 std::future
的狀態是準備好時, valid
會回傳 true
. 當其成員函式 get
被呼叫之後, valid
將會回傳 false
. 在成員函式 valid
回傳 false
之後, 再呼叫同一 std::future
物件的成員函式 get
或者 wait
都會產生未定行為.
3.2.4 多個一次性事件
一個瀏覽器和一個網站伺服器之間建立的 WebSocket 連線在網路狀態良好的情況下只有一次, 但是瀏覽器可以同時打開多個網頁, 和多個不同的伺服器建立 WebSocket 連線. 因此, 我們可能想到使用一個專門的執行緒來處理 WebSocket 的連線, 處理完成後將連線回傳給瀏覽器使用. std::async
並不能勝任這種任務, 因為它要指定某一個執行緒. 針對這種情況, 我們可以使用來自 C++ 標準樣板程式庫標頭檔 <future>
中的 std::packaged_task
. std::packaged_task
和 std::function
(參考《C++ 學習筆記》Code 52) 差不多, 接受一個函式型別, 表示可呼叫物件對應的函式型別. 如果要獲得 std::packaged_task
所運作任務的結果, 可以呼叫其成員函式 get_future
. 顯然, 並不是我們呼叫 get_future
就代表我們可以立即獲得 std::packaged_task
所運作任務的結果, 我們還是要透過 get_future
回傳值 std::future<R>
的成員函式 get
才能獲得結果. 其中, R
是 std::packaged_task
接受的可呼叫物件的回傳值.
#include <future>
#include <mutex>
#include <queue>
#include <condition_variable>
class websocket_connection;
struct connection_parameters;
std::shared_ptr<websocket_connection> create_websocket_connection(const connection_parameters &);
bool stop_tasks();
std::queue<std::pair<std::packaged_task<decltype(create_websocket_connection)>, connection_parameters>> connection_tasks {};
std::mutex mutex;
std::condition_variable cv;
std::future<std::shared_ptr<websocket_connection>> connect(connection_parameters param) {
std::packaged_task<decltype(create_websocket_connection)> connection_task(create_websocket_connection);
auto result {connection_task.get_future()};
{
std::unique_lock<std::mutex> lock(mutex);
connection_tasks.emplace(std::move(connection_task), std::move(param));
}
cv.notify_one();
return result;
}
void process_connections() {
std::unique_lock<std::mutex> lock(mutex);
while(true) {
cv.wait(lock, []() -> bool {
return not connection_tasks.empty();
});
auto task {std::move(connection_tasks.front())};
connection_tasks.pop();
lock.unlock();
task.first(task.second); // 丟棄這裡的回傳值, 因為值可以由 std::future::get 取出
if(stop_tasks()) {
std::lock_guard<std::mutex> guard(mutex);
decltype(connection_tasks) empty_queue {};
connection_tasks.swap(empty_queue);
return;
}
}
}
Code 27 看起來有點複雜, 只是樣板使用得有些多, 實際邏輯並不複雜. 首先, 我們宣告了一個佇列, 佇列內部元素的型別為 std::pair<std::packaged_task<...>, connection_paramters>
, 第一個表示任務所使用的函式, 第二個表示任務所使用的參數. 當然, 在 Code 27 中只有 create_websocket_connection
這一個函式符合佇列的要求. 在函式 connect
中, 我們將函式和對應的引數打包放入佇列中, 並且回傳一個 std::future<std::shared_ptr<websocket_connection>>
, 外界可以透過這個 std::future
獲取連線. 函式 connect
並不立即建立連線, 而是將連線的任務交給了一直在運作的函式 process_connections
(除非它接到了停止運作的信號, 信號由全域函式 stop_task
發出). 函式 process_connections
在有資源的情況下, 首先從佇列中拿出了一個任務, 並且運作這個任務.
當一個連線被放入佇列, 外界拿到了 std::future
物件, 但是這個任務還沒來得及被函式 process_connections
運作, 外界就呼叫了 std::future
的成員函式 get
或者 wait
. 我們無需擔心這種情況會導致異常, std::future
在共享狀態沒有準備好的情況下, 外界會一直被阻塞, 直到 process_connections
中完成連線任務. 過早呼叫 std::future::get
或者 std::future::wait
也不會讓任務提前於 process_connections
開始, 會一直阻塞到 process_connections
處理完任務之後.
值得注意的是, 有一種情況可能是比較特殊的. 如果 connection_tasks
中仍然存在任務, 但是函式 process_connections
接到停止信號, 直接清空 connection_tasks
佇列, 和這些被清空任務相關聯的 std::future
就會被標記為打破的狀態. 此時, 呼叫這些被解構任務關聯的 std::future::get
, 會擲出 std::future_error
例外情況; 呼叫 std::future::wait
, 不會有任何阻塞, 因為打破狀態也是一種準備完成的狀態.
3.2.5 帶有條件的一次性事件
網站伺服器很多時候並不會直接接受一個 WebSocket 連線, 伺服器可能要經過使用者校驗, 例如使用者的密碼是否正確. 那麼連線這個一次性事件便需要一個前提條件 : 使用者必須先輸入自己的密碼. C++ 標準樣板程式庫標頭檔 <future>
為這種情況提供了 std::promise
. std::promise
是一個樣板類別, 樣板參數只有一個, 是前提條件的型別. 例如在輸入密碼建立連線的實例中, 這個參數應該是字串型別. 當我們將前提條件準備好, 可以透過成員函式 set_value
將值傳遞給 std::promise
. 當然, 如果前提條件的獲取存在異常, 也可以透過成員函式 set_exception
將例外情況物件傳遞給 std::promise
. 和 std::packaged_task
類似, 它也有一個名稱為 get_future
的成員函式. 一個和 std::promise
關聯的 std::future
的成員函式 get
或者 wait
被呼叫時, 如果外界沒有透過 set_value
將前提條件準備好, 那麼 std::future
將在呼叫處阻塞, 一直等待前提條件的輸入.
#include <future>
#include <mutex>
#include <queue>
#include <condition_variable>
#include <iostream>
class websocket_connection;
struct connection_parameters {
std::string user_name;
// ...
};
std::shared_ptr<websocket_connection> create_websocket_connection(const connection_parameters &, const std::string &password);
bool stop_tasks();
std::string fetch_user_password(const connection_parameters &) noexcept(false);
void entropy_password(std::string &) noexcept;
std::queue<std::tuple<std::packaged_task<decltype(create_websocket_connection)>, connection_parameters, std::future<std::string>>> connection_tasks {};
std::mutex mutex;
std::condition_variable cv;
std::future<std::shared_ptr<websocket_connection>> connect(connection_parameters param) {
std::promise<std::string> password_promise {};
auto password_future {password_promise.get_future()};
std::thread([param](std::promise<std::string> promise) {
std::string password {};
try {
password = fetch_user_password(param);
}catch(...) {
promise.set_exception(std::current_exception());
return;
}
entropy_password(password);
promise.set_value(password);
}, std::move(password_promise)).detach();
std::packaged_task<decltype(create_websocket_connection)> connection_task(create_websocket_connection);
auto result {connection_task.get_future()};
{
std::lock_guard<std::mutex> guard(mutex);
connection_tasks.emplace(std::move(connection_task), std::move(param), std::move(password_future));
}
cv.notify_one();
return result;
}
void process_connections() {
std::unique_lock<std::mutex> lock(mutex);
while(true) {
cv.wait(lock, []() -> bool {
return not connection_tasks.empty();
});
auto task {std::move(connection_tasks.front())};
connection_tasks.pop();
lock.unlock();
enum {function, parameter, password};
try {
std::get<function>(task)(std::get<parameter>(task), std::get<password>(task).get());
}catch(std::runtime_error &e) {
std::cerr << "The user " << std::get<parameter>(task).user_name << "'s connection failed!" << std::endl;
}
if(stop_tasks()) {
std::lock_guard<std::mutex> guard(mutex);
decltype(connection_tasks) empty_queue {};
connection_tasks.swap(empty_queue);
return;
}
}
}
Code 28 中增加了兩個全域函式用於獲取使用者輸入的密碼和將密碼進行加密. 佇列中儲存的物件多了一個 std::future<std::string>
用於表示加密後的密碼. 在函式 connect
中我們首先宣告了一個 password_promise
用於取得前提條件, 接著獲取它的 std::future<std::string>
物件. 接著, 我們宣告了一個專門用於獲取使用者的密碼輸入並且將密碼加密的執行緒, 由於這個執行緒將脫離宣告的執行緒獨立運作, 所以所有物件都使用了移動或者複製的方式. 在執行緒中, 它將首先獲取使用者輸入的密碼. 函式 fetch_user_password
明確宣告了可能擲出例外情況, 因此我們將密碼的獲取放入了 try
-catch
區塊中. 當函式 fetch_user_password
擲出例外情況的時候, 它將被 catch
區塊捕獲, 例外情況透過 std::future
的成員函式 set_exception
儲存進物件中, 使得該 std::future
物件準備完畢後退出. 由於我們並不知道 fetch_user_password
具體會擲出什麼型別的例外情況, 所以這裡我們使用 C++ 標準樣板程式庫提供的 std::current_exception
來回傳指向當前例外情況的指標, 它是 C++ 11 引入的比較少用的智慧指標 std::exception_ptr
, 來自標頭檔 <exception>
. 當沒有例外情況, 密碼完成加密後, 被加密後的密碼會透過 std::future
的成員函式 set_value
儲存僅 std::future
物件使得該物件準備完畢.
3.2.6 多個執行緒等待一個事件
和 std::mutex
類似, std::future
也有可共享的版本. C++ 標準樣板程式庫標頭檔 <future>
中提供了 std::shared_future
. 對於同一個 std::shared_future
, 可以將其副本傳遞給每一個執行緒, 多個執行緒可以同時呼叫副本上的成員函式 get
或者 wait
, 每個執行緒都會的到 std::shared_future
的結果, 而且 std::shared_future
物件會一直有效, 一直儲存結果, get
可以呼叫無限次. 若且唯若它被移動之後, 才會使得其成員函式 valid
回傳 false
, 此時再呼叫成員函式 get
或者 wait
就會產生未定行為.
我們可以從 std::future
的成員函式 share
來建構一個 std::shared_future
物件, 也可以透過將一個 std::future
的右值物件 (參考《C++ 學習筆記》Code 46 及接下來的討論) 作為引數傳入 std::shared_future
的建構子來建構. 需要注意的是, 如果呼叫了一個 std::future
物件的成員函式 share
, 那麼接下來該物件內部的所有資料和狀態都會轉移到建構出來的 std::shared_future
上, 原來 std::future
物件不再有效, 成員函式 valid
回傳 false
.
#include <iostream>
#include <future>
using namespace std;
int main(int argc, char *argv[]) {
auto f {std::async([] {return 0;})};
auto sf {f.share()};
cout << f.valid() << endl; // 輸出 : 0
sf.get();
sf.get();
cout << sf.valid() << endl; // 輸出 : 1
f = std::async([] {return 0;});
cout << f.valid() << endl; // 輸出 : 1
f.get();
cout << f.valid() << endl; // 輸出 : 0
std::shared_future sf2 {std::async([] {return 0;})};
cout << sf2.valid() << endl; // 輸出 : 1
}
3.2.7 限制時間的等待
如果給每個互斥鎖的等待或者執行緒的等待都設定時間的上限, 如果超過這個時間, 等待會強行被結束. C++ 標準樣板程式庫標頭檔 <mutex>
中提供了支援等待的互斥鎖 std::timed_mutex
和 std::recursive_timed_mutex
, 其成員函式 try_lock_for
和 try_lock_until
支援傳入一個時間間隔或者時刻作為引數, 告訴互斥鎖應該等待多久, 或者等待到什麼時候. try_lock_for
和 try_lock_until
的回傳值都是 bool
型別, 如果回傳 true
代表規定的時間段內持有互斥鎖成功, 回傳 false
代表規定的時間段內持有互斥鎖失敗. 那麼我們可以根據回傳值做出不同的決定 :
#include <mutex>
#include <chrono>
#include <iostream>
void f(int &value, std::timed_mutex &m) {
if(m.try_lock_for(std::chrono::seconds(1))) {
value = 42;
// ...
}else {
std::clog << "timeout!" << std::endl;
}
}
std::condition_variable
和 std::condition_variable_any
也支援限制時間的等待, 由成員函式 wait_for
和 wait_until
負責. 這兩個成員函式的第一個參數型別時 std::unique_lock<std::mutex>
, 第二個參數是等待時間或者等待持續到的時刻. 另外, 其多載版本還支援傳入一個可呼叫物件, 表示額外的條件. wait_for
和 wait_until
的回傳值是 std::cv_status
型別, 其中 std::cv_status::timeout
表示超時, std::cv_status::no_timeout
表示未超時.
std::future
與 std::shared_future
也有成員函式 wait_for
和 wait_until
, 其回傳值是 std::future_status
. 其中, std::future_status::timeout
表示超時, std::future_status::ready
表示已經準備好結果, std::feature_status::deferred
表示運作被延遲, 還未開始.
3.3 等待多個事件
在第 3.2 節中, 我們基本都圍繞於執行緒需要等待某個事件討論的. 當一個執行緒需要等待多個事件的時候, 我們可以多次復用第 3.2 節中所討論的設施, 但是 C++ 20 也為我們引入了 std::latch
和 std::barrier
來輔助我們等待多個事件, 而無需重複編寫程式碼.
3.3.1 閂
在標頭檔 <latch>
中有一個類別為 std::latch
提供了執行緒門閂的功能, 它是一個基於計數的等待. 當一個 std::latch
物件被建構的時候, 它要求給出有多少個事件需要被等待. 每當有一個事件完成, 我們就讓 std::latch
減去對應的計數. 當 std::latch
計數降到零的時候, 便表明所有事件都已經完成, 可以基於已經完成的事件的結果運作接下來的任務 :
#include <thread>
#include <latch>
#include <vector>
int main(int argc, char *argv[]) {
constexpr auto event_number {12};
std::vector<std::thread> threads {};
threads.reserve(event_number);
std::latch latch {event_number};
for(auto i {0}; i < event_number; ++i) {
threads.emplace_back([&latch]() {
// ...
latch.count_down();
});
threads.back().detach();
}
latch.wait();
// ...
}
在 Code 31 中, 若 threads
中有一個執行緒未完成, 那麼 latch.wait()
之後的程式碼都不會被運作. 也就是說, std::latch
的成員函式 wait
只有在計數減到零的時候才放行. 另外, std::latch
還提供了 try_wait
, 它不會立即引起阻塞, 如果計數為零, 那麼回傳 true
; 否則, 回傳 false
. 當我們想要減少計數的同時等待, 那麼我們可以呼叫 std::latch
的成員函式 arrive_and_wait
, 其作用相當於同時呼叫成員函式 count_down
和 wait
.
std::latch
的計數是自訂的, 也就是成員函式 count_down
和 arrive_and_wait
可以每次計數減去多少. 預設情況下, 每次計數都是減一, 所以 Code 31 實際上使用了成員函式 count_down
的預設引數. 不過, std::latch
的計數最大值由實作定義, 我們可以透過 std::latch
的靜態成員函式 max
獲取當前環境下可用計數的最大值.
需要注意的是, std::latch
是一次性物件, 無法複製, 無法移動, 也無法重新被指派新狀態, 用完即棄.
3.3.2 屏障
假設有五個人一起吃火鍋, 總共有十二種不同的菜. 大家分工協作, 把菜煮熟之後, 每個人都吃完了才能繼續煮下一道菜. 這種情況下, 由於 std::latch
是不可復位的, 所以它無法直接滿足這種場景. 對於每個執行緒都運作完, 然後門閂開啟繼續接下來的綜合任務, 接著門閂需要復位等待下一輪類似的任務, C++ 20 為這種場景引入了 std::barrier
, 它被定義在標頭檔 <barrier>
中.
#include <thread>
#include <barrier>
#include <iostream>
#include <vector>
int main(int argc, char *argv[]) {
constexpr auto people {5};
constexpr auto dishes {12};
auto remaining_dishes {dishes}
std::barrier start_eating {people, [&remaining_dishes]() {
if(remaining_dishes-- > 0) {
std::cout << "everyone starts eating!" << std::endl;
}else {
std::cout << "check out!" << std::endl;
}
}};
for(auto i {0}; i < dishes; ++i) {
std::vector<std::thread> threads {};
threads.reserve(people);
for(auto j {0}; j < people; ++j) {
threads.emplace_back([i, &start_eating]() {
std::cout << "people " << j << " starts cooking " << i << '.' << std::endl;
std::this_thread::sleep_for(std::chrono::minutes(1));
start_eating.arrive_and_wait();
});
}
for(auto &thread : threads) {
thread.join();
}
}
}
在 Code 32 中, 我們首先定義了每個屏障放行之後該做的事情, 也就是 std::barrier
的第二個參數所對應的引數. 在一個階段結束之後 (每個人分到的菜都煮熟了), 那麼就進行一個階段性的綜合任務 (要麼開始吃, 要麼吃完結帳). 接下來, 總任務被分成若干個子任務 (十二道菜), 每個子任務又可以被若干個執行緒繼續劃分 (五個人分工一起煮). 對於每一個執行緒來說, 當它的任務完成, 就呼叫 std::barrier
的成員函式 arrive_and_wait
, 減去一次計數. 當最後一個執行緒完成之後, 它順帶呼叫階段結束回呼 (開始吃或者結帳). 值得注意的是, arrive_and_wait
並不支援自訂計數, 每次都是固定減去一.
另一種場景也很常見, 如果一個人吃到一半便被叫走, 後續不再繼續參與, 那麼總體吃的人數就要減去一. std::barrier
針對這種情況提供了另一個成員函式 arrive_and_drop
, 用於標記一個執行緒完成之後永久退出, 不再參與後續階段的任務. 當 arrive_and_drop
被呼叫, 那麼 std::barrier
從下一個階段開始, 總計數就會減去一. arrive_and_drop
也不支援自訂計數.
和 std::latch
類似, std::barrier
也提供成員函式 arrive
和 wait
用於更靈活的場景下. 其中, 成員函式 arrive
支援自訂計數 (預設情況下每次計數都減一), 並且回傳型別為 std::barrier::arrival_token
物件, 這個物件標記了這次任務結束之後計數應該減去多少. 如果我們需要等待本階段任務的完成, 我們就要將到達權杖作為函式引數傳遞給成員函式 wait
. 也就是說, 呼叫成員函式 arrive_and_wait
相當於呼叫成員函式 wait(arrive())
.
事實上, std::barrier
是一個類別樣板, 其樣板參數接受一個可呼叫物件的型別, 對應於 std::barrier
建構子第二個參數的型別. 不過有時候, 我們並不需要一個階段完成之後做什麼其它事情, 因此可以留空, 即 std::barrier<> b(count);
. 在 Code 32 中我們並沒有明確宣告樣板引數, 這是借助了 C++ 17 編碼器幫助我們推導樣板引數型別的能力 (參考《【C++ 17】為類別樣板推導樣板參數》), 亦即 std::barrier<> b(count);
等價於 std::barrier b(count);
.
std::barrier
雖然可以復位, 但是它也不可複製, 不可移動, 也無法被指派一個新狀態. 其計數機制也和 std::latch
一樣有上限, 由被 constexpr
標識的靜態成員函式 max
提供該上限值.
3.4 號誌
在第 3.1 節中, 我們透過互斥鎖來阻擋執行緒, 避免發生資料競爭. 如果現在有一個緩衝區, 它內部實際上準備了五個不同的緩衝區塊供五個執行緒同時寫入資料, 假設每個執行緒寫入之前都要透過互斥鎖獨佔整個緩衝區, 未免太過於浪費資源, 這個時候我們需要號誌的幫助. C++ 20 在標頭檔 <semaphore>
中引入了樣板類別 std::counting_semaphore
, 其樣板參數接受一個型別為 std::ptrdiff_t
的非型別引數, 用於標記號誌計數的上限.
std::counting_semaphore
的成員函式 release
用於釋放一個計數, acquire
用於請求獲得一個計數. 和等待一個事件完成或者獨佔一個互斥鎖類似, 使用成員函式 acquire
若無法獲得計數, 那麼執行緒就會陷入等待, 等待其它執行緒釋放計數. std::counting_semaphore
也提供成員函式 try_acquire
, try_acquire_for
和 try_acquire_until
用於非等待的請求或者限制時間的請求. 一般而言, C++ 標準樣板程式庫所提供的 std::counting_semaphore
的成員函式 acquire 並不是單純忙等, 而是經過優化的. 它會保證執行緒睡眠, 然後在恰當的時候喚醒, 所以我們無需擔心此處的 CPU 資源利用問題.
std::counting_semaphore
必須從一個指定的初始計數建構, 例如 std::counting_semaphore<5> semaphore(3);
. 需要注意的是, 如果主動提供初始計數, 那麼初始計數的值必須大於等於零且小於等於樣板引數的值. 一旦初始計數的值不滿足條件, 或者成員函式 release
和 acquire
的成對呼叫導致計數小於等於零或者大於樣板引數的值, 該 std::counting_semaphore
後續的行為都將變得不可預測, 從而產生未定行為.
#include <semaphore>
template <std::size_t N>
class buffer {
private:
// ...
public:
const char *get();
void write(const char *);
static constexpr std::size_t max() noexcept {
return N;
}
};
buffer<5> buffer_5; // 全空的情況下, 提供了五個緩衝區塊以供五個執行緒同時寫入資料
std::counting_semaphore<decltype(buffer_5)::max()> semaphore_empty_slots {decltype(buffer_5)::max()};
std::counting_semaphore<decltype(buffer_5)::max()> semaphore_data_slots {0};
const char *read() {
semaphore_data_slots.acquire(); // 等待緩衝區有資料
const auto result {buffer_5.get()};
semaphore_empty_slots.release();
return result;
}
void write(const char *data) {
semaphore_empty_slots.acquire(); // 等待緩衝區可寫入
buffer_5.write(data);
semaphore_data_slots.release();
}
std::counting_semaphore
和 std::mutex
類似, 不支援複製, 不支援移動, 也不支援被指派為一個新的狀態. 值得注意的是, std::counting_semaphore
被 constexpr
標識的靜態成員函式 max
仍然是回傳實作定義的計數上限值, 這個值保證不小於樣板引數, 但是不一定等價於樣板引數.
如果只有計數的最大值只有一, 那麼我們可以直接使用 std::binary_semaphore
, 它等價於 std::counting_semaphore<1>
.
4. native_handle
在第 1.2 節中我們提到, C++ 標準樣板程式庫的很多執行緒相關設施都有成員函式 native_handle
, 用於支援平台提供的專有特性. 事實上, C++ 標準樣板程式庫內部的併發設施都是基於底層併發程式庫包裝出來的. 例如在 UNIX 家族作業系統上, 基本上是基於 POSIX 包裝的; 在 Windows 作業系統上, 基本是基於 Windows API 包裝的. 底層併發程式庫可能會提供更多的靈活性, 所以我們有時候需要底層程式庫中的某些型別. 下表是基於 POSIX 和基於 Windows API 實作的 C++ 標準樣板程式庫物件對應的底層把手 :
C++ 標準樣板程式庫設施 | POSIX | Windows API |
---|---|---|
std::thread /std::jthread | pthread_t | HANDLE |
std::mutex / std::recursive_mutex / std::recursive_timed_mutex | pthread_mutex_t | CRITICAL_SECTION * |
std::shared_mutex | pthread_mutex_t 或者 pthread_rwlock_t (極少部分實作) | CRITICAL_SECTION * 或者 SRWLOCK * (較新版本的 Windows 作業系統) |
std::condition_variable | pthread_cond_t | PCONDITION_VARIABLE |
如果我們要調整執行緒的優先級, 顯然無法透過 std::thread
做到, 此時我們可以借助成員函式 native_handle
及對應的底層程式庫做到 :
#include <thread>
#if not defined(_WIN32)
#include <pthread.h>
#else
#include <windows.h>
#endif
void some_operation();
void adjust_thread_priority(std::thread &thread) {
#if not defined(_WIN32)
sched_param sp {.sched_priority = 99};
pthread_setschedparam(thread.native_handle(), SCHED_RR, &sp);
#else
SetThreadPriority(thread.native_handle(), THREAD_PRIORITY_HIGHEST);
#endif
}
int main() {
std::thread t(some_operation);
adjust_thread_priority(t);
t.join();
}
5. 基於互斥鎖的資料結構
要讓多個執行緒同時安全地存取同一個資料結構物件, 必須滿足 :
- 併發存取期間保證資料結構的絕對不變性;
- 保證資料結構在執行緒之間正確同步, 併發的改動不會破壞資料結構的正確性
二者之一. C++ 標準樣板程式庫追求極致的效能, 所以我們需要對類似的物件主動配一個互斥鎖. 當然, 我們自行實作資料結構的時候, 也可以讓資料結構自己帶有互斥鎖, 這樣使用者在使用的時候可以無需主動加鎖.
讓程式支援併發的本質是提高程式的運作效能, 因此每一次互斥操作都意味著部分程式碼要串列執行, 程式的併發程度要降低. 在設計支援併發的資料結構時, 我們必須盡可能減少保護的範圍, 只獨佔必要的部分, 限制互斥鎖的可視範圍, 盡力避免遞迴互斥鎖, 降低死鎖發生的可能性.
為了資料結構的併發安全, 一個要點便是在資料結構被修改的時候, 與該資料直接相關或者間接相關的任何狀態不能被其它執行緒存取. 資料結構所提供的內建操作應該盡可能完整且獨立, 而非零散的. 在 Code 11 中, 我們曾經說過最好合併堆疊或者佇列的 pop
和 top
/front
函式, 在移除元素的同時回傳這個元素, 而不是讓使用者分別存取. 另一方面, 如果有例外情況擲出, 資料結構應該保證例外安全.
在設計併發資料結構前, 我們要考慮清楚 : 資料結構的使用者應該受到什麼樣的限制? 一個物件在被建構前或者在被解構後, 不應該進行任何直接或者間接的存取. 只有一個執行緒可以對一個物件進行建構或者解構操作. 這兩個屬於硬性限制, 不遵守大概率會導致未定行為. 對於其它操作, 設計時我們應當首先考慮 : 若該操作被多個執行緒同時進行, 是否可以保證資料的完整性與安全性. 然後, 我們還需要考慮當使用者進行組合操作的時候, 多個執行緒交替運作的情況下, 是否可能產生一些不期望的狀態, 從而導致使用者使用的過程中產生未定行為. 真正編寫程式碼的時候, 我們需要考慮 :
- 互斥鎖的可視範圍是否合理, 某些操作是否可以不繫結互斥鎖, 在臨界區外也可以安全進行;
- 資料結構內部是否需要採用不同程度的互斥;
- 不同操作是否需要相同的保護;
- 在不影響語義的情況下, 能否透過簡單改動提高資料結構的併發程度.‘
歸根結底, 我們在設計的時候最好讓串列執行的操作最少化.
5.1 基於互斥鎖的堆疊
之前我們實作過基於互斥鎖的堆疊, 但是那些都是簡單實作. 現在, 我們基於 std::stack
和 std::mutex
實作一個較為正式的使用者無需加鎖的堆疊 :
#include <mutex>
#include <stack>
template <typename T>
class thread_safe_stack {
private:
std::stack<T> stack;
mutable std::mutex mutex;
public:
thread_safe_stack() = default;
thread_safe_stack(const thread_safe_stack &rhs) {
std::lock_guard<std::mutex> lock(rhs.mutex);
stack = rhs.stack;
}
thread_safe_stack(thread_safe_stack &&rhs) = delete;
~thread_safe_stack() noexcept = default;
public:
thread_safe_stack &operator=(const thread_safe_stack &rhs) {
if(this == &rhs) {
return *this;
}
std::lock(this->mutex, rhs.mutex);
std::lock_guard<std::mutex> this_guard(this->mutex, std::adopt_lock);
std::lock_guard<std::mutex> rhs_guard(rhs.mutex, std::adopt_lock);
this->stack = rhs.stack;
return *this;
}
thread_safe_stack &operator=(thread_safe_stack &&rhs) = delete;
public:
void push(T value) {
std::lock_guard<std::mutex> guard(mutex);
this->stack.emplace(std::move(value));
}
template <typename ...Args>
void emplace(Args &&...args) {
return this->push(T {std::forward<Args>(args)...});
}
std::shared_ptr<T> pop() {
std::shared_ptr<T> result {};
{
std::lock_guard<std::mutex> guard(mutex);
if(not this->stack.empty()) {
result = std::make_shared<T>(this->stack.top());
this->stack.pop();
}
}
return result;
}
bool unsafe_empty() const {
std::lock_guard<std::mutex> guard(mutex);
return this->stack.empty();
}
std::size_t unsafe_size() const {
std::lock_guard<std::mutex> guard(mutex);
return this->stack.size();
}
};
Code 35 呈現了一個考慮較為周到的併發堆疊. thread_safe_stack
的複製建構子將 rhs
的互斥鎖獨佔, 然後才開始複製 rhs.stack
, 複製指派運算子透過 std::lock
按照順序鎖定互斥鎖, 然後開始複製 rhs.stack
. thread_safe_stack
的移動操作都被宣告為被刪除的函式, 因為如果在移動的過程中擲出例外情況, 可能會破壞 rhs
中的完整性 (當然, std::stack
保證移動操作不擲出例外情況, 所以還是可以像複製操作那樣實作, 如果用的是其它堆疊就不再保證了). thread_safe_stack
的重點是成員函式 push
和 pop
.
成員函式 push
有兩個版本, 其中一個版本接受型別為 T
的物件副本, 另一個名稱為 emplace
的函式採用原地建構. 如果閣下實作過單執行緒下容器的 emplace
函式或者看過類似的實作, 可能會知道 : 在 C++ 11 之後, 通常都會將 push
的工作直接委托給 emplace
來完成. 然而在這裡, 恰好反了過來. 這是因為如果接受一個 const T &
物件, 那麼物件複製的過程中可能要獨佔物件中可能存在的互斥鎖, 一旦 T
設計得有問題, 那麼可能會導致死鎖. 由於堆疊屬於比較基礎的資料結構, 所以一般被放在基礎程式庫中, 那麼死鎖就會發生在基礎程式庫中. 在實際程式設計中, 基礎程式庫作者通常要避免死鎖發生在基礎程式庫中, 所以這裡我們採用了更加安全的複製. 一旦複製發生死鎖, 那麼根本沒有機會呼叫成員函式 push
程式便無法繼續運作了. 對於成員函式 emplace
, 它很可能無法避免死鎖的發生, 但是它本身不獨佔任何互斥鎖. 因此死鎖如果發生在 emplace
中, 那麼我們可以確定原地建構部分出現了問題, 這仍然可以告訴使用者你的類別設計可能存在問題.
成員函式 pop
使用了第 3.1.1 小節討論的合併 pop
和 top
的建議. 不過我們盡力縮小了互斥鎖鎖定的範圍. 對於 std::shared_ptr
的建構和回傳都沒有獨佔互斥鎖, 因為它和任何成員變數都無關. 在互斥鎖獨佔後, 我們首先判斷了堆疊是否為空, 在不為空的情況下取出 top
, 同時呼叫 std::stack::pop
. 在這裡我們不需要判斷之後, 內部堆疊的成員函式 pop
被呼叫. 因為在一個執行緒獨佔互斥鎖後, 沒有任何其它執行緒可以存取到 this->stack
.
最後我們還實作了 unsafe_size
和 unsafe_empty
. 雖然在呼叫 unsafe_size
或者 unsafe_empty
並且釋放互斥鎖之後, 其它執行緒可能在此時改變堆疊的狀態, 導致呼叫 unsafe_size
或者 unsafe_empty
的程式碼處無法判斷回傳值和堆疊當前的狀態是否相符, 但是由於名稱中明確了這兩個函式是不安全的, 已經盡到了告知的義務.
現在讓我們來考慮例外安全問題. Code 35 中沒有任何除了解構子之外的成員函式被標識為 noexcept
, 因為成員函式 push
本身可能配置記憶體, 成員函式 pop
中的獨佔互斥鎖操作和 std::make_shared
都可能擲出例外情況. 我們暫時不考慮獨佔互斥鎖的例外情況, 因為按照上面的設計, 一個執行緒如果獨佔了互斥鎖, 不可能再獨佔一次. 下一次獨佔互斥鎖操作之前, 一定會透過 std::lock_guard
釋放上一次的互斥鎖. 如果成員函式 push
中擲出例外情況, std::stack
可以保證容器本身狀態不變, 我們無需擔心; 如果成員函式 pop
中擲出例外情況, 就只有 std::make_shared
處才可能, 這裡我們還沒有呼叫 this->stack.pop
, 並且只是對頂部元素進行了複製. 這就是為什麼 std::make_shared
明明可以移動 this->stack.top()
而沒有這麼做的原因. 這裡擲出例外情況的話, 移動操作可能破壞原堆疊的資料完整性. 綜上所述, Code 35 並不存在例外安全問題. 如果實在要避免, 我們可以把 thread_safe_stack
中的成員變數 stack
的型別更換為 std::stack<std::shared_ptr<T>>
, std::shared_ptr
保證複製操作不擲出任何例外情況.
不過, Code 35 沒有考慮阻塞式移除, 即只有等到佇列有元素才開始移除操作, 否則一直等待. 如果要支援阻塞式移除, 那麼執行緒無法獨佔互斥鎖就只能陷入未知的等待當中, 雖然作業系統會喚醒它, 但是時機並不明確. 此時, 執行緒可能陷入不斷嘗試獨佔互斥鎖但失敗, 又或者一直有高優先級的任務導致作業系統長期不喚醒. 為了解決這個問題, 我們可以在類別中增加一個條件變數, 在成員函式 push
終結之前透過 std::condition_variable::notify_one
來喚起一個執行緒.
雖然我們已經盡力避免死鎖發生在堆疊內部, 然而一旦使用者設計不當, 死鎖仍然可能在堆疊內部發生 :
template <typename T>
class thread_safe_stack {
//...
};
class object {
private:
std::mutex mutex;
// other member variables
// ...
public:
object();
object(const object &rhs) {
std::lock_guard<std::mutex> guard(this->mutex);
// ...
}
// ...
public:
void f(thread_safe_stack<object> &stack) {
std::lock_guard<std::mutex> guard(this->mutex);
if(stack.empty()) {
// operation based other member variables
// ...
}
// ...
}
};
thread_safe_stack<object> object_stack;
object o;
void thread_A() {
object_stack.push(o);
}
void thread_B() {
o.f(object_stack);
}
在類別 object
中, 複製和成員函式 f
都需要獨佔互斥鎖. 現在假設執行緒 A 運作函式 thread_A
, 執行緒 B 運作函式 thread_B
. 執行緒 A 首先要獨佔 object_stack.mutex
, 然後還需要獨佔 o.mutex
. 另一方面, 如果執行緒 B 想要呼叫 object::f
, 必須先獨佔 o.mutex
, 然後再獨佔 object_stack.mutex
. 假設執行緒 A 已經成功獨佔了 o.mutex
, 還沒來得及獨佔 o.mutex
, 執行緒 B 此時搶先獨佔了 o.mutex
. 這個時候, 執行緒 A 要等待執行緒 B 釋放 o.mutex
, 執行緒 B 又要等待執行緒 A 釋放 object_stack.mutex
. 兩個執行緒相互等待, 死鎖便發生了. 這種設計上本身就存在相互依賴的關係, 發生死鎖的機率特別高. 在併發程式設計下, 我們應該減少甚至避免變數之間的相互依賴.
有一些程式庫可能仍然為容器提供類似於 void push(const T &)
這種介面, 但是會在文檔中明確告訴使用者, 要注意 T
的設計, 避免死鎖.
5.2 基於互斥鎖的佇列
Code 35 其實相當簡單, 這也是將資料結構視為一個整體的最大好處. 在任何過程中, 我們都只需要一個互斥鎖的幫助 (如果有條件變數, 也是一個). 將資料結構視為一個整體意味著一旦用到了這個資料結構, 那麼對應地方的程式碼要是要串列執行的. 現在讓我們來考慮佇列. 佇列的插入和移除是在兩頭進行的, 如果將整個佇列視為一個整體, 只使用一個互斥鎖加以保護, 那麼就意味著插入和移除操作是無法同時進行的. 我們應該盡一切可能提高程式的併發程度, 所以我們不再考慮這種方案.
現在我們需要為佇列選擇一個底層的資料結構, 提高佇列的併發程度. 如果選擇陣列作為底層, 每次插入和移除都是需要鎖住整個佇列的, 因為存在擴容重新配置的情況, 而前向連結串列並沒有這個限制. 現在讓我們嘗試一下前向連結串列作為佇列的底層資料結構.
#include <memory>
#include <mutex>
template <typename T>
class thread_safe_queue {
private:
struct node {
std::unique_ptr<node> next;
std::shared_ptr<T> value;
};
private:
std::unique_ptr<node> head;
node *tail;
std::mutex head_mutex, tail_mutex;
public:
thread_safe_queue() : head {new node {}}, tail {this->head.get()} {}
~thread_safe_queue() noexcept = default;
public:
void push(T value) {
auto new_node {std::make_unique<node>(nullptr, std::make_shared<T>(std::move(value)))};
const auto new_node_pointer {new_node.get()};
std::lock_guard<std::mutex> guard(this->tail_mutex);
this->tail->next = std::move(new_node);
this->tail = new_node_pointer;
}
std::shared_ptr<T> pop() {
std::shared_ptr<T> result {};
{
std::lock(this->head_mutex, this->tail_mutex);
std::lock_guard<std::mutex> head_guard(this->head_mutex, std::adopt_lock);
std::unique_lock<std::mutex> tail_lock(this->tail_mutex, std::adopt_lock);
if(this->head.get() not_eq this->tail) {
this->tail_lock.unlock();
result.swap(this->head->next->value);
this->head = std::move(this->head->next);
}
}
return result;
}
};
在 Code 27 的設計中, 佇列的頭部結點總是空的, 每次移除都會從頭部結點的下一個結點拿出. 我們大量採用了來自 C++ 標準樣板程式庫的智慧指標幫助我們自動管理記憶體, 防止出現記憶體流失. 當一個結點對應的 std::unique_ptr<node>
發生解構, node
內部的 std::unique_ptr<node>
也會發生解構, 在鏈式解構的幫助下, 整個前向連結串列都會被解構. 尾部結點直接使用原始的指標, 因為它必定是某個結點的 next
, 記憶體已經交給了 std::unique_ptr
管理. 每個結點內部元素都使用了 std::shared_ptr
來儲存, 這是為了避免在 pop
時分配記憶體產生例外情況.
成員函式 push
只在尾部進行插入, 所以只需要獨佔互斥鎖 this->tail_mutex
即可. 成員函式 pop
需要判斷佇列是否為空, 必須同時存取 this->head
和 this->tail
, 所以那一刻需要獨佔 this->head_mutex
和 this->tail_mutex
. 在判斷完成之後, 由於移除只在頭部進行, 所以我們立刻釋放 this->tail_mutex
.
現在考慮併發的情況. 若一個執行緒正在 push
, 一個執行緒正在 pop
. 若正在 push
的執行緒獨佔了 this->tail_mutex
, 那麼進行 pop
的執行緒會卡在佇列為空的判斷之前等待. 若正在 pop
的執行緒獨佔了 this->head_mutex
, 並不會對正在 push
的執行緒產生影響, 接下來就要看誰先獨佔 this->tail_mutex
, 必定有一個執行緒要等待另一個執行緒, 也不會產生什麼問題. 最極端的情況便是佇列為空, 此時 this->head
和 this->tail
是指向同一個結點的. 當 pop
先行, 它若先獨佔了 this->head_mutex
, 還沒來得及獨佔 this->tail_mutex
, 負責 push
的執行緒獨佔了 this->tail_mutex
. 此時, 負責 pop
的執行緒只能等待. 當 push
成功之後, 負責 pop
的執行緒立刻可以從佇列中取出元素, 也沒什麼問題.
5.3 基於互斥鎖的複雜資料結構
堆疊和佇列都是比較簡單的資料結構, 每次操作只會影響到容器中的一個或者兩個元素, 而且佇列影響的元素互不干擾. 如果要讓資料結構多樣化, 實作出支援併發的資料結構難度便會直線上升. 通常我們面臨的第一步就是選擇合適的底層資料結構.
以字典為例. C++ 標準樣板程式庫中就有兩種選擇 : 雜湊表 (參考《【資料結構】雜湊表》) 和紅黑樹 (參考《【資料結構】樹——紅-黑樹 (紅黑樹)》). 它們分別代表著無序和有序. 事實上, 陣列和連結串列也可以作為底層資料結構, 只是提供的效能不同而已. 要增加併發程度, 陣列顯然不合適, 即使陣列可以保證有序. 因為要在陣列中搜尋, 我們必須防止陣列被擴容這種情況發生, 此時就要求搜尋的時候陣列不能發生改變, 為此必須對整個陣列進行加鎖. 這就相當於第 5.1 節中堆疊的實作. 如果使用紅黑樹或者跳躍列表 (參考《【資料結構】跳躍列表》), 每次被搜尋到的結點是需要互斥鎖保護的. 如果使用雜湊表, 那麼至少每個桶需要互斥鎖保護. 常見的字典通常採用雜湊表來實作, 而且雜湊表每個桶都是一個前向連結串列.
在設計 C++ 容器時, 我們通常考慮為容器提供疊代器. 但是在併發下, 要想安全地透過疊代器存取或者修改元素並不容易. 在疊代器尋訪的過程中, 有一個最基本的假設 : 如果使用者沒有透過疊代器操作容器內的元素, 那麼容器內任何元素的狀態應該在尋訪期間保持不變; 如果使用者透過疊代器操作容器內元素, 使用者需要主動更新疊代器防止失效. 在併發下, 使用來自 C++ 標準樣板程式庫的疊代器幾乎是不可能的, 因為容器可能隨時被其它執行緒修改, 執行緒無法直接得知疊代器的有效性. 因此許多併發程式庫都採用了以下解決方案 :
- 提供成員函式
for_each
, 尋訪時借助類似於std::shared_mutex
至少禁止外界的寫入操作 (還是可以併發存取的). - 提供快照一致疊代器 : 在使用者請求疊代器後, 執少借助
std::shared_mutex
禁止外界寫入, 同時複製一份快照交給疊代器管理. 疊代器只會尋訪那一份快照, 而不會尋訪真正的容器. 快照一致疊代器的元素具有一定的滯後性, 尋訪結果不一定是最新的. - 弱一致疊代器 : 允許疊代器尋訪的過程中容器狀態發生變化, 並且保證疊代器的任何使用不會導致錯誤或者無限迴圈等, 尤其是未定行為, 但是不保證尋訪結果的準確性. 這種疊代器又有幾種實作方案 :
- 延遲操作 : 將已經接收到的外界操作先儲存下來, 先保證容器狀態不變, 等到外界疊代器全部銷毀或者累積到一定數量之後一次性更新容器;
- 利用標記 : 造成未定行為的核心原因一般都是存取了一個已經被回收的記憶體, 那麼可以先給元素帶上一個標記, 在尋訪的時候先檢查標記, 若元素已經被移除, 那麼跳過該元素;
- 為疊代器增加計數 : 若某個結點正在被疊代器所持有, 那麼如果有移除該元素的請求, 先滯後, 等到疊代器被被銷毀才開始回收該元素.
最後, 我們來討論較為複雜的搜尋樹. 像 AVL 樹 (參考《【資料結構】樹——AVL 樹》) 和紅黑樹都是需要透過旋轉操作來保持平衡的. 旋轉操作不是單個節點的操作, 通常是基於一個節點為中心, 其父子節點的共同操作. 在需要維護平衡性時, 旋轉通常是區域性而不是全域性的, 所以是否可能存在一種區域互斥鎖, 僅在旋轉的時候獨佔某幾個節點 :
- 路徑互斥鎖 : 按照尋訪的路徑, 遍尋訪邊加鎖, 這適用於可能出現修改的情形. 例如 AVL 樹可能旋轉至根節點, 紅黑樹可能只是改變顏色至根節點.
- 局域互斥鎖 : 僅鎖住要修改的部分, 例如以節點 A 為中心進行左旋轉, 那麼只鎖住節點 A 的父節點和左子節點.
正確地對所涉及節點進行加鎖可以避免這種情況 : 節點 A 的左子節點 B 正在被右旋轉, 當 B 的左子節點 C 稱為 A 的新左子節點時, B 可能還未成為 C 的右子節點, 以 B 為根節點的子樹可能在搜尋中被跳過.
若同時存在大量存取與寫入操作, 大量的互斥鎖可能會影響搜尋效能, 使得資料結構是存取友好而不是寫入友好的. 為了提升併發程度, 我們可以採取樂觀搜尋和驗證相結合的方式 : 為每一個節點設定一個版本號, 在尋訪時建立節點的影射表, 用於記錄搜尋到達某個節點時節點的版本號. 若成功找到了要搜尋的目標節點, 那麼只獨佔該節點 (這是為了防止版本號被其它執行緒改變), 並且驗證影射表中儲存的所走過所有節點的版本號是否發生變化. 若版本號發生變化, 那麼說明在尋訪的過程中已經有其它執行緒改變過樹的狀態了, 本次搜尋結果是不可信的, 是否需要重新搜尋由使用者決定. 如果沒有找到目標節點, 那麼只需要對比影射表中每個節點的版本號是否發生變化即可, 如果沒有, 便說明本次搜尋結果是可信的.
現在有一個新的問題 : 若尋訪到一個正在被旋轉的節點, 我們是可以直接退出尋訪的 (因為我們已經知道樹的狀態正在被發生改變, 像 AVL 樹中, 我們走過的路徑在接下來也可能會被改變), 但是如何獲知當前這個節點是否正在被旋轉? 我們可以利用版本號的奇偶性. 當一個節點需要被旋轉, 和本次旋轉關聯的節點 (包括父節點和其中一個子節點) 的版本號加一. 這個操作必定是在獨佔互斥鎖之後或者能夠保證是一個原子操作 (如果閣下沒學過 C++ 的無鎖程式設計, 那麼想像一下資料庫的交易), 我們不必擔心資料的競爭問題. 若在旋轉前的版本號為偶數, 那麼遇到偶數版本號的節點, 我們可以認為該該節點目前沒有進行旋轉操作. 當旋轉完成後, 我們再次對版本號加一, 此時版本號從奇數變成了偶數. 整棵樹的不同節點版本號可能不盡相同, 但是在平衡被打破之前, 所有節點的版本號奇偶性一定是一致的.
自創文章, 原著 : Jonny. 如若閣下需要轉發, 在已經授權的情況下請註明本文出處 :