Apache > Hadoop > ZooKeeper
 

ZooKeeper のレシピとソリューション

ZooKeeper で高度な機能を構築するためのガイド

このドキュメントでは、ZooKeeper を使って高度な機能を実装する方法を示します。これらの方法はいずれもクライアント側で実装するものなので、ZooKeeper 側では何ら特別なサポートは必要ありません。ZooKeeper コミュニティがこれらの方法をクライアントサイドライブラリに取り込んでくれたら、その利用も簡単になって、標準的な手法として使われる機会も増えるでしょう。

ZooKeeper で最も興味深いことの一つは、ZooKeeper が非同期の通知を使っているにもかかわらず、ユーザー側では ZooKeeper を使って、キューやロックといった同期一貫性プリミティブを構築できることです。このドキュメントを読むとわかるように、それが可能なのは、ZooKeeper が更新に対して全体的な順序を保証しており、この順序を公開するしくみを用意しているからです。

以下で示すさまざまなレシピでは、ベストプラクティスに従うようにしています。具体的には、トラフィックを爆発させてスケーラビリティを損なう、いわゆる「殺到現象 (herd effect)」につながるようなポーリングやタイマ、その他の使用を避けています。

ここでは取り上げていない数多くの有益な機能も実装できます。一つだけ例を挙げれば、取消可能な読み書き優先ロックもそうです。以下で示すサンプルは、そうした新機能の実装を考える際のヒントとなることを目的にしています。

はじめから用意されているアプリケーション: 名前サービス、設定、グループメンバーシップ

ZooKeeper の主な用途として、名前サービスと設定があります。これら 2 つの機能は、ZooKeeper API によって直接提供されています。

ZooKeeper によって直接提供されるもう一つの機能に、グループメンバーシップがあります。グループは、1 つのノードによって表されます。グループのメンバーは、このグループノードの下にエフェメラルノードを作成します。異常終了したメンバーのノードは、ZooKeeper が異常終了を検出したときに、自動的に削除されます。

バリア

多くの分散システムでは、ある条件が満たされるまで一連のノードの処理をブロックし、条件が満たされたらすべてのノードが処理を続行できるようにするために、バリアというしくみを使っています。ZooKeeper では、バリアノードを指定することでバリアを実装します。バリアノードが存在する場合には、バリアが有効になっています。疑似コードで記述すると、次のようになります。

  1. クライアントはバリアノードに対して、ZooKeeper API の exists( ) 関数を呼び出します。このとき、watch には true を設定します。

  2. exists( ) が false を返した場合、バリアは存在しないので、クライアントは処理を続行します。

  3. 一方、exists( ) が true を返した場合、クライアントは、バリアノードを対象とするウォッチイベントが ZooKeeper から送られてくるのを待ちます。

  4. ウォッチイベントがトリガされたら、クライアントは exists( ) を呼び出し、バリアノードが削除されるまで再度待機します。

二重のバリア

二重のバリアを使うと、計算の開始と終了を複数のクライアント間で同期させることができます。具体的には、十分な数のプロセスがバリアに入ったら、プロセスはそれぞれ計算を開始し、計算を終了したプロセスはバリアを出ます。このレシピでは、ZooKeeper のノードをバリアとして使う方法を示します。

疑似コードでは、バリアノードを b で表します。各クライアントプロセス p は、バリアに入ったときにバリアノードに登録し、バリアを出る準備ができたら登録を解除します。各プロセスは、下の入るときの手順に従って、バリアノードに対する登録を行い、x 個のクライアントプロセスが登録されるのを待って、計算処理を開始します (x については、プログラマがそれぞれのシステムに適した値を設定します)。

入るとき 出るとき
  1. 名前を作成します: n = b+“/”+p

  2. ウォッチを設定します: exists(b + ‘‘/ready’’, true)

  3. 子を作成します: create(n, EPHEMERAL)

  4. L = getChildren(b, false)

  5. もし L 内の子の数が x より少なければ、ウォッチイベントを待ちます。

  6. それ以外の場合: create(b + ‘‘/ready’’, REGULAR)

  1. L = getChildren(b, false)

  2. もし子が存在しなければ終了します。

  3. もし p が L 内の唯一のプロセスノードならば、delete(n) を実行して終了します。

  4. もし p が L 内で最も番号の小さなプロセスノードならば、L 内で番号の一番大きなプロセスノードに注目して待機します。

  5. それ以外の場合、もし n がまだ存在すれば delete(n) を実行し、L 内で最も番号の小さなプロセスノード に注目して待機します。

  6. 手順 1. に戻ります。

