java-recipes

ホーム マルチスレッド › Th-07

Th-07: Condition + Lock による待機・通知パターン

ReentrantLockCondition を使うと、synchronizedwait() / notify() より細かい待機・通知の制御ができます。 プロデューサーとコンシューマーでそれぞれ別の Condition を持ち、 「満杯なら待つ」「空なら待つ」を分離できます。

wait/notify との違い

synchronizedwait() / notifyAll() では、 「どの条件で待っているか」を区別できません。一方 Condition は複数作成できるため、 「キューが満杯のときだけプロデューサーを待たせる条件」と「キューが空のときだけコンシューマーを待たせる条件」を分けられます。

機能synchronized + wait/notifyLock + Condition
複数の待機条件❌(1つのオブジェクトに1セット)✅(lock.newCondition() で複数作成可)
タイムアウト付き待機△(wait(timeout) は使いにくい)✅(awaitNanos() / awaitUntil())
記述のシンプルさ◎(シンプル)△(unlock() を finally に書く必要あり)

プロデューサー・コンシューマーパターン

「生産者(プロデューサー)がデータを生産し、消費者(コンシューマー)が取り出す」という典型的な非同期処理パターンです。 キューが満杯のときはプロデューサーを待機させ、空のときはコンシューマーを待機させます。Condition を2つに分けることで、プロデューサーとコンシューマーを独立して起こすことができます。

ReentrantLock lock = new ReentrantLock();
Condition notFull  = lock.newCondition(); // プロデューサー用
Condition notEmpty = lock.newCondition(); // コンシューマー用

// プロデューサー: 満杯なら待機
lock.lock();
try {
    while (queue.size() == capacity) notFull.await();
    queue.addLast(item);
    notEmpty.signalAll(); // コンシューマーを起こす
} finally { lock.unlock(); }

// コンシューマー: 空なら待機
lock.lock();
try {
    while (queue.isEmpty()) notEmpty.await();
    T item = queue.removeFirst();
    notFull.signalAll(); // プロデューサーを起こす
} finally { lock.unlock(); }

サンプルコード

ConditionLockSample.java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class ConditionLockSample {

    // Condition を使ったプロデューサー・コンシューマー
    static class BoundedQueue<T> {
        private final Deque<T> queue = new ArrayDeque<T>();
        private final int capacity;
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notFull = lock.newCondition();  // キューが満杯でない条件
        private final Condition notEmpty = lock.newCondition(); // キューが空でない条件

        BoundedQueue(int capacity) {
            this.capacity = capacity;
        }

        // プロデューサー: キューに追加(満杯なら待機)
        void put(T item) throws InterruptedException {
            lock.lock();
            try {
                while (queue.size() == capacity) {
                    System.out.println("[Producer] キュー満杯 → 待機中...");
                    notFull.await(); // 満杯でなくなるまで待機
                }
                queue.addLast(item);
                System.out.println("[Producer] 追加: " + item
                    + " (サイズ: " + queue.size() + ")");
                notEmpty.signalAll(); // コンシューマーに通知
            } finally {
                lock.unlock();
            }
        }

        // コンシューマー: キューから取り出し(空なら待機)
        T take() throws InterruptedException {
            lock.lock();
            try {
                while (queue.isEmpty()) {
                    System.out.println("[Consumer] キュー空 → 待機中...");
                    notEmpty.await(); // 空でなくなるまで待機
                }
                T item = queue.removeFirst();
                System.out.println("[Consumer] 取得: " + item
                    + " (サイズ: " + queue.size() + ")");
                notFull.signalAll(); // プロデューサーに通知
                return item;
            } finally {
                lock.unlock();
            }
        }

        // タイムアウト付き取得(Java 8+)
        T poll(long timeoutMs) throws InterruptedException {
            lock.lock();
            try {
                long remaining = timeoutMs;
                while (queue.isEmpty()) {
                    if (remaining <= 0) {
                        System.out.println("[Consumer] タイムアウト");
                        return null;
                    }
                    remaining = notEmpty.awaitNanos(remaining * 1_000_000L) / 1_000_000L;
                }
                T item = queue.removeFirst();
                notFull.signalAll();
                return item;
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        final BoundedQueue<Integer> queue = new BoundedQueue<Integer>(3);

        // プロデューサースレッド
        Thread producer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 1; i <= 5; i++) {
                        queue.put(i);
                        Thread.sleep(100);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        // コンシューマースレッド
        Thread consumer = new Thread(new Runnable() {
            @Override
            public void run() {
                try {
                    for (int i = 0; i < 5; i++) {
                        queue.take();
                        Thread.sleep(200);
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });

        producer.start();
        consumer.start();
        producer.join();
        consumer.join();
        System.out.println("完了");
    }
}

Condition は ReentrantLock の newCondition() で取得します。await() は現在のスレッドを待機させ、signalAll() は待機中のすべてのスレッドを起こします。while ループで待機することで、スプリアスウェイクアップ(spurious wakeup: 予期せぬ目覚め)に対処できます。

よくあるミス・注意点

⚠️ await() は if ではなく while ループの中で呼ぶ

スプリアスウェイクアップ(spurious wakeup)と呼ばれる現象により、 シグナルを受けていないのに await() が戻ることがあります。if (queue.isEmpty()) ではなく while (queue.isEmpty()) にしておくと、 起きた後も条件を再チェックするため安全です。

// ❌ if だとスプリアスウェイクアップに対応できない
if (queue.isEmpty()) notEmpty.await();

// ✅ while で条件を再チェック
while (queue.isEmpty()) notEmpty.await();

⚠️ Condition は必ず対応する Lock の中で await() / signalAll() を呼ぶ

Condition.await()signalAll() は、 その Condition を生成した Lock を保持している状態でのみ呼び出せます。 Lock の外で呼ぶと IllegalMonitorStateException が発生します。 必ず lock.lock()lock.unlock()(finally)で囲んでください。

テストする観点

  • ✅ プロデューサーが 5個追加し、コンシューマーが 5個取得した後にキューが空になること
  • ✅ キューが満杯(capacity=3)のときにプロデューサーが待機し、コンシューマーが取り出した後に再開すること
  • ✅ キューが空のときにコンシューマーが待機し、プロデューサーが追加した後に再開すること
  • ✅ タイムアウト付き poll() でキューが空のまま指定時間を過ぎると null が返ること(境界値: タイムアウト直前・直後)
  • ✅ Lock の外で Condition.await() を呼ぶと IllegalMonitorStateException が発生すること

GitHub でソースコードを見る →