Apache > Hadoop > Core
 

Map/Reduce チュートリアル

はじめに

このドキュメントでは、チュートリアルとして役立つことを目的に、ユーザーが触れる Hadoop Map/Reduce のすべての側面についてまとめて説明します。

必要なもの

Hadoop のインストールと設定が済み、すでに実行されていることを確認してください。詳細については、以下を参照してください。

概要

Hadoop Map/Reduce は、どこにでもあるごく普通のハードウェアで構成した (数千ノードの) 大規模なクラスタ上で、膨大なデータ (数テラバイトのデータセット) を並列処理するアプリケーションを簡単に記述できるようにするソフトウェアフレームワークです。

通常、Map/Reduce のジョブは、入力データセットを独立したチャンクに分割し、分割されたチャンクは複数の map タスクによって完全に並列処理されます。Map/Reduce フレームワークは、map の出力をソートし、これが reduce タスクへの入力になります。一般に、ジョブの入力と出力の両方がファイルシステムに保存されます。Map/Reduce フレームワークは、タスクのスケジューリング、失敗したタスクの再実行も行います。

通常、計算ノードとストレージノードは同一です。すなわち、Map/Reduce フレームワークと Hadoop Distributed File System (HDFS のアーキテクチャを参照) は、同一のノードセット上で実行されます。この構成により、Map/Reduce フレームワークは目的のデータがすでに存在するノード上でタスクを効率的にスケジューリングすることができ、クラスタ全体で非常に高い帯域幅を発揮できます。

Map/Reduce フレームワークは、マスターとなる 1 つの JobTracker と、クラスタノードごとに 1 つ存在するスレーブとなる TaskTracker から構成されます。マスターは、ジョブのコンポーネントタスクをスレーブ上でスケジューリングし、これらのコンポーネントタスクを監視して、失敗したタスクを再実行します。スレーブは、マスターから指示されたタスクを実行します。

アプリケーションでは最低限、入出力の場所を指定し、適切なインタフェースおよび/または抽象クラスの実装を通じて map 関数と reduce 関数を用意します。これらの設定とその他のジョブ関連のパラメータが、ジョブ設定を構成することになります。Hadoop のジョブクライアントは、ジョブ (jar、実行可能ファイルなど) とその設定を JobTracker にサブミットし、JobTracker はこれを受けて、サブミットされたソフトウェアと設定をスレーブに配布するとともに、タスクのスケジューリングと監視を行い、状況と診断情報をジョブクライアントに知らせます。

Hadoop フレームワークは JavaTM で実装されていますが、Map/Reduce アプリケーションを Java で記述する必要はありません。

  • Hadoop Streaming は、ユーザーが任意の実行可能ファイル (シェルユーティリティなど) を MapperReducer としてジョブを作成・実行できるようにするためのユーティリティです。
  • Hadoop Pipes は、(非 JNITM ベースの) Map/Reduce アプリケーションを実装するための SWIG 互換の C++ API です。

入力と出力

Map/Reduce フレームワークが扱うデータは、<key, value> ペアだけです。すなわち、Map/Reduce フレームワークは、ジョブへの入力を <key, value> ペアの集合とみなし、ジョブの出力を (入力とは異なるタイプの) <key, value> ペアとして生成します。

key クラスと value クラスでは、Map/Reduce フレームワークがこれらのクラスを直列化する必要があるため、Writable インタフェースを実装する必要があります。また、key クラスでは、Map/Reduce フレームワークによるソートを容易にするために、WritableComparable インタフェースを実装する必要があります。

次に示すのは、Map/Reduce ジョブの入力と出力のタイプです。

(入力) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (出力)

例: WordCount v1.0

詳細な説明に入る前に、Map/Reduce アプリケーションの例を取り上げて、どのように動作するのか感触をつかんでみましょう。

WordCount は、与えられた入力セット中の各単語の出現頻度をカウントする簡単なアプリケーションです。

WordCount は、ローカルスタンドアロンモード、疑似分散モード、さらに完全分散モードのいずれで実行されている Hadoop でも動作します (Hadoop クイックスタートを参照)。

ソースコード

WordCount.java
1. package org.myorg;
2.
3. import java.io.IOException;
4. import java.util.*;
5.
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.conf.*;
8. import org.apache.hadoop.io.*;
9. import org.apache.hadoop.mapred.*;
10. import org.apache.hadoop.util.*;
11.
12. public class WordCount {
13.
14.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
15.      private final static IntWritable one = new IntWritable(1);
16.      private Text word = new Text();
17.
18.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
19.        String line = value.toString();
20.        StringTokenizer tokenizer = new StringTokenizer(line);
21.        while (tokenizer.hasMoreTokens()) {
22.          word.set(tokenizer.nextToken());
23.          output.collect(word, one);
24.        }
25.      }
26.    }
27.
28.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
29.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
30.        int sum = 0;
31.        while (values.hasNext()) {
32.          sum += values.next().get();
33.        }
34.        output.collect(key, new IntWritable(sum));
35.      }
36.    }
37.
38.    public static void main(String[] args) throws Exception {
39.      JobConf conf = new JobConf(WordCount.class);
40.      conf.setJobName("wordcount");
41.
42.      conf.setOutputKeyClass(Text.class);
43.      conf.setOutputValueClass(IntWritable.class);
44.
45.      conf.setMapperClass(Map.class);
46.      conf.setCombinerClass(Reduce.class);
47.      conf.setReducerClass(Reduce.class);
48.
49.      conf.setInputFormat(TextInputFormat.class);
50.      conf.setOutputFormat(TextOutputFormat.class);
51.
52.      FileInputFormat.setInputPaths(conf, new Path(args[0]));
53.      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
54.
55.      JobClient.runJob(conf);
57.    }
58. }
59.

使い方

Hadoop のインストール先のルートが HADOOP_HOME に設定されていて、インストールされている Hadoop のバージョンが HADOOP_VERSION に設定されているとします。次の要領で WordCount.java をコンパイルし、jar を作成します。

$ mkdir wordcount_classes
$ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes WordCount.java
$ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .

ディレクトリは次のものを使います。

  • /usr/joe/wordcount/input - HDFS 内の入力用ディレクトリ
  • /usr/joe/wordcount/output - HDFS 内の出力用ディレクトリ

入力として使うサンプルテキストファイルは、次のようになっています。

$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World Bye World

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop Goodbye Hadoop

アプリケーションを実行します。

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

出力は次のようになります。

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 2
Hello 2
World 2

アプリケーションでは、-files オプションを使って、タスクの現在の作業ディレクトリに存在するパスをカンマで区切って指定することができます。-libjars オプションを使うと、map と reduce のクラスパスに jar を追加できます。-archives オプションを使うと、アーカイブを引数として渡すことができます。引数で渡されたアーカイブは展開 (unzip/unjar) され、タスクの現在の作業ディレクトリ内に jar/zip の名前の部分をファイル名とするリンクが作成されます。コマンドラインオプションの詳細については、Hadoop コマンドガイドを参照してください。

次に示すのは、サンプルアプリケーションの wordcount-libjars-files を使って実行する例です。
hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output

ウォークスルー

WordCount アプリケーションには、特に難しいところはありません。

Mapper の実装 (14-26 行) では、指定された TextInputFormat (49 行) で渡された入力を、map メソッド (18-25 行) で 1 行ずつ処理します。次に、StringTokenizer によって、空白を区切りとするトークンに行を分割し、< <word>, 1> というキーとバリューのペアを出力します。

サンプルテキストファイルを入力として使うと、最初の map の出力は以下のようになります。
< Hello, 1>
< World, 1>
< Bye, 1>
< World, 1>

また、次の map の出力は以下のようになります。
< Hello, 1>
< Hadoop, 1>
< Goodbye, 1>
< Hadoop, 1>

ある与えられたジョブが生成する map の数、およびこれらの map を細かくコントロールする方法については、このチュートリアルのあとの方で詳しく説明します。

WordCount では、combiner (46 行) も指定しています。したがって各 map の出力は、(ジョブ設定によれば Reducer と同じ) ローカル combiner を経由して渡され、key でソートした後にローカルに集約されます。

最初の map の出力は次のようになります。
< Bye, 1>
< Hello, 1>
< World, 2>

また、次の map の出力は以下のようになります。
< Goodbye, 1>
< Hadoop, 2>
< Hello, 1>

