スレッドとタスク

https://chromium.googlesource.com/chromium/src/+/master/docs/threading_and_tasks.md

概観

Chromiumはマルチスレッドを多用する。それによりUIをスムーズにする。つまりIOや他の時間のかかる作業はスレッドで動いている。スレッド間の通信はメッセージを使う。ロックやスレッドセーフオブジェクトは推奨されない。オブジェクトは1つのスレッドのみに存在し、スレッド間でメッセージをやり取りし、情報のやり取りにはコールバックを使う。

コアコンセプト

  • タスク:処理されるべき仕事の単位。実際上は関数ポインタと関連データ。Chromeではbase::Bindによってつくられるbase::Callbackインスタンス。
  • タスクキュー:処理されるべきタスクのキュー
  • 物理スレッド:OSが提供するスレッド(POSIXならpthreadでWindowsならCreateThread()でつくられるスレッド)。これらはChromeのクロスプラットフォーム抽象であるbase::PlatformThreadで提供されるがこれを直接使うことはない。
  • base::Thread:専用のタスクキューのメッセージを処理する物理スレッドでQuit()が呼ばれるまで動き続ける。ほとんどの場合自分で作成することはない。
  • スレッドプール: タスクキューを共有する物理スレッドのプールでbase::ThreadPoolInstance。Chromeプロセスごとに一個のインスタンスをもつ。base/task/post_task.hによってポストされたタスクを処理する。base::ThreadPoolInstanceのAPIを直接コールすることはほとんどない(タスクのポストは後半で記述)。
  • シーケンスまたは仮想スレッド:Chromeによって管理されるスレッド。物理スレッドと同様に、ある時点で1つのタスクのみ実行されていて前のタスクの結果を見ることができる。タスクはシーケンシャルに実行されるが、スレッドはホップされることがある。
  • タスクランナー:これを通じてタスクをポストするためのインターフェース。Chromeでは@base::TaskRunner@。
  • シーケンスタスクランナー:ポストされた順番でシーケンス的にタスクを実行することを保証されたタスクランナー。前のタスクの結果を見ることができる。普通はシングルスレッドで実行される(仮想でも物理でも)。Chromeではbase::SequencedTaskRunnerbase::TaskRunnerを基底クラスにもつ。
  • シングルスレッドタスクランナー:シーケンス的なタスクランナーでタスクは同じスレッドで実行される。Chromeではbase::SingleThreadTaskRunnerでbase::SequencedTaskRunnerを基底クラスにもつ。我々は可能な限り物理スレッドよりもシーケンスを好んで用いる。

訳注: スレッドのホップとはおそらくスレッドプール内のスレッドで終わったスレッドが次のタスクに使われプール内のスレッドの順番ごとに実行されるということはないという意味だと思われる。

スレッド関連の語彙

ノート:以下の用語は我々がChromeで使っている用語を一般的な用語で理解するために書いたもの。初心者には難しいかもしれない。その場合には飛ばしても良い。

  • スレッドアンセーフ:Chromeでは多くの型はスレッドアンセーフ(デザイン)。このような型やメソッドにアクセスするには外部から同期する必要がある。通常このような型が多くのタスクからアクセスされる場合は同じbase::SequencedTaskRunnerにポストされる必要がある。デバッグビルドではこれがSEQUENCE_CHECKERによってチェックされる。ロックを使うこともできるがChromeではシーケンスが好まれる。
  • スレッドアファイン:このような型やメソッドは常に同じ物理スレッド(例えば同じbase::SingleThreadTaskRunner)からアクセスされる必要がある。通常THREAD_CHECKERをもちチェックされる。Short of using a third-party API or having a leaf dependency which is thread-affine: there's pretty much no reason for a type to be thread-affine in Chrome. base::SingleThreadTaskRunnerbase::SequencedTaskRunnerであるので(is-a)スレッドアファインはスレッドアンセーフのサブセットである。スレッドアファインはスレッド敵対とも呼ばれる。
  • スレッドセーフ:これらの型やメソッドは同時にアクセスできる。
  • スレッド互換:これらの型はコンストメソッドには同時にアクセスできるが、非コンストには同期が必要である。Chromeはリーダーライターロックを提供していないので、使うときには初期化時に(通常はグローバルでのそれ)スレッドセーフ的に初期化して(スタートアップ時のシングルスレッド状態の時か、base::NoDestructorのような実行時のスタティックローカル初期化)それ以降は変更しないでつかう。
  • Immutable:スレッド互換のサブセットの型で初期化後は変更できない。
  • シーケンスフレンドリー:これらの型やメソッドはスレッドアンセーフでbase::SequencedTaskRunnerから呼ばれることをサポートする。理想的にはすべてのスレッドアンセーフはシーケンスフレンドリーなのが望ましいが、legacy code sometimes has overzealous checks that enforce thread-affinity in mere thread-unsafe scenarios. 以下のスレッドよりシーケンスが好ましいを参照。