バリアに入るとき、すべてのプロセスは ready ノードをウォッチし、バリアノードの子としてエフェメラルノードを作成します。最後のプロセスを除く各プロセスは、バリアに入ると ready ノードが出現するのを待ちます。これが手順 5. です。x 番目のノードを作成するプロセス、すなわち最後のプロセスは、子のリストに x 個のノードがあるので ready ノードを作成し、ほかのすべてのプロセスを起動します。待機中のプロセスは、処理を開始するべきときになってはじめて起動するので、待機が有効であることに注意してください。

バリアを出るときは、プロセスノードが消えるのをウォッチすることになるので、ready のようなフラグを使うことはできません。エフェメラルノードを使うことで、バリアに入ってから異常終了したプロセスも、ほかのプロセスの処理を妨げることはありません。バリアを出る準備が整ったら、プロセスはそれぞれに対応するプロセスノードを削除し、ほかのすべてのプロセスが同様の操作を行うまで待つ必要があります。

プロセスがバリアを出るのは、b の子であるプロセスノードが一つも存在しなくなったときです。ただし、すべてのプロセスノードに注目するのではなく、最も番号の小さなプロセスノードだけを ready フラグとして使用すれば、処理を効率的に行うことができます。バリアを出る準備ができたほかのすべてのプロセスは、存在する中で最も番号の小さなプロセスノードの消滅をウォッチし、最も番号の小さなプロセスノードのオーナーは、ほかのプロセスノードの消滅を (処理を単純にするために最も番号の大きなノードだけに注目して) ウォッチします。これで、最後のノードを除く各ノードの削除時には、起動するのは 1 つのプロセスだけで、最後のノードが削除されたときにはじめて、すべてのプロセスが起動することになります。

キュー

分散キューは、よく使われるデータ構造です。ZooKeeper で分散キューを実装するには、キューを保持する znode をキューノードとして用意してやります。分散クライアントがキューに何かを入れたいときは、"queue-" で終わるパス名を指定して create( ) を呼び出します。このとき、シーケンスフラグとエフェメラルフラグを true に設定します。シーケンスフラグがセットされているので、新しいパス名は _path-to-queue-node_/queue-X という形式になり、X には単調増加する番号が入ります。クライアントがキューから出たい場合、クライアントは、watch に true を設定して ZooKeeper の getChildren( ) 関数を呼び出し、最も小さい番号を持つノードの処理を開始します。クライアントは、getChildren( ) の最初の呼び出しで取得したリストの処理を終えるまでは、新たに getChildren( ) を呼び出す必要はありません。キューノードに子が存在しない場合、読み取る側のプロセスはウォッチの通知を待って、再度キューをチェックします。

現在、ZooKeeper の recipes ディレクトリには、キューの実装が取り込まれています。この実装はリリース版とともに配布されており、パッケージの src/recipes/queue ディレクトリにあります。

優先キュー

優先キューは、汎用のキューのレシピに 2 つ簡単な変更を加えるだけで実装できます。まず、キューを追加するときは、"queue-YY" で終わるパス名を使います。ここで、YY は追加する要素の優先度で、(UNIX の場合と同様) 値が低いほど優先度は高くなります。次に、キューから出たいクライアントは、最新の子のリストを使用します。すなわち、キューノードに対するウォッチの通知がトリガされた場合、クライアントは以前に取得した子のリストを無効にします。

ロック

グローバルに同期した完全分散ロックでは、時間軸上のあらゆる点において、2 つのクライアントが同じロックを保持することはありません。ZooKeeeper を使うと、このようなロックを実装できます。優先キューのときと同様、まずロックノードを定義します。

現在、ZooKeeper の recipes ディレクトリには、ロックの実装が取り込まれています。この実装はリリース版とともに配布されており、パッケージの src/recipes/lock ディレクトリにあります。

ロックを取得するクライアントは、次の操作を行います。

  1. "_locknode_/lock-" をパス名に指定し、シーケンスフラグとエフェメラルフラグをセットして create( ) を呼び出します。

  2. ロックノードに対して getChildren( ) を呼び出します。ただし、watch フラグはセットしません (殺到現象を避けるには、こうすることが重要です)。

  3. 手順 1. で作成されたパス名のサフィックスに相当する番号が一番小さい場合、クライアントはロックを取得し、ロックプロトコルを出ます。

  4. クライアントは、ロックディレクトリ内で次に小さい番号を持つパスを対象に、ウォッチフラグをセットして exists( ) を呼び出します。

  5. もし exists( ) が false を返した場合には、手順 2. に戻ります。それ以外の場合には、直前の手順で指定したパス名に対する通知が届くのを待って、手順 2. に戻ります。

ロックを解除するときのプロトコルは非常にシンプルです。ロックを解除したいクライアントが、上の手順 1. で作成したノードを削除するだけです。