Reducer の実装 (28-36 行) では、reduce メソッド (29-35 行) で値の合計を計算し、各キー (このアプリケーションでは単語) の出現頻度を求めています。

したがって、ジョブの出力は以下のようになります。
< Bye, 1>
< Goodbye, 1>
< Hadoop, 2>
< Hello, 2>
< World, 2>

run メソッドでは、(コマンドライン経由で渡された) 入出力のパス、キー/バリューのタイプ、入出力の形式など、ジョブのさまざまな側面について JobConf で指定します。ついで、run メソッドは、JobClient.runJob (55 行) を呼び出してジョブをサブミットし、その進捗を監視します。

JobConfJobClientTool、およびその他のインタフェースとクラスについては、このチュートリアルのあとの方で詳しく説明します。

Map/Reduce - ユーザーインタフェース

ここでは、ユーザーが触れる Map/Reduce フレームワークのすべての側面について必要な範囲で詳しく説明します。ここで説明する内容を理解することで、ユーザーはジョブの実装、設定、チューニングを細かく行えるようになるはずです。ただし、各クラス/インタフェースの javadoc が、ユーザーの参照できる最も詳しいドキュメントであることに変わりはありません。このドキュメントは、あくまでチュートリアルとして役立つことを意図しています。

最初にMapper インタフェースと Reducer インタフェースを取り上げましょう。通常、アプリケーションでは、map メソッドと reduce メソッドを提供するために、これらのインタフェースを実装することになります。

次に、JobConfJobClientPartitionerOutputCollectorReporterInputFormatOutputFormatOutputCommitter、およびその他のコアインタフェースについて取り上げます。

最後に、DistributedCacheIsolationRunner など、Map/Reduce フレームワークの便利な機能をいくつか取り上げます。

本体

通常、アプリケーションでは、Mapper インタフェースと Reducer インタフェースを実装し、map メソッドと reduce メソッドを提供します。これらのメソッドは、ジョブの中核となります。

Mapper

Mapper は、入力のキー/バリューのペアを中間的なキー/バリュー・ペアのセットに map します。

map は、入力レコードを中間レコードに変換する個別のタスクです。変換後の中間レコードが、入力レコードと同じタイプである必要はありません。与えられた 1 つの入力ペアが、0 個の出力ペアに map されたり、複数のペアに map されたりしてもかまいません。

Hadoop Map/Reduce フレームワークは、ジョブの InputFormat によって生成された各 InputSplit に対し、map タスクを 1 つ生成します。

全般に、Mapper の実装には、JobConfigurable.configure(JobConf) メソッドを介してジョブの JobConf が渡され、Mapper の実装はこれをオーバーライドして自身を初期化します。ついで Map/Reduce フレームワークが、与えられたタスクの InputSplit 内の各キー/バリューのペアに対して、map(WritableComparable, Writable, OutputCollector, Reporter) を呼び出します。アプリケーションは、Closeable.close() メソッドをオーバーライドして、必要な任意のクリーンアップ処理を実行できます。

出力ペアは、入力ペアと同じタイプである必要はありません。与えられた 1 つの入力ペアが、0 個の出力ペアに map されたり、複数のペアに map されたりしてもかまいません。OutputCollector.collect(WritableComparable,Writable) を呼び出すと、出力ペアが回収されます。

アプリケーションでは Reporter を使って、進捗の報告、アプリケーションレベルでのステータスメッセージの設定、および Counters の更新を行ったり、単に稼働していることを示したりすることができます。

ある与えられた出力キーに関連付けられたすべての中間値は、Map/Reduce フレームワークによってグループ化され、最終的な出力を決定するために Reducer に渡されます。ユーザーは、JobConf.setOutputKeyComparatorClass(Class) を介して Comparator を指定することで、グループ化をコントロールすることができます。

Mapper の出力はソートされ、Reducer ごとにパーティションに分割されます。パーティションの総数は、ジョブの reduce タスクの数と同じです。ユーザーは、独自の Partitioner を実装することで、どのキー (すなわちレコード) をどの Reducer に渡すかをコントロールすることができます。

ユーザーは JobConf.setCombinerClass(Class) を介して省略可能な combiner を指定し、中間出力をローカルに集約することができます。これは、Mapper から Reducer に転送されるデータの総量を減らすのに役立ちます。

中間のソートされた出力は、常に単純な (key-len, key, value-len, value) 形式で保存されます。ユーザーは JobConf を介して、中間出力をいつどのように圧縮すべきか、またどの CompressionCodec を使うかをコントロールすることができます。

Map の数

map の数は通常、入力の合計サイズ、つまり入力ファイルの合計ブロック数によって決まります。

map の最適な並列処理レベルは、1 ノードにつきおよそ 10-100 map のようです。ただし、CPU にかかる負荷が非常に低い map タスクでは、300 map まで設定した実績もあります。タスクのセットアップには時間がかかるので、map の実行に少なくとも 1 分かかる程度がベストです。

したがって、たとえば 10TB の入力データがあって、ブロックサイズが 128MB である場合、map の数は 82,000 になります。ただし、setNumMapTasks(int) を使って (Map/Reduce フレームワークにヒントを提供するだけですが)、map 数をさらに増やすこともできます。

Reducer

Reducer は、キーが同じ中間値のセットを、さらに小さな値のセットに reduce します。

ジョブの reduce 数は、ユーザーが JobConf.setNumReduceTasks(int) を介して設定します。

全般に、Reducer の実装には、JobConfigurable.configure(JobConf) メソッドを介してジョブの JobConf が渡され、Reducer の実装はこれをオーバーライドして自身を初期化することができます。ついで Map/Reduce フレームワークが、グループ化された入力内の各 <key, (値のリスト)> ペアに対して、reduce(WritableComparable, Iterator, OutputCollector, Reporter) メソッドを呼び出します。アプリケーションは、Closeable.close() メソッドをオーバーライドして、必要な任意のクリーンアップ処理を実行できます。

Reducer には、shuffle、sort、および reduce の 3 つの主要フェーズがあります。

Shuffle

Reducer への入力は、Mapper からのソートされた出力です。shuffle フェーズでは、Map/Reduce フレームワークは、HTTP を経由して、すべての Mapper の出力から適切なパーティションを取り出します。

Sort

Map/Reduce フレームワークはこのフェーズで、Reducer の入力をキーごとにグループ化します (異なる Mapper が同一のキーを出力している可能性があるからです)。

shuffle フェーズと sort フェーズは同時進行し、map 出力が取り出される一方で、map 出力のマージが行われます。

2次ソート

中間キーをグループ化するための結合ルールを、reduce 前のキーのグループ化の結合ルールと異なるものにする必要がある場合は、JobConf.setOutputValueGroupingComparator(Class) を介して Comparator を指定できます。JobConf.setOutputKeyComparatorClass(Class) は、中間キーをどのようにグループ化するかをコントロールするのに使用できるので、これらを組み合わせて使って、値に対する 2 次ソートを行うことができます。

Reduce

このフェーズでは、グループ化された各 <key, (値のリスト)> ペアに対して、reduce(WritableComparable, Iterator, OutputCollector, Reporter) メソッドが呼び出されます。

reduce タスクの出力は通常、OutputCollector.collect(WritableComparable, Writable) を介して FileSystem に書き込まれます。

アプリケーションでは Reporter を使って、進捗の報告、アプリケーションレベルでのステータスメッセージの設定、および Counters の更新を行ったり、単に稼働していることを示したりすることができます。

Reducer の出力はソートされていません

Reduce の数

reduce の数は、0.95 または 1.75 に (<ノード数> * mapred.tasktracker.reduce.tasks.maximum) を乗じたものが最適の数になるようです。

0.95 では、reduce のすべてが map の終了と同時にただちに起動可能となり、map 出力の転送を開始できます。1.75 では、処理の高速なノードは reduce の最初のラウンドを終了して次の reduce を起動し、負荷分散に優れた処理を行うことができます。

reduce の数を増やすと、フレームワークのオーバーヘッドが増えますが、負荷分散は向上し、失敗のコストも抑制されます。

上に示した倍率は、フレームワーク内の少数の reduce スロットを投機的タスクおよび失敗したタスク用に予約しておくために、全体の数よりいくらか少ない値になっています。

Reducer NONE

reduce 処理が必要ない場合は、まったく問題なく、reduce タスクの数をゼロに設定できます。

