Apache > Hadoop > ZooKeeper
 

ZooKeeper でのプログラミング - 基礎のチュートリアル

はじめに

このチュートリアルでは、ZooKeeper を使ったバリアと生産者−消費者キューの簡単な実装を示します。ここでは、それぞれのクラスを Barrier と Queue と呼びます。以下に示すサンプルでは、ZooKeeper サーバーが少なくとも 1 つ 実行されていることを前提としています。

次に示すのは、Barrier と Queue のどちらのプリミティブでも共通に使われるコードです。

    static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            mutex.notify();
        }
    }

どちらのクラスも SyncPrimitive を継承します。このようにすることで、SyncPrimitive のコンストラクタのすべてのプリミティブに共通するステップを実行します。ここでは、サンプルをできるだけシンプルにするために、バリアオブジェクトまたはキューオブジェクトを初めてインスタンス化するときに ZooKeeper オブジェクトを作成します。Barrier と Queue の以後のインスタンスでは、ZooKeeper オブジェクトが存在するかどうかをチェックします。別のやり方として、アプリケーションに ZooKeeper オブジェクトを作成させて、これを Barrier と Queue のコンストラクタに渡すこともできるでしょう。

process() メソッドは、ウォッチによってトリガされた通知を処理するのに使います。以下のセクションでは、ウォッチを設定するコードを示します。ウォッチとは、ノードが変更されたことを ZooKeeper がクライアントに通知するための内部構造です。たとえば、あるクライアントが、ほかのクライアントがバリアを出るまで待機する場合、待機する側のクライアントは特定のノードにウォッチを設定して、そのノードが変更されるまで待機することができます。この場合、ノードの変更が待機の終わりを示します。こうした動作は、実際にサンプルを見ればすぐに理解できるでしょう。

バリア

バリアは、複数のプロセスによる計算の開始と終了を同期化できるようにするプリミティブです。実装にあたっての大まかな考え方は、個々のプロセスノードの親となるバリアノードを用意するというものです。たとえば、バリアノードを "/b1" とします。各プロセス "p" は、ノード "/b1/p" を作成します。十分な数のプロセスがそれぞれ対応するノードを作成したら、プロセス全体で計算を開始できます。

この例では、各プロセスが Barrier オブジェクトをインスタンス化します。Barrier オブジェクトのコンストラクタは次のものを引数として取ります。

  • ZooKeeper サーバーのアドレス ("zoo1.foo.com:2181" など)

  • ZooKeeper 上のバリアノードのパス ("/b1" など)

  • プロセスの数

Barrier のコンストラクタは、ZooKeeper サーバーのアドレスを親クラスのコンストラクタに渡します。親クラスは、ZooKeeper のインスタンスが存在しなければ作成します。次に、Barrier のコンストラクタは、すべてのプロセスノードの親ノードであるバリアノードを ZooKeeper 上に作成します。これをルートと呼びます (注意: このルートは、ZooKeeper ルートの "/" ではありません)。

        /**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;

            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }

            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }

        }

プロセスがバリアに入るには、enter() を呼び出します。プロセスは、自身のホスト名からノード名を作り、ルートの下にこのノード名で自分自身を表すノードを作成します。ついで、十分な数のプロセスがバリアに入るまで待機します。具体的には、ルートノードが持つ子の数を "getChildren()" を使ってチェックし、十分な数の子が存在しなければ、通知が送られてくるのを待ちます。ルートノードに変更があったときに通知を受け取るためには、プロセスの側でウォッチを設定する必要がありますが、これは "getChildren()" の呼び出しを通じて行います。実際のコードを見るとわかるように、"getChildren()" には 2 つのパラメータがあります。最初のパラメータは、読み取り対象のノードで、2 番目のパラメータはウォッチを設定するかどうかを示すフラグ (ブール値) です。コードでは、このフラグを true に設定しています。

        /**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean enter() throws KeeperException, InterruptedException{
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);

                    if (list.size() < size) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }

enter() は KeeperException と InterruptedException の両方をスローすることに注意してください。したがって、これらの例外のキャッチと処理は、アプリケーションの側で行う必要があります。

計算が終わったら、プロセスは leave() を呼び出してバリアを出ます。まず、対応するノードを削除し、次にルートノードの子を取得します。少なくとも子が 1 つある場合は、通知が来るのを待ちます (getChildren() の呼び出しで 2 番目のパラメータが true になっていることに注意してください。これは、ZooKeeper がルートノードにウォッチを設定する必要があることを示します)。通知を受け取ったら、プロセスは再度、ルートノードに子があるかどうかをチェックします。

        /**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean leave() throws KeeperException, InterruptedException{
            zk.delete(root + "/" + name, 0);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                        if (list.size() > 0) {
                            mutex.wait();
                        } else {
                            return true;
                        }
                    }
                }
        }
    }

生産者−消費者キュー

生産者−消費者キューは、複数のプロセスが要素を生成して消費するために使う分散データ構造です。生産者 (producer) プロセスは、新しい要素を作成し、作成した要素をキューに追加します。消費者 (consumer) プロセスは、リストから要素を取り出し、取り出した要素を処理します。以下に示す実装では、要素を単純な整数にしています。キューはルートノードによって表され、生産者プロセスはルートノードの子となる新しいノードを作成して、キューに要素を追加します。

次に示すコードは、オブジェクトのコンストラクタに相当します。Barrier オブジェクトの場合と同様、まず親クラス SyncPrimitive のコンストラクタを呼び出し、ZooKeeper オブジェクトが存在しなければ作成します。次に、キューのルートノードが存在するかどうかを調べ、存在しなければルートノードを作成します。

        /**
         * Constructor of producer-consumer queue
         *
         * @param address
         * @param name
         */
        Queue(String address, String name) {
            super(address);
            this.root = name;
            // Create ZK node name
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }

生産者プロセスは要素をキューに追加するために "produce()" を呼び出し、整数を引数として渡します。"produce()" は、キューに要素を追加するために "create()" を使って新しいノードを作成し、SEQUENCE フラグを指定して、ルートノードに関連付けられたシーケンサカウンタの値を末尾に追加するよう指示します。こうすることで、キューのすべての要素を順序付けることができ、キューの中で一番古い要素が次に消費される要素になることを保証できます。

        /**
         * Add element to the queue.
         *
         * @param i
         * @return
         */

        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;

            // Add child with value i
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);

            return true;
        }

消費者プロセスは、要素を消費するために、ルートノードの子を取得し、カウンタ値が最も小さいノードを読み取って、その要素を返します。矛盾が生じた場合には、その矛盾の当事者である 2 つのプロセスの一方は、ノードを削除することができず、削除操作は例外をスローします。

getChildren() を呼び出すと、辞書順に並んだ子のリストが返されます。辞書順は必ずしもカウンタ値の番号順と同じではないので、どの要素のカウンタ値が最も小さいかを調べる必要があります。ここでは、最も小さいカウンタ値の要素を見つけるために、リストを走査して各要素からプリフィックス "element" を取り除いています。

        /**
         * Remove first element from the queue.
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;

            // Get the first element available
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        System.out.println("Going to wait");
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            //System.out.println("Temporary value: " + tempValue);
                            if(tempValue < min) min = tempValue;
                        }
                        System.out.println("Temporary value: " + root + "/element" + min);
                        byte[] b = zk.getData(root + "/element" + min,
                                    false, stat);
                        zk.delete(root + "/element" + min, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();

                        return retvalue;
                    }
                }
            }
        }
    }

全ソースリスト

SyncPrimitive.Java
SyncPrimitive.Java
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.nio.ByteBuffer;
import java.util.List;
import java.util.Random;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;

public class SyncPrimitive implements Watcher {

    static ZooKeeper zk = null;
    static Integer mutex;

    String root;

    SyncPrimitive(String address) {
        if(zk == null){
            try {
                System.out.println("Starting ZK:");
                zk = new ZooKeeper(address, 3000, this);
                mutex = new Integer(-1);
                System.out.println("Finished starting ZK: " + zk);
            } catch (IOException e) {
                System.out.println(e.toString());
                zk = null;
            }
        }
        //else mutex = new Integer(-1);
    }

    synchronized public void process(WatchedEvent event) {
        synchronized (mutex) {
            //System.out.println("Process: " + event.getType());
            mutex.notify();
        }
    }

    /**
     * Barrier
     */
    static public class Barrier extends SyncPrimitive {
        int size;
        String name;

        /**
         * Barrier constructor
         *
         * @param address
         * @param root
         * @param size
         */
        Barrier(String address, String root, int size) {
            super(address);
            this.root = root;
            this.size = size;

            // Create barrier node
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }

            // My node name
            try {
                name = new String(InetAddress.getLocalHost().getCanonicalHostName().toString());
            } catch (UnknownHostException e) {
                System.out.println(e.toString());
            }

        }

        /**
         * Join barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean enter() throws KeeperException, InterruptedException{
            zk.create(root + "/" + name, new byte[0], Ids.OPEN_ACL_UNSAFE,
                    CreateMode.EPHEMERAL_SEQUENTIAL);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);

                    if (list.size() < size) {
                        mutex.wait();
                    } else {
                        return true;
                    }
                }
            }
        }

        /**
         * Wait until all reach barrier
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */

        boolean leave() throws KeeperException, InterruptedException{
            zk.delete(root + "/" + name, 0);
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                        if (list.size() > 0) {
                            mutex.wait();
                        } else {
                            return true;
                        }
                    }
                }
        }
    }

    /**
     * Producer-Consumer queue
     */
    static public class Queue extends SyncPrimitive {

        /**
         * Constructor of producer-consumer queue
         *
         * @param address
         * @param name
         */
        Queue(String address, String name) {
            super(address);
            this.root = name;
            // Create ZK node name
            if (zk != null) {
                try {
                    Stat s = zk.exists(root, false);
                    if (s == null) {
                        zk.create(root, new byte[0], Ids.OPEN_ACL_UNSAFE,
                                CreateMode.PERSISTENT);
                    }
                } catch (KeeperException e) {
                    System.out
                            .println("Keeper exception when instantiating queue: "
                                    + e.toString());
                } catch (InterruptedException e) {
                    System.out.println("Interrupted exception");
                }
            }
        }

        /**
         * Add element to the queue.
         *
         * @param i
         * @return
         */

        boolean produce(int i) throws KeeperException, InterruptedException{
            ByteBuffer b = ByteBuffer.allocate(4);
            byte[] value;

            // Add child with value i
            b.putInt(i);
            value = b.array();
            zk.create(root + "/element", value, Ids.OPEN_ACL_UNSAFE,
                        CreateMode.PERSISTENT_SEQUENTIAL);

            return true;
        }


        /**
         * Remove first element from the queue.
         *
         * @return
         * @throws KeeperException
         * @throws InterruptedException
         */
        int consume() throws KeeperException, InterruptedException{
            int retvalue = -1;
            Stat stat = null;

            // Get the first element available
            while (true) {
                synchronized (mutex) {
                    List<String> list = zk.getChildren(root, true);
                    if (list.size() == 0) {
                        System.out.println("Going to wait");
                        mutex.wait();
                    } else {
                        Integer min = new Integer(list.get(0).substring(7));
                        for(String s : list){
                            Integer tempValue = new Integer(s.substring(7));
                            //System.out.println("Temporary value: " + tempValue);
                            if(tempValue < min) min = tempValue;
                        }
                        System.out.println("Temporary value: " + root + "/element" + min);
                        byte[] b = zk.getData(root + "/element" + min,
                                    false, stat);
                        zk.delete(root + "/element" + min, 0);
                        ByteBuffer buffer = ByteBuffer.wrap(b);
                        retvalue = buffer.getInt();

                        return retvalue;
                    }
                }
            }
        }
    }

    public static void main(String args[]) {
        if (args[0].equals("qTest"))
            queueTest(args);
        else
            barrierTest(args);

    }

    public static void queueTest(String args[]) {
        Queue q = new Queue(args[1], "/app1");

        System.out.println("Input: " + args[1]);
        int i;
        Integer max = new Integer(args[2]);

        if (args[3].equals("p")) {
            System.out.println("Producer");
            for (i = 0; i < max; i++)
                try{
                    q.produce(10 + i);
                } catch (KeeperException e){

                } catch (InterruptedException e){

                }
        } else {
            System.out.println("Consumer");

            for (i = 0; i < max; i++) {
                try{
                    int r = q.consume();
                    System.out.println("Item: " + r);
                } catch (KeeperException e){
                    i--;
                } catch (InterruptedException e){

                }
            }
        }
    }

    public static void barrierTest(String args[]) {
        Barrier b = new Barrier(args[1], "/b1", new Integer(args[2]));
        try{
            boolean flag = b.enter();
            System.out.println("Entered barrier: " + args[2]);
            if(!flag) System.out.println("Error when entering the barrier");
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }

        // Generate random integer
        Random rand = new Random();
        int r = rand.nextInt(100);
        // Loop for rand iterations
        for (int i = 0; i < r; i++) {
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {

            }
        }
        try{
            b.leave();
        } catch (KeeperException e){

        } catch (InterruptedException e){

        }
        System.out.println("Left barrier");
    }
}