以下のことに注目してください。

  • 各ノードをウォッチしているクライアントは 1 つだけなので、ノードを削除したときに起動するのは 1 つのクライアントだけです。このため、「殺到現象」を避けることができます。

  • ポーリングは行っておらず、タイムアウトもありません。

  • このようなロックの実装方法によって、ロックの競合の程度がどれくらいかの調査、ロックの解除、ロックにまつわる問題のデバッグなどが簡単に行えます。

共有ロック

上のロックプロトコルに少し変更を加えるだけで、共有ロックを実装できます。

読み取りロックの取得: 書き込みロックの取得:
  1. create( ) を呼び出し、"_locknode_/read-" をパス名に持つノードを作成します。このノードは、ロックプロトコルの以降の手順で使われるロックノードになります。なお、必ずシーケンスフラグとエフェメラルフラグの両方をセットしておきます。

  2. ロックノードに対して getChildren( ) を呼び出します。ただし、watch フラグはセットしません (殺到現象を避けるには、こうすることが重要です)。

  3. "write-" で始まるパス名を持ち、かつ、手順 1. で作成したノードよりも番号の小さい子が存在しない場合、クライアントはロックを取得し、ロックプロトコルを出ます。

  4. それ以外の場合には、ロックディレクトリ内で次に小さい番号の "write-" で始まるパス名を対象に、watch フラグをセットして exists( ) を呼び出します。

  5. もし exists( )false を返した場合には、手順 2. に戻ります。

  6. それ以外の場合には、直前の手順で指定したパス名に対する通知が届くのを待って、手順 2. に戻ります。

  1. create( ) を呼び出し、"_locknode_/write-" をパス名に持つノードを作成します。このノードは、ロックプロトコルの以降の手順で使われるロックノードになります。なお、必ずシーケンスフラグとエフェメラルフラグの両方をセットしておきます。

  2. ロックノードに対して getChildren( ) を呼び出します。ただし、watch フラグはセットしません (殺到現象を避けるには、こうすることが重要です)。

  3. 手順 1. で作成したノードよりも番号の小さい子が存在しない場合、クライアントはロックを取得し、ロックプロトコルを出ます。

  4. 次に小さい番号のパス名を対象に、watch フラグをセットして exists( ) を呼び出します。

  5. もし exists( )false を返した場合には、手順 2. に戻ります。それ以外の場合には、直前の手順で指定したパス名に対する通知が届くのを待って、手順 2. に戻ります。

このレシピに従うと、「殺到現象」が起こるように思えるかもしれません。読み取りロックを取得するために待機するクライアントが多数にのぼる場合、これらのクライアントは、最も番号の小さい "write-" ノードが削除されたときに、ほぼ同時に通知を受け取ることになるからです。しかしこれは実際には有効な動作です。待機しているすべての読み取り側クライアントを、実際に「解き放つ」必要があります。なぜなら、これらのクライアントはロックを保持しているからです。そもそも「殺到現象」とは、単一のマシンまたはごく少数のマシンだけしか処理を行うことができない状況下で、「群れ (herd)」を解き放つ (大量のプロセスが起動する) 結果生じる現象をいいます。

回復可能な共有ロック

共有ロックプロトコルに少し手を加えると、取り消し可能な共有ロックを作成できます。

読み取りロックプロトコルと書き込みロックプロトコルの両方の手順 1. において、create( ) の呼び出しの直後に、watch フラグをセットして getData( ) を呼び出します。その後、手順 1. で作成したノードを対象とする通知を受け取ったら、クライアントはそのノードに対して、watch フラグをセットして再度 getData( ) を呼び出し、文字列 "unlock" を探します。文字列 "unlock" がある場合、クライアントはロックを解放しなければなりません。この共有ロックプロトコルでは、ロックノードに対して setData( ) を呼び出して "unlock" をノードに書き込むことで、ロックを持つクライアントにロックを手放すよう要求できます。

この共有ロックプロトコルでは、ロックの保持者に、ロック解放への同意を求める必要があります。こうした同意は、ロック保持者がロックを解放する前に何らかの処理を行う必要がある場合には特に重要です。もちろん、強力レーザー光線付き取り消し可能共有ロックを実装すること、すなわち、一定時間経過後もロックの保持者によってロックが削除されない場合は、ロックの取り消しを要求する側がロックを削除できるようにロックプロトコルを定めることも可能です。

2 相コミット

2 相コミットプロトコルは、分散システムのすべてのクライアントが、トランザクションをコミットするか中止するかについて合意するためのアルゴリズムです。

ZooKeeper で 2 相コミットを実装するには、トランザクションノード (たとえば "/app/Tx") と、参加サイトごとの子ノード (たとえば "/app/Tx/s_i") を、コーディネータに作成させます。コーディネータは、子ノードを作成するときに、内容を未定義にしておきます。トランザクションに参加する各サイトが、コーディネータからトランザクションを受け取ったら、各サイトはそれぞれの子ノードを読み取って、ウォッチを設定します。ついで各サイトはクエリを処理し、自分の子ノードへの書き込みを行うことによって、「コミット (commit)」か「中止 (abort)」のどちらかに投票します。書き込みが完了したら、ほかのサイトは通知を受け取ります。すべてのサイトの投票が済んだ時点で、これらのサイトは「コミット」するか「中止」するかを決定できます。各ノードでは、ほかのいくつかのノードがすでに「中止」に投票している場合には、早期に「中止」を決定できます。