この場合、map タスクの出力は直接 FileSystem に送られ、setOutputPath(Path) で設定された出力パスに格納されます。Map/Reduce フレームワークは、FileSystem に map の出力を書き出す前に、これらの出力をソートすることはしません。

Partitioner

Partitioner は、キー空間をパーティションに分割します。

Partitioner は、中間の map 出力のキーのパーティションへの分割をコントロールします。キー (またはキーのサブセット) はパーティションを取り出すのに使われ、そのためには一般にハッシュ関数が使われます。パーティションの総数は、ジョブの reduce タスクの数と同じです。したがって、Partitioner は、m 個の reduce タスクのうちどれに中間キー (すなわちレコード) を送るかをコントロールすることになります。

HashPartitioner がデフォルトの Partitioner です。

Reporter

Reporter は、Map/Reduce アプリケーションが進捗の報告、アプリケーションレベルでのステータスメッセージの設定、および Counters の更新を行えるようにするために用意されている機能です。

Mapper の実装と Reducer の実装では、Reporter を使って進捗を報告したり、単に稼働していることを示したりすることができます。アプリケーションが個々のキー/バリューのペアを処理するのに相当な時間がかかるようなケースでは、この機能は不可欠です。なぜなら、Map/Reduce フレームワークの側では、タスクがタイムアウトしたと判断し、タスクを kill する可能性があるからです。このような状況を避けるもう一つの方法として、設定パラメータ mapred.task.timeout に十分大きな値を設定する方法があります (場合によっては、値にゼロを指定し、タイムアウトを無効にすることもできます)。

アプリケーションは、Reporter を使って Counters を更新することもできます。

OutputCollector

OutputCollector は、Mapper または Reducer からのデータ出力 (中間出力またはジョブの出力) を回収するために Map/Reduce フレームワークが提供している機能を汎用化したものです。

Hadoop Map/Reduce には、汎用性のある有益な mapper、reducer、および partitioner から構成されるライブラリが付属しています。

ジョブ設定

JobConf は Map/Reduce のジョブ設定を表します。

JobConf は、Hadoop フレームワークに実行させる Map/Reduce ジョブをユーザーが記述するための最も重要なインタフェースです。Map/Reduce フレームワークは、JobConf で記述された内容に忠実にジョブを実行しようとします。ただし、以下の 2 点は除きます。

  • 一部の設定パラメータは管理者によって final とマークされていることがあり、この場合、これらの設定パラメータは変更できません。
  • ジョブのパラメータの中には指定どおりに設定できるものもありますが (setNumReduceTasks(int) など)、一方で、Map/Reduce フレームワークやジョブ設定のほかの部分と微妙な相互作用を持つものもあり、これらの設定パラメータは必ずしも指定どおりに設定できるわけではありません (setNumMapTasks(int))。

通常、JobConf は、Mapper、combiner (使用する場合)、PartitionerReducerInputFormatOutputFormat および OutputCommitter の実装を指定するのに使います。JobConf は、入力ファイルのセット (setInputPaths(JobConf, Path...) /addInputPath(JobConf, Path)) と (setInputPaths(JobConf, String) /addInputPaths(JobConf, String))、および出力ファイルの書き出し先 (setOutputPath(Path)) を指定するのにも使います。

また、JobConf を使って、ジョブのその他の高度な側面を指定することもできます。たとえば、使用する ComparatorDistributedCache に入れるファイル、中間出力やジョブ出力を圧縮するかどうかや圧縮方法、ユーザーが用意したスクリプトを介したデバッグ (setMapDebugScript(String)/setReduceDebugScript(String))、ジョブのタスクが投機的に実行できるかどうか (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean))、タスクごとの最大試行回数 (setMaxMapAttempts(int)/setMaxReduceAttempts(int))、ジョブが許容できるタスクの失敗の割合 (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int)) などを指定できます。

もちろん、ユーザーは set(String, String)/get(String, String) を使って、アプリケーションが必要とする任意のパラメータを設定または取得することができます。ただし、膨大な量の (読み取り専用) データに対しては、DistributedCache を使ってください。

タスクの実行と環境

TaskTracker は、Mapper/Reducer タスクを、独立した jvm の中で子プロセスとして実行します。

子タスクは親の TaskTracker の環境を引き継ぎます。ユーザーは、JobConfmapred.child.java.opts 設定パラメータを介して、子 jvm に追加オプションを指定できます。たとえば、実行時リンカが共有ライブラリを探すための非標準パスは、-Djava.library.path=<> で指定できます。mapred.child.java.opts にシンボル @taskid@ が含まれている場合、このシンボルは map/reduce タスクの taskid の値に置き換えられます。

次に示すのは、複数の引数や代入を使用した例です。jvm GC のログを記録し、パスワードなしの JVM JMX エージェントを起動して jconsole などに接続し、子メモリやスレッドをウォッチしたり、スレッドのダンプを取得したりできるようにしています。また、子 jvm のヒープサイズの上限を 512MB に設定し、子 jvm の java.library.path にパスを追加しています。

<property>
  <name>mapred.child.java.opts</name>
  <value>
     -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc
     -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false
  </value>
</property>

メモリ管理

ユーザーと管理者は mapred.child.ulimit を使うことで、起動された子タスク、および子タスクから再帰的に生成されるあらゆるサブプロセスの仮想メモリの上限を指定することもできます。この方法で設定する値は、プロセスごとの制限となる点に注意してください。mapred.child.ulimit の値は、キロバイト (KB) 単位で指定する必要があります。また、指定する値は、JavaVM に渡された -Xmx と等しいかそれ以上でなければなりません。そうでない場合、VM が起動しなくなるおそれがあります。

注: mapred.child.java.opts が使われるのは、TaskTracker から起動された子タスクの設定に対してだけです。デーモンのメモリオプションの設定については、クラスタセットアップで説明しています。

Map/Reduce フレームワークのいくつかの部分についても、利用可能なメモリを設定することができます。map/reduce タスクでは、処理の並列性およびディスクへのデータ書き込み頻度に影響するパラメータを調整すると、パフォーマンスが変わる可能性があります。これらのパラメータを調整する際には、ジョブのファイルシステム・カウンタ、とりわけ map からのバイトカウントと reduce へのバイトカウントに関係するカウンタを監視すると大いに役立ちます。

メモリ管理が有効になっている場合、ユーザーは、TaskTracker によって設定された仮想メモリと RAM のデフォルトの上限を上書きすることができます。ユーザーは以下のパラメータをジョブごとに設定できます。

名前説明
mapred.task.maxvmemint ジョブの各タスクに対する最大仮想メモリのタスク制限を示す値をバイト単位で指定します。指定された値を超える仮想メモリを消費したタスクは kill されます。
mapred.task.maxpmemint ジョブの各タスクに対する RAM のタスク制限を示す値をバイト単位で指定します。必要な RAM の量に基づいてノード上でのタスクのオーバースケジューリングを防ぐために、スケジューラはここで指定された値を使うことができます。

Map のパラメータ

map から出力されたレコードは直列化してバッファに入れられ、メタデータはアカウンティングバッファに格納されます。以下に示すオプションで説明されているように、直列化バッファまたはメタデータがしきい値を超えると、map がレコードを出力し続ける一方で、バッファの内容はバックグラウンドでソートされてディスクに書き込まれます。バッファからの吐き出しが進行中にいずれかのバッファが完全にいっぱいになると、map スレッドはブロックされます。map が終了すると、残っているレコードはディスクに書き込まれ、すべてのオンディスクセグメントは単一のファイルにマージされます。ディスクへの吐き出し回数を最小限に抑えると map 時間を短縮できますが、より大きなバッファを用意すれば mapper が利用可能なメモリが減ります。

