C++11のthreadで遊んでみる その3 - condition_variable編


このエントリーをはてなブックマークに追加

この記事は続き記事です。目次→C++11のthreadで遊んでみる - minus9dの日記

    • -

今回はC++11のcondition_variableを使ってみる。condition_variableは日本語ではそのまま条件変数と訳される。条件変数は、あるスレッドを待ち状態にして、ある条件が揃ったときにそのスレッドへ通知を行うときに使われる。

書籍 C++ ポケットリファレンスサポートサイトに、分かりやすい例が載っている。以下は、この例に自分でコメントを足したもの。

// コードは以下のサイトから取得
// https://github.com/cpp-pocketref/sample-code/blob/master/008_thread/014_condition_variable.cpp

// Copyright (c) 2012-2013
// Akira Takahashi, Toshihiko Ando, Kohsuke Yuasa,
// Yusuke Ichinohe, Masaya Kusuda, wraith.
// Released under the CC0 1.0 Universal license.
#include <iostream>
#include <deque>
#include <thread>
#include <mutex>
#include <condition_variable>

using namespace std;

template <class T>
struct LockedQueue {
    // コンストラクタに、キューの大きさを指定
    explicit LockedQueue(int capacity)
        :  capacity(capacity)
        {}

    // キューにxを加える
    void enqueue(const T& x) {
        unique_lock<mutex> lock(m);
        // 外からnotify_all()またはnotify_one()によって起こされるまでブロックして待つ
        // ただし、起こされた時にキューが満杯だった場合は、またブロックして待つ
        c_enq.wait(lock, [this] { return data.size() != capacity; });
        data.push_back(x);
        // dequeueの準備ができたことを通知
        c_deq.notify_one();
    }
    
    // キューから要素を取り出す
    T dequeue() {
        unique_lock<mutex> lock(m);
        // 外からnotify_all()またはnotify_one()によって起こされるまでブロックして待つ
        // ただし、起こされた時にキューが空である場合は、またブロックして待つ
        c_deq.wait(lock, [this] { return !data.empty(); });
        T ret = data.front();
        data.pop_front();
        // enqueueの準備ができたことを通知
        c_enq.notify_one();
        return ret;
    }

private:
    mutex m;
    deque<T> data;
    size_t capacity;
    condition_variable c_enq;
    condition_variable c_deq;
};

// ワーカースレッド
// キューに0, 1, 2, 3, 4を順番に入れる
void worker(LockedQueue<int>& lq) {
    for (int i = 0; i < 5; ++i) {
        lq.enqueue(i);
        this_thread::sleep_for(chrono::milliseconds(100));
    }
}

int main() {
    // キューを作成 サイズは2
    LockedQueue<int> lq(2);

    // workerスレッドを起動
    thread th(worker, std::ref(lq));

    this_thread::sleep_for(chrono::milliseconds(1000));

    // キューに入れた要素を順番に取り出す
    for (int i = 0; i < 5; ++i) {
        int n = lq.dequeue();
        cout << "popped : " << n << endl;
    }

    // スレッドが終了するのを待つ
    th.join();

    return 0;
}

通知を受ける側は、wait()関数を呼び出して待機状態に入る。ただし、ここでspurious wakeupと呼ばれる現象のことを気にしなければならない。spurious wakeupとは、待機中のスレッドが、他のスレッドからwakeupの通知を送られたわけでもないのに、勝手に待機状態から抜けだして目覚めてしまう現象のことらしい。何でやねん、と思うが仕様だから仕方がない。spurious wakeupの存在意義については、例えばVolodya's blog: Spurious wakeupsなどにLinuxシステムコールの観点から説明されているが、正直自分には理解できていない。

spurious wakeupによる予期せぬスレッドの目覚めを防ぐために、プログラマはwait()関数の第二引数に、条件文を入れる。上のサンプルでは以下のような条件文が入れられている。

c_enq.wait(lock, [this] { return data.size() != capacity; });

通知を行う側は、notify_one()を呼び出して通知をする。