訳注: アファインはaffineでCPU-affinityからきていると思われる。上記の内容から制限が厳しい順にスレッドをならべると、スレッドアファイン<スレッドアンセーフ<スレッドセーフになる。スレッドアファインはTLSなどをもつスレッドなので同じスレッドでなければならない。スレッドアンセーフはある時点で1つしか動いていないならどのスレッドからアクセスされても良いスレッド(=シーケンスならOK)。ただし上記の記述には意味不明の点もある。

スレッド

すべてのChromeプロセスは以下のスレッドを持つ。

  • メインスレッド
    • ブラウザプロセス:UIの更新
    • レンダープロセス:Blinkの大体の仕事
  • IOスレッド
    • ブラウザプロセス:IPCのハンドルやネットワークリクエスト
    • レンダープロセス:IPCのハンドル
  • いくつかの特別なスレッド
  • 一般用のプール

ほとんどのスレッドはタスクを取得するためのループとキューを持っている。(キューは共有されていることもある)

タスク

タスクはbase::OnceClosureクラスのインスタンスで、非同期の実行のためキューに追加される。 base::OnceClosureは関数ポインタと引数を保持しており、Run()メソッドが関数ポインタを実行する。これはbase::BindOnceで作られる(参照:コールバックとBind()

void TaskA() {}
void TaskB(int v) {}

auto task_a = base::BindOnce(&TaskA);
auto task_b = base::BindOnce(&TaskB, 42);

タスクのグループは以下の方法で実行される。

  • パラレル:タスクの順番を考慮しない。別々のスレッドで一気に実行される場合もある。
  • シーケンス:ポストされた順番通りに実行される。一度に一つずつ実行される。どのスレッドでも実行されえる。
  • シングルスレッド:ポストされた通りの順番で1つのスレッドで実行される。
*COMシングルスレッド:シングルスレッドのCOM版、COMの初期化を伴う。

シーケンスのほうがシングルスレッドより好ましい

スレッドセーフだけが要求される場合、シーケンス実行モードはシングルスレッドよりも遥かに好ましい。(シングルスレッドの場合、スレッドの最後に仕事を追加するがシーケンスの場合スレッドをHOPできる。)スレッドがホップできるとスレッドカウントが動的にマシンのリソース利用状況に適用される(大きいマシンでは早くなり、遅いマシンではゴミを減らす)。

多くのコアAPIは最近シーケンスフレンドリになった。(ほとんどのクラスはthread-affine(スレッドローカルストレッジを使ったりしている)ではないはず)。しかしコードベースはシングルスレッドコンテキストを前提として進化してきた。もしクラスがシーケンスで実行可能で、リーフ依存のなかでThreadChecker/ThreadTaskRunnerHandle/SingleThreadTaskrunnerを多用しブロックされているなら、他の人のためにコードを見直すべき(最低でもブロッキングバグをhttps://crbug.com/675631に提出すべきで、base::CreateSingleThreadTaskRunnerWithTraits()にはTODOのフラグを付けるべき)。

シングルスレッドからシーケンスへの詳細な文書はここにある。

以下の議論はタスクを実行するこれらの詳細を述べる。

パラレスタスクをポスト

タスクスケジューラへ直接ポスト

どのスレッドでも実行できて、排他制御を持たないタスクはbase::PostTask*()を使うべき。base/task_scheduler/post_task.hで定義される。

base::PostTask(FROM_HERE, base::BindOnce(&Task));

これはデフォルトのトレイトでタスクをポストする。

base::PostTask*WithTraits()は、TaskTraitsを使って追加情報を提供できる。(下記参照)

base::PostTaskWithTraits(
FROM_HERE, {base::TaskPriority::BACKGROUND, MayBlock()},
base::BindOnce(&Task));

TaskRunner経由でポストする

パラレルのTaskRunnerは直接base::PostTask*()コールの代替品である。前もってタスクがパラレルなのかシーケンスなのかシングルスレッドなのかわからないときに有用である。(下記参照)。TaskRunnderSequencedTaskRunnerSingleThreadTaskRunnerのベースクラスなので、scoped_refptr<TaskRunnder>メンバはTaskRunnderSequencedTaskRunner、またはSingleThreadTaskRunnerを保持できる。

class A {
        public:
        A() = default;

        void set_task_runner_for_testing(
        scoped_refptr<base::TaskRunner> task_runner) {
                task_runner_ = std::move(task_runner);
        }

        void DoSomething() {
                // プロダクションコードでは, Aは常にパラレスでポストされる。テストでは
                // set_task_runner_for_testing()で提供されるTaskRunnerにポストされる。
                task_runner_->PostTask(FROM_HERE, base::BindOnce(&A));
        }

        private:
        scoped_refptr<base::TaskRunner> task_runner_ =
        base::CreateTaskRunnerWithTraits({base::TaskPriority::USER_VISIBLE});
};

テストがタスク実行の正確なコントロールを必要としないときは、base::PostTask*()を直接コールするのが好ましい。

シーケンスタスクのポスト

シーケンスタスクはポストされた順番に1つづつ処理されるタスク(同じスレッドで動作するとは限らない)。シーケンスタスクをポストするにはSequencedTaskRunnerを使う。

新規シーケンスをポスト

SequncedTaskRunnerbase::CreateSequencedTaskRunnerWithTraits()によって作成できる。

scoped_refptr<SequencedTaskRunner> sequenced_task_runner =
base::CreateSequencedTaskRunnerWithTraits(...);

// TaskBはTaskAが終わってから実行される。
sequenced_task_runner->PostTask(FROM_HERE, base::BindOnce(&TaskA));
sequenced_task_runner->PostTask(FROM_HERE, base::BindOnce(&TaskB));

カレントシーケンスにポスト

現在のタスクがポストされたSequencedTaskRunnerを得るにはSquencedTaskRunnerHandle::Get()を使う。

注意: パラレルタスクからSequencedTaskRunnerHandle::Get()を呼ぶことは出来ない。シングルスレッドのタスク(SingleThreadTaskRunnerSequencedTaskRunner)からなら呼ぶことができる。

// タスクはすでにポストされたタスクの後に実行される。
// (つまりこのタスクが終わってから実行される)。
// 他のタスクと同時に実行されることはない。
base::SequencedTaskRunnerHandle::Get()->
PostTask(FROM_HERE, base::BindOnce(&Task));

ロックの代わりにシーケンスを使う

Chromeではロックは推奨されない。シーケンスは本質的にスレッドセーフを提供する。ロックでスレッドセーフを確保するよりも、同じシーケンスからのみアクセスされるクラスの方が好ましい。

class A {
        public:
        A() {
                // アクセスは作成シーケンス上にあることを要求しない。
                DETACH_FROM_SEQUENCE(sequence_checker_);
        }

        void AddValue(int v) {
                // すべてのアクセスが同じシーケンスかをチェック。
                DCHECK_CALLED_ON_VALID_SEQUENCE(sequence_checker_);
                values_.push_back(v);
        }

        private:
        SEQUENCE_CHECKER(sequence_checker_);

        // すべてのアクセスは同じシーケンス上にあるので
        // ロックは必要ない。
        std::vector<int> values_;
};

A a;
scoped_refptr<SequencedTaskRunner> task_runner_for_a = ...;
task_runner_for_a->PostTask(FROM_HERE,
base::BindOnce(&A::AddValue, base::Unretained(&a), 42));
task_runner_for_a->PostTask(FROM_HERE,
base::BindOnce(&A::AddValue, base::Unretained(&a), 27));

// 違うシーケンスからのアクセスはDCHECKで失敗する。
scoped_refptr<SequencedTaskRunner> other_task_runner = ...;
other_task_runner->PostTask(FROM_HERE,
base::BindOnce(&A::AddValue, base::Unretained(&a), 1));

ロックはマルチスレッドでの共有データ構造のスワップでのみ使われるべきである。ロック中に重たい処理をすべきではない。データの用意が完了してからロックを取得しスワップすべきである。1つの例はPluginList::LoadPlugins(content/common/plugin_list.cc)である。どうしてもロックが必要なら、ロックと条件変数を参照してベストプラクティスを見て落とし穴にはまらないように。

ブロッキングなしのコードが書けるように、Chromiumの多くのAPIは非同期である。このことが意味することは処理は特定のスレッド/シーケンスで実行され、結果はデリゲートで受け取る、つまりbase::Callback<>オブジェクトが完了後に呼び出されることで結果を受け取るということである。

同じスレッドに多くのタスクをポスト

もし多くのタスクが同じスレッドで実行される必要がある場合は、同じSingleThreadTaskRunnerにポストする。ポストされた順番通りに実行される。

ブラウザプロセスのメインスレッドまたはIOスレッドへポスト

適切なSingleThreadTaskRunnerを取得してポストする。取得にはcontent::BrowserThread::GetTaskRunnerForThreadを使う。

content::BrowserThread::GetTaskRunnerForThread(content::BrowserThread::UI)
->PostTask(FROM_HERE, ...);

content::BrowserThread::GetTaskRunnerForThread(content::BrowserThread::IO)
->PostTask(FROM_HERE, ...);

メインスレッドやIOスレッドはいつもとても忙しいので可能であれば一般的なスレッド(パラレルタスクやシーケンスタスク)へポストすること。UIの更新やブラウザスレッドが持っているオブジェクト(プロファイルなど)へのアクセスのためにはブラウザスレッドを使う。IPCやネットワークにアクセスするにはIOスレッドを使う。注意:IPCやネットワークにアクセスするためにIOスレッドが絶対に必要なわけではない。

レンダプロセスのメインスレッドへポスト

TODO

カスタムSingleThreadTaskRunnerへポスト

多数のタスクが同じスレッドで実行される必要があり、それらはメインスレッドもIOスレッドも必要としないときはbase::CreateSingleThreadTaskRunnerWithTraitsで作成されたSingleThreadTaskRunnerへポストする。

scoped_refptr<SequencedTaskRunner> single_thread_task_runner =
base::CreateSingleThreadTaskRunnerWithTraits(...);

// TaskBはTaskAの完了後に実行される。どちらのタスクも同じスレッドで実行される。
single_thread_task_runner->PostTask(FROM_HERE, base::BindOnce(&TaskA));
single_thread_task_runner->PostTask(FROM_HERE, base::BindOnce(&TaskB));

重要:ほとんどの場合これは必要ない。Chromiumでのほとんどのクラスはスレッドセーフティを要求している(スレッドアフィニティではなく)。もし使うAPIが不正確にスレッドアフィニティなら(単にスレッドアンセーフのときにbase::ThreadCheckerを使っていたり(base::SequenceCheckerを使うべき))、自分のAPIもスレッドアフィニティにして自体を悪化させるより、それを修正することを考えてください。

カレントスレッドへポストするならThreadTaskRunnerHandleを使う。

// タスクはこの後にカレントスレッドで実行される。
base::ThreadTaskRunnerHandle::Get()->PostTask(
FROM_HERE, base::BindOnce(&Task));

注意:パラレルまたはシーケンスからThreadTaskRunnerHandle::Get()を呼ぶことは出来ません。

COMのSingle-Thread Apartmentスレッド(STD)にポストする(Windows)

COMのSTAで動くタスクをポストするには、CreateCOMSTATaskRunnerWithTraits()で得られるSingleThreadTaskRunnerを使う。

// Task(A|B|C)UsingCOMSTAは同じCOM STAスレッドで実行される。
void TaskAUsingCOMSTA() {
        // [ これはCOM STAスレッドで実行される ]
        // COM STA呼び出しを実行。
        // ...

        // 他のタスクを現在のCOM STAスレッドで実行。
        base::ThreadTaskRunnerHandle::Get()->PostTask(
        FROM_HERE, base::BindOnce(&TaskCUsingCOMSTA));
}
void TaskBUsingCOMSTA() { }
void TaskCUsingCOMSTA() { }

auto com_sta_task_runner = base::CreateCOMSTATaskRunnerWithTraits(...);
com_sta_task_runner->PostTask(FROM_HERE, base::BindOnce(&TaskAUsingCOMSTA));
com_sta_task_runner->PostTask(FROM_HERE, base::BindOnce(&TaskBUsingCOMSTA));

タスクトレイトでタスクを修飾

TaskTraitsはタスクについての情報をもち、タスクスケジューラがより良い決定をできるようにする。

base/task_scheduler/post_task.h内のすべてのPostTask*()関数はTaskTraitsを引数に取る関数をオーバーロードしている。TaskTraitsをとらない関数のオーバーロードに適しているのは以下の場合:

  • ブロックしない(参照:MayBlockWithBaseSyncPrimitives)。
  • 現在の優先度を継承したい場合。
  • シャットダウンをブロックするか、シャットダウン時にスキップされる場合(タスクスケジューラが適切な選択を自由にする)。これに当てはまらない場合はTaskTraitsを使わなくてはならない。

base/task_scheduler/post_task.hには利用できるトレイトについて大量のドキュメントがある。以下はTaskTraitsを指定する実例。

// このタスクはTaskTraitsを指定されていない。ブロックできない。優先度は
// 呼び出し元のものを継承する。(バックグラウンドタスクからポストされれば
// バックグラウンドの優先度をもつ)
// シャットダウンをブロックするかもしれないし、シャットダウン中にスキップされるかもしれない
base::PostTask(FROM_HERE, base::BindOnce(...));

// このタスクは最上位の優先度をもつ。スケジューラはUSER_VISIBLEやBACKGROUNDより
// 前にこのタスクを実行しようとする。
base::PostTaskWithTraits(
FROM_HERE, {base::TaskPriority::USER_BLOCKING},
base::BindOnce(...));

// このタスクは最低位の優先度をもつ。ブロックされる
// (つまりディスクからファイルを読める)
base::PostTaskWithTraits(
FROM_HERE, {base::TaskPriority::BACKGROUND, base::MayBlock()},
base::BindOnce(...));

// このタスクはシャットダウンをブロックする。タスクが完了するまで
// プロセスは終了しない。
base::PostTaskWithTraits(
FROM_HERE, {base::TaskShutdownBehavior::BLOCK_SHUTDOWN},
base::BindOnce(...));

ブラウザの反応をよくする

メインスレッドやIOスレッドや低遅延が期待されるシーケンスで重たい処理をしないこと。それらはbase::PostTaskAndReply*()SequenceTaskRunner::PostTaskAndReplyで非同期で実行する。注意:IOスレッドの非同期/オーバーラップIOは問題ない。

実例:以下のコードをメインスレッドで実行すると、ブラウザのレスポンスが悪くなりユーザは待たされる。

// GetHistoryItemsFromDisk()は長時間ブロックするかもしれない。
// AddHistoryItemsToOmniboxDropDown()はUIをアップデートするので
// メインスレッドから呼ばなくてはならない。
AddHistoryItemsToOmniboxDropdown(GetHistoryItemsFromDisk("keyword"));

以下のコードは修正版。GetHistoryItemsFromDisk()をスレッドプールにスケジュールし、その後AddHistoryItemsToOmniboxDropdown()を呼ぶ(メインスレッドで)。最初の呼び出しの戻り値は自動的に次の呼び出しの引数になる。

base::PostTaskWithTraitsAndReplyWithResult(
FROM_HERE, {base::MayBlock()},
base::BindOnce(&GetHistoryItemsFromDisk, "keyword"),
base::BindOnce(&AddHistoryItemsToOmniboxDropdown));

遅延をもってタスクをポスト

遅延を持って一回限りのタスクをポスト

一定時間後に一回限り実行されるタスクをポストするには、PostDelayedTask*()またはTaskrunner::PostDelayedTask()を使う。

base::PostDelayedTaskWithTraits(
FROM_HERE, {base::TaskPriority::BACKGROUND}, base::BindOnce(&Task),
base::TimeDelta::FromHours(1));

scoped_refptr<base::SequencedTaskRunner> task_runner =
base::CreateSequencedTaskRunnerWithTraits({base::TaskPriority::BACKGROUND});
task_runner->PostDelayedTask(
FROM_HERE, base::BindOnce(&Task), base::TimeDelta::FromHours(1));

注意:1時間後に実行されるタスクは、おそらく1時間たった後すぐに実行される必要はないだろう。base::TaskPriority::BACKGROUNDを指定してブラウザにインパクトを与えないようにしている。

遅延を持って繰り返しタスクをポスト

一定間隔で繰り返し実行されるタスクをポストするには、base::RepeatingTimerを使う。

class A {
        public:
        ~A() {
                // The timer is stopped automatically when it is deleted.
        }
        void StartDoingStuff() {
                timer_.Start(FROM_HERE, TimeDelta::FromSeconds(1),
                this, &MyClass::DoStuff);
        }
        void StopDoingStuff() {
                timer_.Stop();
        }
        private:
        void DoStuff() {
                // This method is called every second on the sequence that invoked
                // StartDoingStuff().
        }
        base::RepeatingTimer timer_;
};

タスクをキャンセル

base::WeakPtrを使う

base::WeakPtrを使うとオブジェクトが破棄された後はタスクがキャンセルされることを保証する。

int Compute() {  }

class A {
        public:
        A() : weak_ptr_factory_(this) {}

        void ComputeAndStore() {
                // スレッドプールでCompute()呼び出しをスケジュールする。現在のシーケンスで
                // A::Store()が後に続いて実行される。A::Store()呼び出しは|weak_ptr_factory_|が破棄
                // されればキャンセルされる。
                // (|this|が開放された後使われないことを保証する)
                base::PostTaskAndReplyWithResult(
                FROM_HERE, base::BindOnce(&Compute),
                base::BindOnce(&A::Store, weak_ptr_factory_.GetWeakPtr()));
        }

        private:
        void Store(int value) { value_ = value; }

        int value_;
        base::WeakPtrFactory<A> weak_ptr_factory_;
};

注意:WeakPtrはスレッドセーフでない:GetWeakPtr(), ~WeakPtrFactory(), Compute() (WeakPtrに拘束されている)は同じシーケンスで呼ばなければならない。

Using base::CancelableTaskTracker

Page last modified on August 31, 2019, at 03:18 AM
Powered by PmWiki