名前説明
io.sort.mbint map から出力されたレコードを格納する直列化バッファとアカウンティングバッファの合計サイズをメガバイト単位で指定します。
io.sort.record.percentfloat 直列化バッファとアカウンティングスペースの比率は調整可能です。各直列化レコードは、直列化されたサイズに加え、ソートを実行するために 16 バイトのアカウンティング情報を必要とします。io.sort.mb から割り当てられるこのスペースの割合は、直列化バッファとアカウンティングスペースのいずれかを使い切ったときに起こるディスクへの吐き出しの可能性を左右します。言うまでもなく、小さなレコードを出力する map では、デフォルト値より高い値を指定すれば、ディスクへの吐き出し回数は減ります。
io.sort.spill.percentfloat アカウンティングバッファと直列化バッファのしきい値を指定します。いずれかのバッファが指定された割合まで満たされると、バッファの内容はバックグラウンドでディスクに吐き出されます。ここで、io.sort.record.percentr とし、io.sort.mbx、しきい値を q とします。この場合、回収スレッドが吐き出しを行うまでに回収されるレコードの最大数は、r * x * q * 2^16 になります。高い値を指定すればマージの回数を減らしたり、マージを不要にしたりできますが、その場合は map タスクがブロックされる可能性も高くなります。通常、平均 map 時間を最も短くするには、map 出力のサイズを正確に見積もって複数の吐き出しが行われないようにします。

その他の注意事項

  • 吐き出しの進行中に、吐き出しのしきい値のいずれかを超えると、吐き出しが終わるまで回収が続けられます。たとえば、io.sort.buffer.spill.percent の値が 0.33 に設定されていて、吐き出しの実行中にバッファの残りがいっぱいに満たされた場合、次の吐き出しでは回収されたレコードのすべて、すなわちバッファの 0.66 が対象となり、それ以上の吐き出しは行われません。別の言い方をすれば、しきい値はトリガを定めるもので、処理をブロックする性質のものではありません。
  • 直列化バッファより大きいレコードは、最初に吐き出しをトリガし、次に独立したファイルに吐き出されます。このレコードが最初に combiner を通るかどうかは不定です。

Shuffle/Reduce のパラメータ

すでに説明したように、各 reduce は Partitioner によって割り当てられた出力を HTTP 経由でメモリに取り込み、これらの出力を一定期間ごとにディスクにマージします。map 出力の中間圧縮が有効になっている場合、各出力は圧縮を解除されてメモリに入れられます。以下に示すオプションは、reduce の前に行われるディスクへのこれらのマージの頻度と、reduce の際に map 出力に割り当てられるメモリに影響を与えます。

名前説明
io.sort.factorint 同時にディスクにマージするセグメント数を指定します。この値は、マージ中のオープンファイル数と圧縮 codec を制限します。ファイル数がこの制限を超えると、マージは複数のパスに分けて実行されます。この制限は map にも適用されますが、ほとんどのジョブ設定では、この制限に達することはまずありません。
mapred.inmem.merge.thresholdint ディスクにマージする前にメモリに取り込むソート済み map 出力数を指定します。上の注意事項で述べた吐き出しのしきい値と同様、この値はパーティションの単位ではなく、トリガを定めるものです。実際には、一般に非常に高い値 (1000) を設定するか、無効 (0) にするかのどちらかです。これは、インメモリセグメントのマージより、ディスクからマージする方が高価だからです (表の後の注意事項を参照)。指定したしきい値は、shuffle 中のインメモリマージの頻度だけに適用されます。
mapred.job.shuffle.merge.percentfloat インメモリマージを開始する前に取り込む map 出力のためのメモリのしきい値を、map 出力をメモリに格納するために割り当てられたメモリに占める割合によって指定します。メモリに収まりきらない map 出力は止まってしまう可能性があるので、値を高く設定すると、メモリへの取り込みとマージまでの並列処理が減る可能性があります。逆に、reduce への入力が完全にメモリに入りきる場合には、1.0 まで高くすると、reduce に効果があります。指定したパラメータは、shuffle 中のインメモリマージの頻度だけに適用されます。
mapred.job.shuffle.input.buffer.percentfloat shuffle 中に map 出力を格納するために割り当てることができるメモリの割合を、(一般に mapred.child.java.opts で指定される) ヒープサイズの上限を基準に指定します。一部のメモリは、Map/Reduce フレームワークのために予約されますが、通常は、多数の大きな map 出力を格納できるよう、十分高い値を設定すると効果的です。
mapred.job.reduce.input.buffer.percentfloat reduce 中に map 出力を保持しておくことができるメモリの割合をヒープサイズの上限を基準に指定します。reduce が始まると、残っている map 出力がここで指定されたリソース制限を下回るまで、map 出力がディスクにマージされます。デフォルトでは、reduce で利用可能なメモリを最大化するために、reduce が始まる前にすべての map 出力がディスクにマージされます。メモリをそれほど使わない reduce に対しては、ディスクへの書き込みを減らすためにこの値を増やすとよいでしょう。

その他の注意事項

  • map 出力が、map 出力をコピーするために割り当てられたメモリの 25% を超える場合、該当する map 出力はメモリを経由せずに直接ディスクに書き込まれます。
  • combiner を使用する場合、merge のしきい値を高くしてバッファを大きく取る方法は使用できない可能性があります。すべての map 出力を取り込む前に開始される merge では、ディスクへの吐き出しが行われるのと同時に combiner が実行されます。一部のケースでは、バッファサイズを可能な限り増やすよりも、map 出力を結合するためにリソースを割く方が (ディスクへの吐き出しを少なくして、吐き出しと取り込みを並列処理できるので)、reduce 時間を短縮できます。
  • reduce を始めるためにインメモリ map 出力をディスクにマージする際、ディスクに吐き出す必要のあるセグメントがあって、少なくとも io.sort.factor 個のセグメントがすでにディスク上に吐き出されているために中間 merge が必要な場合、インメモリ map 出力は中間 merge の対象に含まれることになります。

ディレクトリ構造

TaskTracker では、ローカライズされたキャッシュとローカライズされたジョブを作成するためのローカルディレクトリ ${mapred.local.dir}/taskTracker/ が用意されます。TaskTracker では、(複数のディスクにまたがって) 複数のローカルディレクトリを定義することができ、各ファイル名が半ば無作為なローカルディレクトリに割り当てられます。 ジョブがスタートすると、TaskTracker は、設定で指定されたローカルディレクトリからの相対パスで、ローカライズされたジョブディレクトリを作成します。したがってこのとき、TaskTracker のディレクトリ構造は以下のようになります。

  • ${mapred.local.dir}/taskTracker/archive/ : 分散キャッシュ。このディレクトリは、ローカライズされた分散キャッシュを保持します。したがって、ローカライズされた分散キャッシュはすべてのタスクとジョブで共有されます。
  • ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : ローカライズされたジョブのディレクトリ。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/ : ジョブ固有の共有ディレクトリ。複数のタスクがこのスペースをスクラッチスペースとして使うことができ、タスク間でファイルを共有することができます。このディレクトリは、設定プロパティ job.local.dir を介してユーザーに公開されます。このディレクトリは、API JobConf.getJobLocalDir() を介してアクセスできます。System プロパティとしても利用できます。したがって、ユーザー (ストリーミングなど) は、System.getProperty("job.local.dir") を呼び出して、このディレクトリにアクセスすることができます。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/ : ジョブの jar ファイルと展開された jar を置くための jars ディレクトリです。job.jar は、各マシンに自動的に配布されるアプリケーションの jar ファイルです。jar ファイルは、ジョブのタスクがスタートする前に jars ディレクトリに展開されます。アプリケーションからは、API JobConf.getJar() を介して job.jar の場所にアクセスできます。展開 (unjar) されたディレクトリにアクセスするには、JobConf.getJar().getParent() を呼び出します。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml : job.xml ファイルは、ジョブ用にローカライズされた汎用ジョブ設定です。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid : 各タスク試行用のタスクディレクトリ。各タスクディレクトリは、以下のような構造になっています。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml : job.xml ファイルは、タスクのローカライズされたジョブ設定です。タスクのローカライズとは、ジョブ内の該当する特定のタスクに、固有のプロパティが設定されていることをいいます。各タスク用にローカライズされるプロパティについては、以下で説明します。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output : 中間出力ファイル用ディレクトリ。map 出力ファイルなど、Map/Reduce フレームワークによって生成される一時的な map/reduce データが置かれます。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work : タスクの現在の作業ディレクトリ。タスクで jvm の再利用が有効になっている場合、このディレクトリは、jvm が起動したディレクトリとなります。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp : タスクの一時ディレクトリ。(ユーザーは、プロパティ mapred.child.tmp を指定することで、map タスクと reduce タスク用の一時ディレクトリの値を設定することができます。デフォルトの値は ./tmp です。値が絶対パスでない場合は、値の先頭にタスクの作業ディレクトリが追加されます。それ以外の場合は、指定された値が直接割り当てられます。ディレクトリが存在しない場合は作成されます。次に、子 java タスクは、オプション -Djava.io.tmpdir='一時ディレクトリの絶対パス' を付けて実行されます。また、パイプとストリーミングでは、環境変数 TMPDIR='一時ディレクトリの絶対パス' が設定されます。) このディレクトリは、mapred.child.tmp の値が ./tmp である場合に作成されます。