この実装で興味深いのは、コーディネータの役割が、参加サイトの範囲を定めること、ZooKeeper ノードを作成すること、および、該当サイトにトランザクションを伝達することだけである点です。実際には、トランザクションをトランザクションノードに書き込めば、トランザクションの伝達も ZooKeeper を通じて行うことができます。

上に示したアプローチには、2 つの大きな欠点があります。ひとつは、メッセージが複雑になる (O(n²)) ことです。もうひとつは、エフェメラルノードを使ってサイトの障害を検出することができないことです。エフェメラルノードを使ってサイトの障害を検出するには、サイト側でエフェメラルノードを作成する必要があります。

最初の問題は、コーディネータだけがトランザクションノードに変更を通知するようにし、ついで、コーディネータが決定を下したら、その決定をトランザクションサイトに通知するようにすれば解決します。このアプローチはスケーラブルですが、すべての通信がコーディネータを経由するので、処理もそれだけ遅くなる点に注意してください。

2 番目の問題は、トランザクションサイトへのトランザクションの伝達をコーディネータに行わせ、各サイトにそれぞれ自身のエフェメラルノードを作成させることで対応できます。

リーダー選挙

ZooKeeper で簡単にリーダー選挙を行うには、クライアントの「提案 (proposal)」を表す znode を作成するときに、SEQUENCE|EPHEMERAL フラグを使用します。具体的には、"/election" のような znode を作成し、各プロセスの側では SEQUENCE|EPHEMERAL フラグをセットして子ノード "/election/n_" を作成するようにします。シーケンスフラグをセットしているので、作成される子ノードには、それまでに "/election" の子に割り当てられた番号より大きい連番が自動的に追加されます。ここで、追加される連番が最も小さい znode を作成したプロセスがリーダーとなります。

ただし、これだけでは十分ではありません。リーダーに障害が発生していないかどうか監視し、現在のリーダーがダウンした場合は新しいクライアントが新リーダーになるようにすることが重要です。手っ取り早い解決策として、すべてのアプリケーションプロセスに対し、現在番号が最も小さい znode をウォッチさせ、この znode が消滅したときに、自分が新しいリーダーなのかどうかをチェックさせる方法があります (番号が最も小さい znode はエフェメラルノードなので、リーダーがダウンすると消滅することに注意してください)。ただし、この方法では「殺到現象」が発生します。現在のリーダーがダウンすると、ほかのすべてのプロセスが通知を受け取り、ノード "/election" の最新の子のリストを取得するために、"/election" に対して getChildren を実行するからです。この場合、非常に多くのクライアントが存在すると、ZooKeeper サーバーが処理しなければならない操作の数も急上昇します。こうした「殺到現象」を避けるには、一連の znode のうち、次に小さい番号を持つ znode をウォッチすれば十分です。あるクライアントが、自分のウォッチしている znode が消滅したという通知を受け取った場合、消滅した znode より小さい番号のノードがほかに存在しなければ、そのクライアントが新リーダーになります。この方法では、すべてのクライアントに同一の znode をウォッチさせないようにすることで「殺到現象」を回避している点に注意してください。

疑似コードで記述すると、次のようになります。

ELECTION は、アプリケーションで決めたパスとします。リーダーを志願するには、次の手順に従います。

  1. パスに "ELECTION/n_" を指定し、SEQUENCE フラグと EPHEMERAL フラグを両方ともセットして znode z を作成します。

  2. "ELECTION" の子を C とし、z の連番を i とします。

  3. "ELECTION/n_j" に対する変更をウォッチします。j は、j < i であり、かつ、n_j が C の znode であるという条件が成立する最も小さな連番です。

znode が削除されたという通知を受け取ったら、次の手順に従います。

  1. ELECTION の新しい子のリストを C とします。

  2. もし z が C の中で番号の最も小さなノードである場合には、リーダー選挙の手順を実行します。

  3. それ以外の場合には、"ELECTION/n_j" に対する変更をウォッチします。j は、j < i であり、かつ、n_j が C の znode であるという条件が成立する最も小さな連番です。

ある znode があって、その znode より番号の小さな znode が子のリストの中に存在しないからといって、その znode の作成者が、自分が現在リーダーであることを認識しているとは限らないことに注意してください。この点については、アプリケーションの側で、リーダーがリーダー選挙の手順を実行したことを確認するための znode を別に作成することで対処できます。