タスク JVM の再利用

ジョブは、ジョブ設定 mapred.job.reuse.jvm.num.tasks を指定することで、タスク JVM の再利用を有効にすることができます。値が 1 (デフォルト) の場合、JVM は再利用されません (すなわち、JVM 1 つに 1 タスクとなります)。値が -1 である場合、JVM が実行できる (同一ジョブの) タスク数に制限はありません。API JobConf.setNumTasksToExecutePerJvm(int) を使って、1 より大きい値を設定することもできます。

以下に示すプロパティは、各タスクの実行用のジョブ設定でローカライズされます。

名前説明
mapred.job.idStringジョブ ID
mapred.jarString ジョブディレクトリ内の job.jar の場所
job.local.dir String ジョブ固有の共有スクラッチスペース
mapred.tip.id String タスク ID
mapred.task.id String タスク試行 ID
mapred.task.is.map boolean map タスクかどうか
mapred.task.partition int ジョブ内のタスク ID
map.input.file String map が読み取り中のファイル名
map.input.start long map 入力分割単位の開始オフセット
map.input.length long map 入力分割単位のバイト数
mapred.work.output.dir String タスクの一時出力ディレクトリ

タスクの標準出力 (stdout) と標準エラー (stderr) ストリームは、TaskTracker によって読み取られ、${HADOOP_LOG_DIR}/userlogs にログとして記録されます。

jar とネイティブライブラリの両方を配布して map タスクや reduce タスクで使用できるようにするために、DistributedCache を使うこともできます。子 jvm は、常に自身の現在の作業ディレクトリjava.library.pathLD_LIBRARY_PATH に追加します。したがって、キャッシュされたライブラリは、System.loadLibrary または System.load を介してロードすることができます。分散キャッシュを通じて共有ライブラリをロードする方法の詳細については、ネイティブライブラリを参照してください。

ジョブのサブミットと監視

JobClient は、ユーザージョブが JobTracker と対話するための最も重要なインタフェースです。

JobClient は、ジョブのサブミット、サブミットしたジョブの進捗の追跡、コンポーネントタスクの報告とログへのアクセス、Map/Reduce クラスタのステータス情報の取得などを行うための機能を提供します。

ジョブのサブミットのプロセスは次のとおりです。

  1. ジョブの入力仕様と出力仕様をチェックします。
  2. ジョブの InputSplit の値を計算します。
  3. 必要なら、ジョブの DistributedCache に必要となるアカウンティング情報をセットアップします。
  4. ジョブの jar と設定を FileSystem 上の Map/Reduce システムディレクトリにコピーします。
  5. JobTracker にジョブをサブミットし、場合によってはジョブのステータスを監視します。

ジョブの履歴ファイルは、ユーザー指定のディレクトリ hadoop.job.history.user.location (デフォルトはジョブの出力ディレクトリ) にも記録されます。これらのファイルは、指定されたディレクトリ内の "_logs/history/" に格納されます。したがって、デフォルトではこれらのファイルは mapred.output.dir/_logs/history に置かれることになります。ユーザーは、hadoop.job.history.user.location に値 none を指定することで、ログ記録を停止できます。

ユーザーは、次のコマンドを実行することで、指定されたディレクトリにある履歴ログのサマリーを表示することができます。
$ bin/hadoop job -history output-dir
コマンドを実行すると、ジョブの詳細、失敗した TIP (進行中タスク) および kill された TIP の詳細が表示されます。
次のコマンドを実行すると、成功したタスクや各タスクのタスク試行など、さらに詳しい情報を表示することができます。
$ bin/hadoop job -history all output-dir

ユーザーは、OutputLogFilter を使うことで、出力ディレクトリのリストからログファイルをフィルターアウトできます。

通常、ユーザーはアプリケーションを作成し、JobConf を介してジョブのさまざまな側面について記述し、さらに JobClient を使ってジョブをサブミットしてその進捗を監視します。

Job のコントロール

単独の Map/Reduce ジョブでは実行できない複雑なタスクを遂行するために、複数の Map/Reduce ジョブのチェイン化が必要になる場合があります。しかし、これは簡単に実行できます。ジョブの出力は分散ファイルシステムに送られ、今度はこの出力を次のジョブの入力として利用できるからです。

ただし、(成功であれ失敗であれ) ジョブの完了を確実にするための負担を、クライアントが直接かぶることになります。このような場合に使用できるジョブ・コントロールオプションとして、以下のものがあります。

  • runJob(JobConf) : ジョブをサブミットし、ジョブが完了するまでは戻りません。
  • submitJob(JobConf) : ジョブのサブミットだけを行い、返された RunningJob へのハンドルをポーリングして、ステータスを問い合わせ、スケジューリングに関する決定を下します。
  • JobConf.setJobEndNotificationURI(String) : ポーリングを行わずに、ジョブ完了時に通知が送られるようにします。

Job の入力

InputFormat では、Map/Reduce ジョブの入力仕様を記述します。

Map/Reduce フレームワークはジョブの InputFormat に基づいて以下のことを行います。

  1. ジョブの入力仕様の有効性を確認します。
  2. 1 つまたは複数の入力ファイルを論理 InputSplit インスタンスに分割し、ついで各インスタンスに 1 つの Mapper を割り当てます。
  3. 論理 InputSplit から入力レコードを拾って Mapper に処理させるために使う RecordReader の実装を提供します。

通常は FileInputFormat のサブクラスとなるファイルベースの InputFormat の実装のデフォルト動作は、入力ファイルのバイト単位での合計サイズに基づいて、入力を論理 InputSplit インスタンスに分割することです。ただし、入力ファイルの FileSystem のブロックサイズが、入力分割単位の上限として扱われます。分割単位サイズの下限は、mapred.min.split.size を介して設定することができます。

入力サイズに基づく論理分割単位は、多くのアプリケーションにとって不十分であることは明らかです。なぜならレコードの境界を尊重する必要があるからです。こうしたケースでは、レコードの境界を尊重し、論理 InputSplit のレコード指向ビューを個々のタスクに提示するような RecordReader を、アプリケーション側で実装する必要があります。

デフォルトの InputFormat は、TextInputFormat です。

TextInputFormat が、ある与えられたジョブの InputFormat である場合、Map/Reduce フレームワークは .gz 拡張子を持つ入力ファイルを検出し、適切な CompressionCodec を使って入力ファイルを自動的に展開します。ただし、上の拡張子を持つ圧縮ファイルは分割できないこと、各圧縮ファイルはファイル全体が 1 つの Mapper で処理される点に注意する必要があります。

InputSplit

InputSplit は、個々の Mapper が処理するデータを表します。

通常、InputSplit は、入力のバイト指向ビューを提示し、レコード指向ビューを処理、提示するのは RecordReader の役割になります。

デフォルトの InputSplit は、FileSplit です。FileSplitは、論理分割する入力ファイルのパスを map.input.file に設定します。

RecordReader

RecordReader は、InputSplit から <key, value> ペアを読み取ります。

一般に、RecordReader は、InputSplit から渡された、入力のバイト指向ビューを変換し、処理を行わせる Mapper の実装にレコード指向ビューを提示します。したがって、RecordReader はレコードの境界を処理する役割を引き受け、キーとバリューをタスクに提示します。

Job の出力

OutputFormat では、Map/Reduce ジョブの出力仕様を記述します。

Map/Reduce フレームワークはジョブの OutputFormat に基づいて以下のことを行います。

  1. ジョブの出力仕様の有効性を確認します。たとえば、出力ディレクトリがすでに存在するかどうかをチェックします。
  2. ジョブの出力ファイルを書き込むのに使う RecordWriter を提供します。出力ファイルは、FileSystem に格納されます。

デフォルトの OutputFormat は、TextOutputFormat です。

OutputCommitter

OutputCommitter では、Map/Reduce ジョブのタスク出力のコミットについて記述します。

Map/Reduce フレームワークはジョブの OutputCommitter に基づいて以下のことを行います。

  1. 初期化中にジョブのセットアップを行います。たとえば、ジョブの初期化中にジョブ用の一時出力ディレクトリを作成します。ジョブのセットアップは、ジョブが PREP 状態のとき、およびタスクの初期化後に、独立したタスクによって実行されます。セットアップタスクが完了すると、ジョブは RUNNING 状態に移行します。
  2. ジョブの完了後にジョブのクリーンアップを行います。たとえば、ジョブの完了後に一時出力ディレクトリを削除します。ジョブのクリーンアップは、ジョブの最後に、独立したタスクによって実行されます。クリーンアップタスクが完了すると、ジョブは成功 (SUCCEDED)、失敗 (FAILED)、kill された(KILLED) のいずれかに分類されます。
  3. タスクの一時出力をセットアップします。タスクのセットアップは、タスクの初期化中に、同一タスクの一部として実行されます。
  4. タスクがコミットを必要とするかどうかチェックします。これはタスクがコミットを必要としない場合に、コミットの手順を行わないようにするためです。
  5. タスクの出力をコミットします。タスクが終わると、タスクは必要に応じて出力をコミットします。
  6. タスクのコミットを破棄します。タスクが失敗したり kill された場合、出力はクリーンアップされます。タスクが (exception ブロックに入って) クリーンアップを行えない場合には、クリーンアップを実行するために、同一の試行 ID で別のタスクが生成されます。

デフォルトの OutputCommitter は、FileOutputCommitter です。ジョブのセットアップタスクとクリーンアップタスクは、map スロットまたは reduce スロットのうち、TaskTracker 上で空いている方を使用します。また、JobCleanup タスク、TaskCleanup タスク、および JobSetup タスクには、この順に最も高いプライオリティが与えられます。

タスクの副次的ファイル

一部のアプリケーションでは、コンポーネントタスクが、実際のジョブ出力ファイルとは異なる副次的ファイルを作成したり、これらのファイルに書き込んだりする必要が生じる場合があります。

こうしたケースでは、同一の Mapper または Reducer の 2 つのインスタンスが同時に実行され (たとえば投機的タスク)、FileSystem 上の同一のファイル (パス) に対するオープンや書き込みを試みるという問題が生じるおそれがあります。したがって、アプリケーションの作成者は、タスクごとではなく、(attempt_200709221812_0001_m_000000_0 のような試行 ID を使って) タスク試行ごとに一意の名前が生成されるようにする必要があります。

このような問題を避けるため、Map/Reduce フレームワークは、OutputCommitterFileOutputCommitter である場合には、各タスク試行に対し、該当するタスク試行の出力が格納される FileSystem 上に、${mapred.work.output.dir} を介してアクセスできる特別な ${mapred.output.dir}/_temporary/_${taskid} サブディレクトリを用意します。タスク試行が成功状態で完了すると、${mapred.output.dir}/_temporary/_${taskid} にあるファイル (のみ) が、${mapred.output.dir} への昇格を認められます。言うまでもありませんが、Map/Reduce フレームワークは、不成功だったタスク試行のサブディレクトリを破棄します。このプロセスは、アプリケーションからは完全に透過的に行われます。

アプリケーション作成者はこの機能を利用し、FileOutputFormat.getWorkOutputPath() を介して、タスクの実行中に必要な任意の副次的ファイルを ${mapred.work.output.dir} 内に作成することができます。Map/Reduce フレームワークは、これらのファイルに対し、成功したタスク試行の場合と同様に、昇格を認め、タスク試行ごとに一意のパスを生成する作業を不要にしています。

注: 特定のタスク試行の実行中の ${mapred.work.output.dir} の値は、具体的には ${mapred.output.dir}/_temporary/_{$taskid} となり、この値は Map/Reduce フレームワークによって設定されます。したがって、map/reduce タスクからは、FileOutputFormat.getWorkOutputPath() によって返されたパス内に任意の副次的ファイルを作成するだけで、Map/Reduce フレームワークが用意しているこの機能を利用することができます。

上に説明した内容は、reducer=NONE (すなわち reduce なし) のジョブの map に対しても、そのまま適用されます。このようなジョブでは、map の出力は直接 HDFS に送られるからです。

RecordWriter

RecordWriter は、出力された <key, value> ペアを出力ファイルに書き込みます。

RecordWriter の実装は、ジョブの出力を FileSystem に書き込みます。

その他の便利な機能

キューへのジョブのサブミット

ユーザーはジョブをキューにサブミットします。キューはジョブの集合で、システムはキューを使って特定の機能を提供できます。たとえば、キューは ACL を使って、どのユーザーがジョブをキューにサブミットできるかをコントロールします。キューは主に Hadoop の Scheduler によって使われることを想定しています。

Hadoop では、'default' と呼ばれる 1 つの必須キューがあらかじめ設定されています。キュー名は、Hadoop サイト設定の mapred.queue.names プロパティで定義されます。Capacity Scheduler など、一部のジョブスケジューラは複数のキューをサポートしています。

ジョブは、サブミット先として必要なキューを、mapred.job.queue.name を介して、または setQueueName(String) API を介して定義します。キュー名の設定は省略可能です。関連付けられたキュー名なしにジョブがサブミットされた場合、そのジョブは 'default' キューにサブミットされます。

Counters

Counters は、グローバルなカウンタを表し、Map/Reduce フレームワークまたはアプリケーションによって定義されます。各 Counter には、任意の Enum 型を使用できます。特定の Enum 型の複数のカウンターは、Counters.Group 型のグループにまとめられます。

アプリケーションは (Enum 型の) 任意の Counters を定義でき、定義した Counters は、map メソッドや reduce メソッドの中で、Reporter.incrCounter(Enum, long) または Reporter.incrCounter(String, String, long) を介して更新することができます。これらのカウンタは、Map/Reduce フレームワークによってグローバルに集約されます。

DistributedCache

DistributedCache は、アプリケーション固有の大きな読み取り専用ファイルを効率的に配布します。

DistributedCache は、アプリケーションが必要とするファイル (テキスト、アーカイブ、jar など) をキャッシュするために Map/Reduce フレームワークが用意している機能です。

アプリケーションは、JobConf を使って、キャッシュするファイルの URL (hdfs://) を指定します。DistributedCache は、hdfs:// URL で指定されたファイルが FileSystem 上にすでに存在するものと仮定します。

Map/Reduce フレームワークは、ジョブのいずれかのタスクがスレーブノード上で実行される前に、必要なファイルをそのスレーブノードにコピーします。DistributedCache の効率性は、ファイルをジョブごとに 1 度だけコピーする点と、スレーブ上ではアーカイブを展開した形でアーカイブをキャッシュできる点に由来します。

DistributedCache は、キャッシュされたファイルの修正日時を追跡します。言うまでもありませんが、ジョブの実行中はアプリケーションや外部からキャッシュファイルを変更してはいけません。

DistributedCache は、単純な読み取り専用のデータやテキストファイル、これらより複雑なアーカイブや jar などを配布するのに使用できます。アーカイブ (zip、tar、tgz、および tar.gz ファイル)は、スレーブノード上ではアーカイブを展開されます。ファイルには実行パーミッションが設定されます。

ファイルやアーカイブを配布するには、mapred.cache.{files|archives} プロパティを設定します。配布するファイルやアーカイブが複数ある場合は、カンマで区切ってパスを追加します。mapred.cache.{files|archives} プロパティは、API DistributedCache.addCacheFile(URI,conf)/ DistributedCache.addCacheArchive(URI,conf) および DistributedCache.setCacheFiles(URIs,conf)/ DistributedCache.setCacheArchives(URIs,conf) を介して設定することもできます。URI は hdfs://host:port/absolute-path#link-name 形式で指定します。Streaming では、コマンドラインオプション -cacheFile/-cacheArchive でファイルを配布できます。

また、ユーザーは、DistributedCache.createSymlink(Configuration) API を介して、DistributedCache に対し、キャッシュされたファイルへのシンボリックリンクをタスクの現在の作業ディレクトリに作成するよう指示することもできます。また、そのための方法として、設定プロパティ mapred.create.symlinkyes を指定するやり方もあります。DistributedCache は、URI の フラグメント をシンボリックリンクの名前として使います。たとえば、URI hdfs://namenode:port/lib.so.1#lib.so のシンボリックリンク名は、分散キャッシュ内のファイル が lib.so.1 であるのに対し、タスクの現在の作業ディレクトリでは lib.so になります。

DistributedCache は、map タスクや reduce タスクの中で、荒削りなソフトウェア配布機構として使用することもできます。DistributedCache は、jar とネイティブライブラリの両方を配布するのに使用できます。DistributedCache.addArchiveToClassPath(Path, Configuration) または DistributedCache.addFileToClassPath(Path, Configuration) API を使うと、ファイルや jar をキャッシュした上で、これらのファイルや jar を子 jvm の classpath に追加できます。別の方法として、設定プロパティ mapred.job.classpath.{files|archives} を設定するやり方もあります。同様に、タスクの現在の作業ディレクトリにキャッシュされたファイルのシンボリックリンクを作成すれば、ネイティブライブラリを配布して、これらのライブラリをロードするのに使用できます。

Tool

Tool インタフェースは、汎用 Hadoop コマンドラインオプションの処理をサポートします。

Tool は、あらゆる Map/Reduce ツールやアプリケーションの標準です。アプリケーションは、標準コマンドラインオプションの処理については、ToolRunner.run(Tool, String[]) を介して GenericOptionsParser に任せ、独自の引数だけを処理するようにする必要があります。

汎用 Hadoop コマンドラインオプションは以下のとおりです。
-conf <設定ファイル>
-D <プロパティ=値>
-fs <local|namenode:port>
-jt <local|jobtracker:port>

IsolationRunner

IsolationRunner は、Map/Reduce プログラムのデバッグを支援するユーティリティです。

IsolationRunner を利用するには、まず最初に keep.failed.tasks.filestrue を設定します (keep.tasks.files.pattern も参照してください)。

次に、失敗したタスクが実行されたノードへ行って、TaskTracker のローカルディレクトリに入り、次の要領で IsolationRunner を実行します。
$ cd <local path>/taskTracker/${taskid}/work
$ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

IsolationRunner は、デバッガの中で実行可能な単独の jvm 内で、正確に同一の入力を使って、失敗したタスクを実行します。

プロファイル

プロファイルは、map と reduce のサンプル用に、ビルトイン java プロファイラの代表的な (2 つないし 3 つの) サンプルを取得する機能です。

ユーザーは、設定プロパティ mapred.task.profile を指定することで、ジョブの一部のタスクについてシステムがプロファイラ情報を収集するかどうかを指定できます。プロパティの値は、API JobConf.setProfileEnabled(boolean) を介して設定できます。プロパティの値が true に設定されている場合、タスクのプロファイルは有効になります。プロファイラ情報はユーザーのログディレクトリに格納されます。デフォルトでは、ジョブのプロファイルは無効になっています。

上の説明にしたがってプロファイルを有効にしたら、ユーザーは設定プロパティ mapred.task.profile.{maps|reduces} を使って、プロファイルの対象とする map/reduce タスクの範囲を設定できます。プロパティの値は、API JobConf.setProfileTaskRange(boolean,String) を介して設定できます。デフォルトでは、指定範囲は 0-2 になっています。

ユーザーは、設定プロパティ mapred.task.profile.params を設定することで、プロファイラ設定引数を指定することもできます。プロパティの値は、API JobConf.setProfileParams(String) を介して設定できます。指定された文字列に %s が含まれている場合は、タスク実行時のプロファイル出力ファイル名で置き換えられます。これらのパラメータは、コマンドラインでタスクの子 JVM に渡されます。プロファイルパラメータのデフォルトの値は、-agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s です。

デバッグ

Map/Reduce フレームワークは、デバッグ用のユーザー指定スクリプトを実行できる機能を提供しています。map/reduce タスクが失敗した場合、たとえばタスクログを処理するなどの目的で、ユーザーはデバッグスクリプトを実行することができます。スクリプトには、タスクの標準出力と標準エラー出力、syslog、および jobconf にアクセスする権限が与えられます。デバッグスクリプトの標準出力と標準エラー出力からの出力は、コンソール診断情報に表示され、ジョブ UI の一部としても表示されます。

以下では、ジョブとともにデバッグスクリプトをサブミットする方法について説明します。スクリプトファイルは配布し、Map/Reduce フレームワークにサブミットする必要があります。

スクリプトファイルの配布方法

ユーザーは、DistributedCache を使ってスクリプトファイルを配布し、スクリプトファイルへのシンボリックリンクを作成する必要があります。

スクリプトのサブミット方法

デバッグスクリプトをサブミットするには、それぞれ map タスクと reduce タスクをデバッグするためのプロパティである mapred.map.task.debug.script および mapred.reduce.task.debug.script に値を設定するのが簡単な方法です。これらのプロパティは、API JobConf.setMapDebugScript(String) および JobConf.setReduceDebugScript(String) を使って設定することもできます。Streaming モードでは、それぞれ map タスクと reduce タスクをデバッグするためのコマンドラインオプションである -mapdebug および -reducedebug を使ってデバッグスクリプトをサブミットできます。

スクリプトの引数は、タスクの標準出力、標準エラー出力、syslog ファイル、および jobconf ファイルです。map/reduce タスクが失敗したノードで実行されるデバッグコマンドは次のとおりです。
$script $stdout $stderr $syslog $jobconf

パイププログラムでは、コマンドの 5 番目の引数として C++ プログラム名が指定されます。したがって、パイププログラムでは、コマンドは次のようになります。
$script $stdout $stderr $syslog $jobconf $program

デフォルトの動作:

パイプでは、デフォルトスクリプトが実行され、gdb でコアダンプが処理され、スタックトレースが表示され、実行中のスレッドについての情報が表示されます。

JobControl

JobControl は、Map/Reduce ジョブのセットおよびこれらのジョブの依存関係をカプセル化するユーティリティです。

データの圧縮

Hadoop Map/Reduce は、アプリケーション作成者が中間 map 出力とジョブ出力 (すなわち reduce 出力) の両方を対象に圧縮を指定することができる機能を提供しています。また、Hadoop Map/Reduce には、zlib 圧縮アルゴリズムの CompressionCodec の実装も含まれています。gzip ファイル形式もサポートされています。

Hadoop では、パフォーマンス (zlib) と Java ライブラリ不在の両方を理由に、上に取り上げた圧縮 codec のネイティブ実装も用意されています。これらの使い方と利用可能な機能の詳細については、こちらを参照してください。

中間出力

アプリケーションでは、JobConf.setCompressMapOutput(boolean) API および CompressionCodec to be used via the JobConf.setMapOutputCompressorClass(Class) API を介して、中間 map 出力の圧縮をコントロールすることができます。

ジョブの出力

アプリケーションでは、FileOutputFormat.setCompressOutput(JobConf, boolean) API を介してジョブ出力の圧縮をコントロールすることができ、使用する CompressionCodec は、FileOutputFormat.setOutputCompressorClass(JobConf, Class) API を介して指定できます。

ジョブの出力が SequenceFileOutputFormat で格納される場合には、必要な SequenceFile.CompressionType (すなわち RECORD または BLOCK。デフォルトは RECORD) は、SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType) API を介して指定できます。

不良レコードのスキップ

Hadoop では、map 入力を処理する際、一定数の不良な入力レコードをスキップするオプションを用意しています。アプリケーションでは、SkipBadRecords クラスを介してこの機能をコントロールすることができます。

この機能は、map タスクが特定の入力で必ずクラッシュする場合に使用できます。通常、このようなクラッシュは map 関数のバグが原因です。一般的には、ユーザーがこれらのバグを修正することになるでしょう。ただし、そうした作業が不可能なこともあります。たとえば、バグがサードパーティ製ライブラリに起因するもので、このライブラリのソースコードが開示されていない場合などです。こうしたケースでは、何度試行してもタスクは成功せず、ジョブは失敗します。不良レコードをスキップする機能を使えば、不良レコード前後のデータが一部失われますが、アプリケーションによってはそれでも許容可能な場合があります (非常に膨大なデータを対象に統計的分析を行うアプリケーションなど)。

デフォルトでは、この機能は無効になっています。この機能を有効にするには、SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) および SkipBadRecords.setReducerMaxSkipGroups(Configuration, long) を参照してください。

この機能が有効になっている場合、Map/Reduce フレームワークは、一定数の map が失敗した後に「スキップモード」に入ります。詳細については、SkipBadRecords.setAttemptsToStartSkipping(Configuration, int) を参照してください。「スキップモード」では、map タスクは処理するレコードの範囲を保持します。そのために、Map/Reduce フレームワークは、処理されたレコードのカウンタを利用します。詳細については、SkipBadRecords.COUNTER_MAP_PROCESSED_RECORDS および SkipBadRecords.COUNTER_REDUCE_PROCESSED_GROUPS を参照してください。このカウンタにより、Map/Reduce フレームワークは、処理に成功したレコードがいくつかを知ることができ、したがってタスクをクラッシュさせたレコードの範囲を把握することができます。以降の試行では、該当する範囲のレコードはスキップされます。

スキップされるレコードの数は、処理されたレコードのカウンタがアプリケーションによってどれくらい頻繁にインクリメントされるかに左右されます。このカウンタは、レコードを 1 つ処理するたびにインクリメントすることを推奨します。ただし、主にバッチ処理を行う一部のアプリケーションでは、そうした操作が不可能なことがあります。このような場合、Map/Reduce フレームワークは、不良レコード前後のレコードも併せてスキップしてしまう可能性があります。ユーザーは、SkipBadRecords.setMapperMaxSkipRecords(Configuration, long) および SkipBadRecords.setReducerMaxSkipGroups(Configuration, long) を介して、スキップされるレコードの数をコントロールすることができます。Map/Reduce フレームワークは、バイナリサーチに似たアプローチを使って、スキップするレコードの範囲を狭めようとします。具体的には、スキップされた範囲は半分に分割され、一方だけが実行されます。次の失敗で、Map/Reduce フレームワークは、範囲の半分のうちどちらに不良レコードが含まれているか判断します。この要領で同じ操作が繰り返され、許容可能なスキップ値が満たされるか、またはすべてのタスク試行を実行し終えるかのいずれかになるまで、タスクは再実行されます。タスク試行の回数を増やすには、JobConf.setMaxMapAttempts(int) および JobConf.setMaxReduceAttempts(int) を使ってください。

スキップされたレコードは、あとで分析できるよう、シーケンスファイル形式で HDFS に書き込まれます。書き込み先は、SkipBadRecords.setSkipOutputPath(JobConf, Path) を介して変更できます。

例: WordCount v2.0

ここでは、これまでに説明した Map/Reduce フレームワークが提供するさまざまな機能を利用した WordCount アプリケーションの改良版を示します。

アプリケーションを実行するには、特に DistributedCache 関連の機能を利用するために、HDFS が起動して実行されている必要があります。したがって、このアプリケーションは、疑似分散モードまたは完全分散モードの Hadoop でのみ動作します。

ソースコード

WordCount.java
1. package org.myorg;
2.
3. import java.io.*;
4. import java.util.*;
5.
6. import org.apache.hadoop.fs.Path;
7. import org.apache.hadoop.filecache.DistributedCache;
8. import org.apache.hadoop.conf.*;
9. import org.apache.hadoop.io.*;
10. import org.apache.hadoop.mapred.*;
11. import org.apache.hadoop.util.*;
12.
13. public class WordCount extends Configured implements Tool {
14.
15.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
16.
17.      static enum Counters { INPUT_WORDS }
18.
19.      private final static IntWritable one = new IntWritable(1);
20.      private Text word = new Text();
21.
22.      private boolean caseSensitive = true;
23.      private Set<String> patternsToSkip = new HashSet<String>();
24.
25.      private long numRecords = 0;
26.      private String inputFile;
27.
28.      public void configure(JobConf job) {
29.        caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
30.        inputFile = job.get("map.input.file");
31.
32.        if (job.getBoolean("wordcount.skip.patterns", false)) {
33.          Path[] patternsFiles = new Path[0];
34.          try {
35.            patternsFiles = DistributedCache.getLocalCacheFiles(job);
36.          } catch (IOException ioe) {
37.            System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
38.          }
39.          for (Path patternsFile : patternsFiles) {
40.            parseSkipFile(patternsFile);
41.          }
42.        }
43.      }
44.
45.      private void parseSkipFile(Path patternsFile) {
46.        try {
47.          BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
48.          String pattern = null;
49.          while ((pattern = fis.readLine()) != null) {
50.            patternsToSkip.add(pattern);
51.          }
52.        } catch (IOException ioe) {
53.          System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));
54.        }
55.      }
56.
57.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
58.        String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
59.
60.        for (String pattern : patternsToSkip) {
61.          line = line.replaceAll(pattern, "");
62.        }
63.
64.        StringTokenizer tokenizer = new StringTokenizer(line);
65.        while (tokenizer.hasMoreTokens()) {
66.          word.set(tokenizer.nextToken());
67.          output.collect(word, one);
68.          reporter.incrCounter(Counters.INPUT_WORDS, 1);
69.        }
70.
71.        if ((++numRecords % 100) == 0) {
72.          reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
73.        }
74.      }
75.    }
76.
77.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
78.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
79.        int sum = 0;
80.        while (values.hasNext()) {
81.          sum += values.next().get();
82.        }
83.        output.collect(key, new IntWritable(sum));
84.      }
85.    }
86.
87.    public int run(String[] args) throws Exception {
88.      JobConf conf = new JobConf(getConf(), WordCount.class);
89.      conf.setJobName("wordcount");
90.
91.      conf.setOutputKeyClass(Text.class);
92.      conf.setOutputValueClass(IntWritable.class);
93.
94.      conf.setMapperClass(Map.class);
95.      conf.setCombinerClass(Reduce.class);
96.      conf.setReducerClass(Reduce.class);
97.
98.      conf.setInputFormat(TextInputFormat.class);
99.      conf.setOutputFormat(TextOutputFormat.class);
100.
101.      List<String> other_args = new ArrayList<String>();
102.      for (int i=0; i < args.length; ++i) {
103.        if ("-skip".equals(args[i])) {
104.          DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
105.          conf.setBoolean("wordcount.skip.patterns", true);
106.        } else {
107.          other_args.add(args[i]);
108.        }
109.      }
110.
111.      FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
112.      FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
113.
114.      JobClient.runJob(conf);
115.      return 0;
116.    }
117.
118.    public static void main(String[] args) throws Exception {
119.      int res = ToolRunner.run(new Configuration(), new WordCount(), args);
120.      System.exit(res);
121.    }
122. }
123.

実行例

入力として使うサンプルテキストファイルは、次のようになっています。

$ bin/hadoop dfs -ls /usr/joe/wordcount/input/
/usr/joe/wordcount/input/file01
/usr/joe/wordcount/input/file02

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01
Hello World, Bye World!

$ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02
Hello Hadoop, Goodbye to hadoop.

アプリケーションを実行します。

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

出力は次のようになります。

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop, 1
Hello 2
World! 1
World, 1
hadoop. 1
to 1

入力が最初のバージョンのアプリケーションと異なっていること、入力の違いが出力にどのように影響しているかに注意してください。

今度は、DistributedCache を介して、無視する単語のパターンを記述したパターンファイルを使ってみましょう。

$ hadoop dfs -cat /user/joe/wordcount/patterns.txt
\.
\,
\!
to

オプション指定を増やしてアプリケーションをもう一度実行します。

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

出力は期待したとおりになります。

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
Bye 1
Goodbye 1
Hadoop 1
Hello 2
World 2
hadoop 1

今度は、大文字と小文字の違いも無視するようにしてみましょう。

$ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

期待どおりの出力が得られました。

$ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000
bye 1
goodbye 1
hadoop 2
hello 2
world 2

解説

WordCount アプリケーションの新しいバージョンでは、Map/Reduce フレームワークが提供する機能をいくつか利用して、前回のバージョンより機能を向上させています。

  • Mapper (および Reducer) の実装の configure メソッドでは、アプリケーションから設定パラメータにアクセスする方法を示しています (28-43 行)。
  • DistributedCache を使って、ジョブに必要な読み取り専用データを配布する方法を示しています。ここでは、単語の出現頻度をカウントするときに無視するパターンをユーザーが指定できるようにしています (104 行)。
  • 汎用 Hadoop コマンドラインオプションを処理するための Tool インタフェースと GenericOptionsParser の便利な使い方を示しています (87-116 行および 119 行)。
  • アプリケーションで Counters を使用する方法 (68 行)、さらに、map (および reduce) メソッドに渡される Reporter インスタンスを介してアプリケーション固有のステータス情報を設定する方法 (72 行) を示しています。

Java および JNI は、米国およびその他の国における Sun Microsystems, Inc. の商標または登録商標です。