Documentation Home

Flume ユーザーガイド

flume-dev@cloudera.org [FAMILY Given]

改訂履歴
改訂 0.9.5-SNAPSHOT October 11 2011 F

目次

1. はじめに
1.1. アーキテクチャ
1.2. 信頼性
1.3. スケーラビリティ
1.4. 管理のしやすさ
1.5. 拡張性
1.6. まとめ
2. Flume シングルノードクイックスタート
2.1. ソースと dump コマンド
2.1.1. テキストファイルからの読み取り: text
2.1.2. ファイル名で指定したファイルの末尾の追跡: tailmultitail
2.1.3. 合成ソース: synth
2.1.4. ソースとしての syslog: syslogUdpsyslogTcp
2.2. イベントの解剖
2.3. まとめ
3. 疑似分散モード
3.1. 疑似分散モードでの Flume デーモンの起動
3.1.1. マスター
3.1.2. Flume ノード
3.2. マスターを介したノードの構成
3.3. シンクについて
3.4. 一括構成
3.5. Flume ノードの階層化: エージェントとコレクタ
3.6. まとめ
4. 完全分散モード
4.1. 静的構成ファイル
4.1.1. デフォルト値の使用
4.2. 複数のコレクタ
4.2.1. エージェントを複数のコレクタに分離する構成
4.2.2. フェイルオーバーチェインの手動指定
4.2.3. 自動フェイルオーバーチェイン
4.3. 論理構成
4.3.1. 論理ノード
4.3.2. 論理ソースと論理シンク
4.3.3. フローの分離
4.3.4. まとめ
4.4. 複数のマスター
4.4.1. スタンドアロンモードと分散モード
4.4.2. スタンドアロンモードでの実行
4.4.3. 分散モードでの実行
4.4.4. 構成ストア
4.4.5. どちらの構成ストアを使用するべきか
4.4.6. ZBCS の構成方法
4.4.7. 分散モードでの gossip
4.4.8. 図解: マスターとノードの対話
4.4.9. 複数のマスターサーバーに接続するための Flume ノードの構成
4.5. 外部の ZooKeeper クラスタ
4.6. まとめ
5. Flume とデータソースの統合
5.1. プッシュソース
5.2. ポーリングソース
5.3. 埋め込みソース
5.4. log4j を介した直接のロギング
5.4.1. Hadoop ジョブのロギング例
5.4.2. Hadoop デーモンを対象としたロギング
6. Flume によって収集されたデータの使用
6.1. Flume イベントのデータモデル
6.2. 出力のバケット化
6.3. 出力形式
6.3.1. デフォルト出力形式の構成
6.3.2. 特定のシンクでの出力形式の設定
6.4. 小さなファイルと高いレイテンシ
7. HDFS に書き込まれるファイルの圧縮
8. Flume の高度な利用
8.1. Flume コマンドシェル
8.1.1. Flume コマンドシェルの使い方
8.2. Flume のデータフロー仕様言語
8.2.1. 特殊なシンク: ファンアウト、フェイルオーバー、およびロール
8.2.2. シンクデコレータについて
8.2.3. 高水準ソースおよび高水準シンクの変換
8.3. 独自のメタデータ抽出
8.3.1. 抽出操作
8.3.2. メタデータのフィルタリングと加工
8.3.3. 役割に応じたデフォルト
8.3.4. 任意のデータフローと独自のアーキテクチャ
8.4. シンク/ソース/デコレータプラグインによる拡張
8.4.1. Flume 拡張機能のセマンティクス
8.4.1.1. シンプルソースのセマンティクス
8.4.1.2. バッファ付きソースのセマンティクス
8.4.1.3. シンプルシンク
8.4.1.4. バッファ付きシンクとデコレータのセマンティクス
8.4.1.5. 再試行、スリープ、異常終了
8.5. ソース/シンク・ペア間でのデータ転送レートの制限
9. Flume と HDFS のセキュリティ統合
9.1. 基礎
9.2. Kerberos での Flume ユーザーのセットアップ
9.2.1. Kerberos プリンシパルの管理
10. 付録
10.1. Flume のソースのカタログ
10.2. Flume のシンクのカタログ
10.3. Flume のシンクデコレータのカタログ
10.4. Flume の環境変数
10.5. flume-site.xml 構成設定
10.6. トラブルシューティング
10.6.1. デフォルトポートはどうなっていますか?
10.6.2. どのバージョンの Hadoop HDFS を使用できますか? 使用するバージョンを変更するにはどうすればよいですか?
10.6.3. Flume ノードが Flume マスターに表示されないのはなぜですか?
10.6.4. Flume ノードの状態がすぐに変わるのはなぜですか?
10.6.5. Flume 自体のトラブルシューティングに役立つログはどこにありますか?
10.6.6. ファイルハンドルを使い果たしたためにノードエラーが発生した場合、どう対処すればよいですか?
10.6.7. ディスクフェイルオーバーまたはログ先行書き込み (WAL) 使用時のエラー
10.6.8. Amazon S3 にデータを書き込むことはできますか?
用語集
11. バージョン
11.1. 履歴

図目次

1. Flume フロー: 単一のフロー
2. Flume フロー: 複数のフロー
3. Flume マスター: スタンドアロンモード
4. Flume マスター: 分散モード

概要

Flume は、大量のログデータを効率的に収集、集約、移動することを目的に開発された、高い信頼性と可用性を持つ分散型のサービスです。Flume は、ストリーム指向のデータフローをベースとしたシンプルでフレキシブルなアーキテクチャを採用しています。Flume は、信頼性の調整が可能なメカニズムに加え、フェイルオーバーやリカバリのためのさまざまなメカニズムを備えた、堅牢で耐障害性の高いシステムです。システムは集中管理され、インテリジェントな動的管理が可能です。Flume はシンプルで拡張可能なデータモデルを使用しており、オンライン分析アプリケーションの構築も可能です。

1. はじめに

Flume は、大量のデータをそのデータの生成後すぐに効率的に移動するための高い信頼性と可用性を備えた分散型のサービスです。このリリースでは、クラスタ内でデータを移動するためのスケーラブルなコンジットに加え、信頼性の高いロギングを利用できます。

Flume の第一のユースケースとして想定されているのは、クラスタ内の各マシンのログファイルを収集し、これらのログファイルを Hadoop Distributed File System (HDFS) などの中央の永続的ストアに集約するロギングシステムです。

システムは、次の 4 つの主要目標を念頭に置いて開発されました。

  • 信頼性
  • スケーラビリティ
  • 管理のしやすさ
  • 拡張性

ここでは、Flume のアーキテクチャを概観し、上の目標がどのように実現されているかを説明します。

1.1. アーキテクチャ

Flume のアーキテクチャは、シンプルながらも堅牢で柔軟性があります。Flume の中核をなす抽象化は、ストリーム指向の データフロー です。データフローは、1 つのデータストリームが、その生成元から最終的な宛先まで、どのように転送され、加工されるかを表すものです。データフローは、いくつかの 論理ノード から構成されており、論理ノードは、受け取ったイベントを加工したり、集約したりすることができます。これらの論理ノードがチェインのようにつながって、データフローを形成します。これらの論理ノードのつながり方のことを、論理ノードの 構成 といいます。

これらすべてを管理するのが Flume マスター です。Flume マスターは、Flume のすべての物理ノードと論理ノードに関する情報を把握している独立したサービスです。マスターは論理ノードに構成を割り当て、ユーザーが行った構成の更新をすべての論理ノードに伝える役割を果たします。論理ノードの側では一定時間ごとにマスターにコンタクトし、モニタリング情報を渡すとともに、構成の更新をチェックできるようになっています。

architecture.png

上の図は、一連のアプリケーションサーバーからログデータを収集する Flume の典型的なデプロイを示したものです。デプロイは複数の 論理ノード から構成されていて、論理ノードは 3 つの層に分かれています。最初の層は、 エージェント 層です。通常、エージェントノードは、ログを生成するマシン上にインストールされ、このエージェントノードが Flume との最初の接点になります。エージェントノードは、 コレクタノード から構成される次の層にデータを転送し、コレクタノードは別々の複数のデータフローを集約して、これを最後の ストレージ層 に転送します。

エージェントは、たとえば、 syslog のデータを受け取るマシンや、Web サーバーまたは Hadoop JobTracker などのサービスのログを監視するマシンです。エージェントは、データのストリームを生産し、このデータのストリームはコレクタに送られます。コレクタは受け取ったストリームをたばね、HDFS などのストレージ層に効率的に書き込むことができる、より大きなストリームに集約します。

論理ノードは、高い柔軟性を備えた抽象化です。どの論理ノードも、 ソースシンク という 2 つのコンポーネントを持っています。ソースは、どこからデータを収集すればよいかを論理ノードに教え、シンクは、収集したデータをどこに送ればよいかを論理ノードに教えます。2 つの論理ノードがあったとすると、その違いは、ソースとシンクがどのように構成されているかだけです。ソースもシンクも、追加で デコレータ を持つように構成できます。デコレータは、そこを通るデータに、単純な処理を加えます。上の例では、コレクタとエージェントは 同一のノードソフトウェア を実行しています。マスターは、実行時に各論理ノードに構成を割り当てます。つまり、あるノードの構成のすべてのコンポーネントは実行時に動的にインスタンス化されます。したがって、Flume サービスのライフタイム全体を通じて構成は何度でも変更することができ、Java プロセスを再起動したり、個々のマシンにログインしたりする必要はありません。実際、論理ノード自体は、動的に作成したり削除したりできます。

ソース、シンク、および (省略可能な) デコレータは、Flume の強力なプリミティブです。Flume はこのアーキテクチャを利用して、フローごとのデータプロパティ (永続性の保証、圧縮、バッチ化など) を実現しているほか、イベントメタデータを計算したり、新しいイベントを作成してデータフローに挿入するといったことを可能にしています。論理ノードは、下流の複数の論理ノードにデータを送ることができます。このため、複数のフローを用意することができ、各サブフローごとに構成を変えることができます。たとえば、1 つのフローは収集経路としてデータを永続的ストアに確実に配信する用途に使用する一方、別のブランチではライトウェイトな分析を行って結果を警告システムに送る、といったことができます。

1.2. 信頼性

信頼性、すなわち障害発生時もデータを失うことなくイベントを配信し続けることができる能力は、Flume の最も重要な機能です。大規模分散システムは、さまざまな方法で部分的障害に耐えることができ、現実にそうしています。たとえば、物理的なハードウェアが故障することもあれば、ネットワークの帯域幅やメモリなどのリソースが不足することも、ソフトウェアがクラッシュしたりスローダウンしたりすることもあります。Flume はコアの設計原則として耐障害性 (フォールトトレランス) を重視しており、多数のコンポーネントが障害を起こした場合でも、ソフトウェアの実行とデータの収集を続行します。

Flume では、エージェントノードが実行を続けている限り、エージェントノードが受け取ったすべてのデータが最終的にフローの終端のコレクタに届くことを保証できます。つまり、データはその最終的な目的地まで 確実に 届けられます。

ただし、信頼性の高い確実なデータの配信は、非常に多くのリソースを消費し、データソースによっては過剰保証となることがあります。そのため Flume では、必要な信頼性のレベルをフローごとにユーザーが指定できるようになっています。サポートされている信頼性のレベルは次の 3 つです。

  • エンドツーエンド
  • 障害発生時には保存
  • ベストエフォート

エンドツーエンド (end-to-end) の信頼性レベルでは、Flume がいったん受け取ったイベントは、そのイベントを受け取ったエージェントが生存している限り、その端点に届くことが保証されます。この信頼性レベルにおいてエージェントがまず行うことは、 先行書き込みログ (WAL) の形でイベントをディスクに書き込み、エージェントがクラッシュして再起動した場合でも、そのイベントに関する情報が失われないようにすることです。イベントがフローの終端に無事届けられると、そのことを示す受信確認が元のエージェントに送り返され、元のエージェントの側では、イベントをディスクに格納しておく必要がなくなったことがわるようになっています。この信頼性レベルでは、最初のエージェントの下流で発生した任意の数の障害に対応することができます。

障害発生時には保存 (store on failure) の信頼性レベルでは、ノードは 1 ホップ下流のノードからの受信確認を要求するだけです。送信側のノードが障害を検出した場合、そのノードは下流ノードが復旧するまでデータをローカルディスクに保存するか、または代わりとなる下流の送信先を選択することできます。これは効率の面では優れていますが、複合的な障害や、具体的な症状を伴わない障害が発生すると、データが失われる可能性があります。

ベストエフォート (best effort) の信頼性レベルでは、確認や再送信を一切行わずにデータを次のホップに送ります。ノードで障害が発生すると、そのノードで送信中または受信中だったデータが失われる可能性があります。これは最も低い信頼性レベルですが、最も軽量です。

1.3. スケーラビリティ

スケーラビリティとは、システムにさらにリソースを追加することでシステムのパフォーマンスをリニアに (またはそれ以上に) 向上できる能力のことです。Flume は水平方向のスケーラビリティ、すなわちスループットの向上を目的としてシステムにマシンをインクリメンタルに追加できることを目標にしています。Flume で重要なパフォーマンスの尺度は、システムが受け入れて配信されるイベントの数またはサイズです。負荷が高くなったときは、マシンの数を増やすという形で簡単にシステムにリソースを追加し、増大した負荷に対応することができます。

上に図示した例で見たように、Flume には、それぞれスケーラビリティに対して異なるアプローチが必要な 3 つの独立したコンポーネント、すなわちコレクタ層、マスター、およびストレージ層があります。

コレクタ層は、大量のエージェントノードからやってくる膨大な量のデータを処理することを目的としたスケーラビリティを持つ必要があります。ここでの作業負荷は、書き込みに重点があり、分離可能で、したがって並列化が可能です。コレクタ層にマシンを追加すれば、エージェントの数を増やしてシステムの最大利用可能スループットを向上させることができます。

通常、個々のコレクタは、多数の (数百の) エージェントに対応することができます。コレクタが利用可能な全帯域幅に比べれば、一般に個々のエージェントが生産するログデータは、非常に少量だからです。したがって Flume は、コレクタ間でエージェントからのフローのバランスを取ります。(エージェントからの 1 つのフローの行き先となるのは、1 つのコレクタです。) Flume は確率的アルゴリズムを使って、コレクタのリストをフローに均等に割り当てます。こうして負荷を自動的に分散するとともに、コレクタの 1 つで障害が発生した場合でも、負荷が分散された状態を維持することができます。

システム内のノード数が増加すると、Flume マスターとの間でやり取りされるコントロールパス上のトラフィック量がボトルネックになる可能性があります。Flume マスターの場合、ごく少数のコモディティサーバーで大量のノードにサービスを提供することができますが、マシンの追加による水平方向のスケーラビリティもサポートされています。この場合、Flume マスターの状態は、同期化されて完全にレプリケートされるので、Flume マスターは耐障害性と高いスケーラビリティを併せ持つことになります。

なお Flume は、最終目的地が受け取ることができるレートでのみ、フローを介してデータを書き込むことができます。Flume はフロー内部でデータをバッファリングして大規模なバーストを平均化することができますが、ログが詰まるのを避けるには、出力レートが平均して入力レートと等しくなっている必要があります。このため、スケーラブルなストレージに書き込むようにすることが推奨されます。たとえば、HDFS は数千台のマシンに対応可能なことが示されており、ペタバイト級のデータを処理することができます。

1.4. 管理のしやすさ

管理のしやすさとは、データフローのコントロール、ノードの監視、設定の変更、および大規模システムの出力のコントロールを実現可能な能力のことです。ソースからエンドポイントまでのデータフローを手作業で管理するのは、単調で退屈な作業であり、間違いが紛れ込みやすく、人間には苦痛です。ログを生成するアプリケーションやサービスが数千の規模になると、データフローを監視、変更するための集中管理ポイントを用意し、さまざまな状況や問題に動的に対処できる能力を持つことが重要になります。

Flume マスターは、データフローなどのグローバルな状態を管理できる場所です。ユーザーは Flume マスターを介してフローを監視し、フローを直接再構成することができます。Flume マスターは、負荷の不均衡、部分的な障害、新しく追加されたハードウェアなど、システムの変更に自動的に対応するのに必要な情報を持っています。

Flume マスターを使えば、ノードを動的に再構成できます。このガイドでは伝統的な 3 層デプロイの例を取り上げていますが、ノードには柔軟性があるため、任意のノードトポロジを採用することができます。柔軟なデータフロー仕様言語で小さなスクリプトを作成し、これを Flume マスターのインタフェースを介してサブミットすれば、ノードを再構成することができます。

Flume マスターは、Web インタフェースと、スクリプトの作成が可能な Flume コマンドシェルの 2 つインタフェースのいずれかを使って管理することができます。Web インタフェースでは、システムの状態のインタラクティブな更新が可能です。シェルでは、自分で作成したスクリプトや、マシンが生成したスクリプトを使って、管理作業を行うことができます。

1.5. 拡張性

拡張性とは、システムに新しい機能を追加できる能力のことです。たとえば、既存のストレージレイヤまたはデータプラットフォームにコネクタを追加して Flume を拡張することができます。Flume では、シンプルなインタフェース、組立可能な部品として互いに独立させた各種機能、フロー仕様言語、およびシンプルながらも柔軟なデータモデルによって拡張性を実現しています。

Flume は、一般的な入出力コネクタを多数用意しています。新しい入力コネクタ (ソース) が追加されると、そのソースに固有の追加のメタデータフィールドが利用可能になり、そのソースが生産する各イベントにこれらのメタデータフィールドを付加することができます。Flume が利用する一般的なコンポーネントは、特定の信頼性やリソース使用に関するプロパティを持っています。一般的なソースとしては、ファイルシステムのファイル、syslog と syslog-ng のエミュレーション、プロセスの標準出力などがあります。IRC チャンネルや Twitter のストリームなど、より具体性の高いソースも追加できます。イベントの出力先も同様にたくさんあります。第一の出力先は HDFS ですが、イベントをローカルファイルに送ったり、Ganglia などの監視・警告アプリケーション、あるいは IRC などのコミュニケーションチャンネルに送ったりすることもできます。

Flume では、HDFS、MapReduce、および Hive と簡単に統合できるよう、出力ファイル管理と出力形式管理のためのシンプルなしくみを用意しています。Flume によって集められたデータは、Hadoop と Hive で簡単に処理することができます。

1.6. まとめ

ここでは Flume の目標と機能を概観しました。以下では Flume のセットアップ方法と使い方について説明します。

  • 単一の Flume ノードを使ったステップバイステップ形式のチュートリアル
  • Flume マスターの説明、および Flume マスターによってコーディネートされた複数のノードで構成される疑似分散モードの説明
  • 単一障害点を作らない完全分散モードのセットアップ方法の説明
  • Flume のユースケース、および Flume と既存のデータソースとの統合についての説明
  • Hadoop と Hive などの重量級分析システムに組み込むための Flume 出力のセットアップ方法
  • Flume のデプロイと独自のフローのセットアップ方法、および Flume のデータフロー仕様言語についての説明
  • Flume のデータフロー仕様言語で利用可能なコンポーネントのカタログ
  • 実験的機能についての説明
  • トラブルシューティング情報

2. Flume シングルノードクイックスタート

ここでは、Flume をシングルノードで実行してデータを転送する方法について説明します。また、いくつかデータ ソース を取り上げ、ノードごとに Flume のフローを構成する方法も示します。

各論理ノードは、イベントを生産する ソース とイベントを消費する シンク から構成されます。ノードはソースからデータをプルし、シンク経由でデータをプッシュして送り出します。

[注記] 注記

以下では、Flume ノードと Flume マスターがデーモンとしてではなく、フォアグラウンドで実行されていることを前提としています。デーモンを停止するには、 /etc/init.d/flume-master stop/etc/init.d/flume-node stop を実行します。

2.1. ソースと dump コマンド

まず最初に、コンソールから標準入力に書き込まれたデータを、コンソールの標準出力にエコーバックする Flume ノードを実行してみましょう。それには dump コマンドを使います。

$ flume dump console
[ティップ] ティップ

Flume プログラムは一般に flume <command> [args ...] という形式で実行します。Flume を tarball パッケージからインストールした場合、flume コマンドは $FLUME_HOME/bin/ にあります。Flume を RPM または DEB からインストールした場合、flume コマンドはすでにパスに含まれています。

[ティップ] ティップ

上の実行例では、 dump コマンドを使用し、引数に console を指定しています。コマンドの書式は、 flume dump <source> [<outputformat>] です。このコマンドは <source> からのデータをコンソールに表示します。出力形式を指定することもできます。出力形式が省略された場合は、デフォルトのテキスト形式が使われます。

[注記] 注記

一部の flume の構成では、デフォルトでローカルディスクへの書き込みを行います。この場合、書き込み先のデフォルトは /tmp/flume になっています。この設定はテスト環境では問題ありませんが、実働環境ではもっと永続性のある場所を flume.agent.logdir プロパティで指定する必要があります。

[注記] 注記

ノードを実行することができずに、次のようなメッセージが表示されることがあります: agent.FlumeNode: Aborting: Unexpected problem with environment. Failure to write in log directory: /tmp/flume. Check permissions? 。このような場合は、 /tmp/flume ディレクトリに対して書き込みパーミッションがあるかどうかチェックしてください (ディレクトリのオーナーを変更するか、またはユーザーをディレクトリのグループに追加します)。デフォルトでは、この場所にさまざまなログ情報が保持されます。

さて、上のプログラムによって、Flume ノードが起動されました。入力データのソースは console です。プログラムを実行すると、たくさんのログメッセージが画面に表示されます。このうち、マスター、バックオフ、接続失敗に関するメッセージについては、今は無視してもかまいません (あとで説明します)。コンソールで何か入力して改行キーを押すと、入力したデータを含む新しいログエントリ行が表示されるはずです。たとえば、 This is a test と入力すると、次のように表示されます。

hostname [INFO Thu Nov 19 08:37:13 PST 2009] This is a test

プログラムを終了するには、^C を入力します。

[注記] 注記

ソースによっては自動的に終了しないので、プログラムを終了するにはユーザーが ^C を入力する必要があります。

2.1.1. テキストファイルからの読み取り: text

イベントのソースにはさまざまなものを指定できます。たとえば、各行が新しいイベントを表すテキストファイルをソースに指定するには、次のようなコマンドを実行します。

$ flume dump 'text("/etc/services")'

このコマンドは指定されたファイルを読み取り、各行を新しいイベントとして出力します。

[注記] 注記

デフォルトのコンソール出力では、特殊文字は Java スタイルのエスケープシーケンスでエスケープされます。"\ などの文字は、その前にもう一つ \ が付きます。

[注記] 注記

上のコマンドは、 /var/log/messages/var/log/syslog/var/log/hadoop/hadoop.log などのほかのファイルに対しても使用できます。ただし、実行する Flume に、ファイルを読み取るための適切な権限がなければなりません。

2.1.2. ファイル名で指定したファイルの末尾の追跡: tailmultitail

ファイルを読み取るのではなく、ファイルの末尾を追跡する場合は、 text の代わりに tail を使ってソースを指定します。

$ flume dump 'tail("testfile")'

このコマンドは、データをファイルから Flume にパイプし、次にコンソールに出力します。

次のメッセージが表示されます: "File testfile does not currently exist, waiting for file to appear"

ほかの端末から、データをファイルに書き込んで、ファイルを作成します。

$ echo Hello world!>> testfile

新しく書き込んだデータが表示されるはずです。

ファイルを削除します。

$ rm testfile

tail シンクはファイルが削除されたことを検出します。このあと、ファイルを再作成すると、 tail ソースは新しいファイルを検出して追跡を再開します。

$ echo Hello world again!>> testfile

Flume ノードのコンソールに新しいメッセージが表示されるはずです。

multitail ソースを使うと、ファイル名で指定した複数のファイルを追跡することができます。

$ flume dump 'multitail("test1", "test2")'

2 つの異なるファイルに入力されたデータをイベントとして送信できます。

$ echo Hello world test1!>> test1
$ echo Hello world test2!>> test2

tail ソースは、デフォルトでは \n をデリミタとして扱い、デリミタをイベントから除外します。省略可能な行デリミタ引数を使うと、任意の正規表現をデリミタとして指定することができ、デリミタを直前のイベント (prev) または直後のイベント (next) の一部として扱うか、あるいはデリミタをイベントから除外する (exclude) かどうかも指定できます。

以下にいくつか例を示します。

次の例では、2 つ以上連続して出現する改行がデリミタとして使われているファイルを追跡します。改行はイベントから除外されます。

tail("file", delim="\n\n+", delimMode="exclude")

次の例では、 </a> をデリミタとしてファイルを追跡し、デリミタを直前のイベントに追記します。この例は、たとえばクイック&ダーティな xml レコードスプリッタとして利用できます。

tail("file", delim="</a>", delimMode="prev")

次の例では、正規表現 "\n\d\d\d\d" をデリミタとしてファイルを追跡し、デリミタを直後のイベントに追記します。この例は、(日付を表すタイムスタンプの年の部分のように) 4 桁の数字で各行が始まるログファイルのスタックダンプから行を集めるのに利用できます。

tail("file", delim="\\n\\d\\d\\d\\d", delimMode="next")

2.1.3. 合成ソース: synth

次に示すのは、 synth ソースを使ってイベントを生成する例です。

$ flume dump 'asciisynth(20,30)'

この例を実行すると、20 個のイベントが生成されます。各イベントは、30 バイトのランダムな ASCII 文字です。

2.1.4. ソースとしての syslog: syslogUdpsyslogTcp

ファイル同様、syslog などのよく知られたデータフォーマットのデータを受け取ることもできます。たとえば、次のコマンドを実行すると、ポート 5140 でリスンする伝統的な syslog ライク な UDP サーバーを起動することができます (通常の syslog UDP ポートは特権ポートの 514 です)。

$ flume dump 'syslogUdp(5140)'

次のようにして netcat を使って syslog 形式のデータを送信すれば、ソースデータを供給できます。

$ echo "<37>hello via syslog"  | nc -u localhost 5140
[ティップ] ティップ

コマンドを終了するには、^C の入力が必要になることがあります。

[注記] 注記

冒頭の <37> は、メッセージのカテゴリとプライオリティレベルを示す syslog のデータフォーマットです。

同じ要領で、TCP ポート 5140 でリスンする syslog-ng 互換のソースをセットアップすることができます (通常の syslog TCP ポートは特権ポートの 514 です)。

$ flume dump 'syslogTcp(5140)'

データを送信します。

$ echo "<37>hello via syslog" | nc -t localhost 5140
[ティップ] ティップ

コマンドを終了するには、^C の入力が必要になることがあります。

syslog には後方互換性があるので、syslog、rsyslog、または syslog-ng で作成された通常のデータを Flume に送信して処理させることができます。

2.2. イベントの解剖

ここでは、Flume が扱うことのできるさまざまなデータソースを見てきました。ちょうどよい機会ですから、先へ進む前に、Flume が実際には何を送信し、内部的にどう処理しているのかを説明しておきましょう。

Flume はすべてのデータソースを内部で イベント のストリームに変換します。イベントは、Flume のデータの単位であり、シンプルながらも柔軟なデータ表現です。イベントは 本体 (body) と メタデータ から構成されます。イベントの本体は、イベントの内容を表すバイト文字列です。たとえば、ログファイル中の 1 行は、その行の実際のバイト表現を本体とするイベントとして表現されます。イベントのメタデータは、イベントについての詳細を表すキー/値のペアのテーブルです。イベントについての詳細とは、たとえば、そのイベントが作成された時刻、そのイベントが生成されたマシンなどの情報です。このテーブルは、Flume のフローに沿って移動していくイベントに追記することができ、このテーブルを読み取ることで、フローを構成する個々のコンポーネントの動作をコントロールすることができます。たとえば、イベントに添付されたマシン名を使って、フローの最後でそのイベントを書き込む先の出力パスをコントロールすることができます。

イベントの本体の長さは 32KB までです。この制限はシステムプロパティで管理することができますが、パフォーマンス維持のためには、制限を変更しないことが推奨されます。

2.3. まとめ

ここでは、Flume の dump コマンドを使って、さまざまな入力ソースのデータをコンソールに表示する方法を学びました。また、Flume のデータ転送の基本単位である イベント についても学びました。

次の表は、ここで取り上げたソースをまとめたものです。

表1 Flume のイベントソース

console

標準入力コンソール。

text("filename")

ワンショットのテキストファイルソース。1 行が 1 つのイベント。

tail("filename")

Unix の tail -F と同様のソース。1 行が 1 つのイベント。オープンしたまま新しいデータの到着を待ち、ファイルがローテートされてもファイル名で指定されたファイルを追跡します。

multitail("file1"[, "file2"[, …]])

tail ソースに似ていますが、複数のファイルを追跡します。

asciisynth(msg_count,msg_size)

サイズが msg_size のランダムなメッセージを msg_count 個だけ生成するソース。すべての文字は印字可能な ASCII 文字に変換されます。

syslogUdp(port)

UDP ポート port でリスンする syslog。syslog 互換。

syslogTcp(port)

TCP ポート port でリスンする syslog。syslog-ng 互換。


3. 疑似分散モード

Flume は、複数のプロセスが 多数 のマシン上に散らばった分散システムとして実行されることを想定して作られていますが、 単一 のマシン上で複数のプロセスとして実行することもできます。後者の実行モードは、 “疑似分散” モードと呼ばれます。疑似分散モードは、Flume のデータフローをデバッグしたり、Flume のコンポーネント間の相互関係を理解したりするときに利用すると便利です。

すでに、Flume のノードについては取り上げ、Flume のソースの概要についても紹介しました。ここでは、分散モードをセットアップするときに必要なコンセプト、すなわち Flume マスター サーバー、ソースと シンク の指定、 および複数の Flume ノード の接続について説明します。

3.1. 疑似分散モードでの Flume デーモンの起動

Flume のシステムには、Flume マスター と Flume ノード の 2 種類のプロセスがあります。このうち、Flume マスターは集中管理ポイントで、ノードのデータフローをコントロールします。Flume マスターは単一の論理エンティティで、グローバルな状態に関するデータを保持し、Flume ノードのデータフローをコントロールし、Flume ノードを監視します。Flume ノードは、イベントのストリームが通るデータパスの役割を果たします。Flume ノードは、イベントデータのソースとなることもあれば、コンジット、または消費者になることもあります。ノードは一定時間ごとにマスターにコンタクトし、ハートビートを送信するとともに、データフローの構成を受け取ります。

Flume の分散システムを動作させるには、1 つの Flume マスターと、このマスターと情報をやり取りする複数の Flume ノードを起動しなければなりません。ここでは、最初にマスターとノードを 1 つ起動し、あとからこれを拡張していきます。

3.1.1. マスター

マスターを手動で起動するには、次のコマンドを入力します。

$ flume master

マスターが起動した後、Web ブラウザで http://localhost:35871/ という URL を入力にすると、マスターにアクセスできます。Web ページには、マスターにコンタクトしてきたすべての Flume ノードの状態と、各ノードに現在割り当てられている構成が表示されます。Flume ノードを実行せずにマスターを起動した場合、状態と構成を示すテーブルは空のままです。

master-empty.png

Web ページには、[Node status] テーブル、[Node configuration] テーブル、[Physical/Logical Node mapping] テーブル、および [Command history] テーブルの 4 つのテーブルが表示されます。これらのテーブルの情報は、Flume システムの現在のグローバルな状態を表しています。

マスターの [Node status] テーブルには、マスターとコンタクトのあるすべての Flume ノードの名前、各ノードの現在の構成バージョン (初期状態は "none")、各ノードのステータス (IDLE など)、および各ノードが最後にマスターに対して報告を行った日時が表示されます。各 Flume ノードの名前は、Unix プロンプトから hostname を実行したときの名前と同一のはずです。

マスターの [Node configuration] テーブルには、ノードの論理名、ノードに割り当てられている構成バージョン、およびノードのソースとシンクの指定が表示されます。このテーブルは最初は空ですが、値を変更してからこの Web ページにアクセスすると、更新内容を確認できます。列には 2 つのセットがあり、ユーザーが入力したバージョン/ソース/シンクと、変換されたバージョン/ソース/シンクに分かれています。変換された構成については、このガイドの後の方で説明します。

マスターの [Physical/Logical Node mapping] テーブルには、論理ノードから物理ノードへのマッピングが表示されます。

マスターの [Command history] テーブルには、コマンドの状態が表示されます。一般に、 コマンド はマスターのグローバルな状態を変更します。コマンドはマスター上で順番に処理され、それぞれに固有の ID 番号が割り当てられます。各コマンドには、状態 (SUCCEEDED、FAILED、PENDING など)、コマンドライン、およびメッセージ (通常はコマンドの実行状況を示す情報) の列があります。

3.1.2. Flume ノード

Flume ノードを起動するには、別の端末で次のコマンドを入力します。

$ flume node_nowatch
[注記] 注記

通常は、 flume node を使って、つまりデーモンとしてノードを起動します。ここでは、 node_nowatch オプションを指定してノードを起動し、watchdog 機能を無効にしています。node_nowatch オプションを指定すると、コンソールを介してノードと対話することができます。ほかのオプションでは、 stdin は無効になります。

Flume ノードが起動しているかどうかをチェックするには、ブラウザで http://localhost:35862/ にアクセスし、[Flume Node] ステータスページを表示します。各ノードはそれぞれのデータを単一のテーブルに表示します。テーブルには、ノードに関する診断情報とメトリクス、データフロー、およびそのノードが実行されているマシンのシステムメトリクスが表示されます。1 台のマシン上で Flume ノードプログラムの複数のインスタンスを実行した場合、それぞれのインスタンスはポート番号を自動的にインクリメントして次のポート (35863、35864 など) にバインドを試み、最終的に選択されたポートを記録します。

ノードが起動している場合、ノードがマスターにコンタクトしていることを確認するには、マスターのステータスページ (http://localhost:35871) を更新する必要があります。ここではノードを 1 つ起動したので (ノードの名前は host とします)、マスターの [Node status] テーブルにはノードが 1 つ表示され、[Physical/Logical Node mapping] テーブルには、 host 論理ノードが host 物理ノードにリンクされたエントリが表示されるはずです。

3.2. マスターを介したノードの構成

ノードに対し、マスターにコンタクトして構成を取得するよう要求すれば、リモートマシンにログインしてデーモンを再起動しなくても、ノードの構成を動的に変更することができます。それまでのノードのデータフローの構成も、速やかに新しい構成に変更できます。

以下では、マスターの Web インタフェースを使ってノードを "つなげる" 方法を説明します。

マスターの Web ページで、config リンクをクリックします。フォームが 2 つ表示されます。これらのフォームは、ノードのデータフローを設定するための Web インタフェースです。Flume ノードは、マスターにコンタクトしたときに、データフローのバージョンが変更されたことを知り、構成をインスタンス化して有効にします。

ここでは、クイックスタートの手順を実行してみましょう。[Configure a single node] フォームに次の値を入力し、[送信] をクリックします。

Node name: host
Source: console
Sink: console

マスターページを更新すると、バージョンスタンプが現在の日時に変わっていて、構成情報の source フィールドと sink フィールドが更新されているのがわかります。ステータスが "ACTIVE" に変わると、ノードはコンソールトラフィックを受け取ることができる状態になります。

マスター上では、ノードは次のいずれかの状態をとります。

  • HELLO : 新しいノードインスタンスが初めてマスターにコンタクトしました。
  • IDLE : ノードは構成を完了したか、または構成を持っていません。
  • CONFIGURING: ノードは構成を受け取り、有効にしているところです。
  • ACTIVE: ノードはアクティブ状態で、ソースからデータをプルし、シンクにデータをプッシュしています。
  • LOST: ノードは期待された時間内に (デフォルトでは期待されたハートビートの 10 倍の時間、すなわち 50 秒間) マスターにコンタクトしませんでした。
  • DECOMMISSIONED: ノードはマスターから強制的に無効化されました。
  • ERROR: ノードはエラー状態で停止しています。

Flume ノードを実行している端末上で、数行のデータを入力すると、新しいログメッセージが表示されるはずです。

Node name: host
Source: text("/etc/services")
Sink: console
[注記] 注記

Flume ノードのコンソールで Enter キーの入力が必要になることがあります。

または、次の値を使ってファイルの末尾を追跡することもできます。

Node name: host
Source: tail("/etc/services")
Sink: console

同様にして、マスター経由でシステムのさまざまなノードの構成を変更すれば、各種のソースからデータを収集することができます。

3.3. シンクについて

Flume には、新しいイベントを生成したり、システムに供給される新しいイベントを受け取ったりするさまざまなソースがあることは、既にみたとおりです。ただし、これまではメッセージの出力を console シンクに限っていました。しかし、容易に想像がつくように、Flume にはあらゆるイベントの送り先となるさまざまなイベント シンク が用意されています。

イベントは、ローカルディスクをはじめ、HDFS、コンソール、さらネットワークを介した転送など、数多くの送り先に送ることができます。Flume では、こうした送り先にイベントを転送する際のインタフェースとして、シンクという抽象化を使用します。

新しい構成を指定してマスターに送信すれば、ソースをさまざまなシンクに接続することができます。たとえば、次のようなデータフローを指定すれば、 /etc/services のコピーを作成できます。

Node name: host
Source: text("/etc/services")
Sink: text("services.copy")
[警告] 警告

text シンクは、ファイルがすでに存在する場合、そのファイルを上書きします。

この場合、ファイルは元の内容のままでコピーされます。シンクには出力形式を指定するための省略可能なオプションがあり、このオプションを使えば、ほかの直列化形式でデータを書き込むことができます。たとえば、デフォルトの "raw" 形式でファイルをコピーする代わりに、"avrojson" (Avro 直列化 JSON 形式)、"avrodata" (Avro 直列化バイナリデータ形式)、"debug" モード (console シンクで使われる形式) など、ほかの出力形式でデータを書き込むことができます。

次のように入力したとします。

Node name: host
Source: text("/etc/services")
Sink: console("avrojson")

すると、ファイルの各レコードが JSON 形式でコンソールに表示されます。

次のように入力したとします。

Node name: host
Source: text("/etc/services")
Sink: text("services.json", "avrojson")

新しく書き込まれるローカルファイル services.json は、Avro の JSON 形式で出力されます。

使用できるシンクはたくさんあります。次の表はそのうちの一部を示したものです。これ以外のシンクについては、付録を参照してください。

表2 Flume のイベントシンク

null

Null シンクです。イベントは破棄されます。

console[("format")]

console シンクです。イベントをコンソールの標準出力に表示します。"format" 引数は省略可能で、デフォルトは "debug" 出力形式です。

text("txtfile"[,"format"])

テキストファイルシンクです。出力形式 "format" でテキストファイル txtfile にイベントを書き込みます。デフォルトの出力形式は "raw" で、メタデータなしにイベント本体を書き込みます。

dfs("dfsfile")

DFS seqfile シンクです。直列化された Flume イベントを hdfs://namenode/filefile:///file などの dfs パスに Hadoop の seqfile 形式で書き込みます。HDFS の書き込みセマンティクスにより、このシンクへのデータは、シンクがクローズされるまで書き込まれません。

syslogTcp("host",port)

syslog TCP シンクです。イベントを syslog の (syslog-ng 互換) データフォーマットで host の TCP ポート port 、または syslogTCP をリスンするようセットアップされたほかの Flume ノードに転送します。


[警告] 警告

dfs の使用にはいくつか制限があるほか、追加のセットアップ作業が必要になります。ファイルの内容は、シンクがクローズされるまで利用可能になりません。詳細については、「トラブルシューティング」を参照してください。

3.4. 一括構成

フォームを使って単一ノードの構成を行うやり方は、ごく少数のマシンを対象とする場合にはそれほど手間ではありませんが、マシンの数が多い場合には、すべてのマシンの構成を 1 つのファイルで管理するか、構成を自動生成した方が効率的です。Flume では、多数のマシンの構成を一括して設定することができます。

それには、上の「マスターを介したノードの構成」に示した方法の代わりに、[Configure multiple nodes] フォームに次の構成を入力し、[送信] をクリックします。

host : text("/etc/services") | console ;

または、次のように入力します。

host: text("/etc/services") | text("services.copy");

一般的な書式は次のとおりです。

<node1> : <source> | <sink> ;
<node2> : <source> | <sink> ;
<node3> : <source> | <sink> ;
...

これ以降、このガイドでは、上の書式を使ってノードを構成します。

3.5. Flume ノードの階層化: エージェントとコレクタ

単純なネットワーク接続も、抽象的にはシンクの 1 つです。ネットワークを介したイベントの送信が、簡単で効率的、かつ信頼性があれば、非常に便利です。しかし現実には、ネットワーク接続に依存しつつ、分散した複数のマシンからデータを収集しようとすると、障害が発生する可能性も、発生する障害の種類も大幅に増えます。要するに、信頼性の保証を得ようとすると、それだけ複雑さも増し、多くの面でトレードオフを強いられます。

Flume では、あらかじめ定義されたトポロジと調整可能な信頼性を用意することで、この問題に簡単に対処できるようにしています。具体的には、ユーザーが行う必要があるのは、各ノードに役割を与えることだけです。たとえば、シンプルな Flume ノードトポロジとして、Flume ノードを 2 つの層、すなわち Flume エージェント 層と Flume コレクタ 層に分けるトポロジがあります。この場合、エージェント層の Flume ノードは、ログを生成するサービスが実行されているマシン上に配置します。たとえば、Flume エージェントを syslogTcp をソースとして構成し、syslog を生成するサーバーのログの送信先を、この Flume エージェントで指定したローカルポートにします。そして、この Flume エージェントのシンクを agentSink とし、コレクタ層のノードにデータを転送するように構成します。

コレクタ層のノードは、複数のエージェントから届くデータをリスンし、ログを集約し、最後にデータを HDFS に書き込みます。

[警告] 警告

以下の説明で使われている host 引数は、いずれもノードの物理 IP または DNS 名に読み替えてください。これらの host 引数は、Flume ノード名 (-n name オプションで指定する名前) ではないことに注意してください。デフォルトの Flume ノード名は、コマンドラインで設定を上書きしない限り、ホスト名と同じです。

疑似分散モードで新しいシンクの動作を確認するために、ローカルマシン上でもう 1 つ Flume ノード (物理ノード) をインスタンス化します。具体的には、いくつかオプションを追加して Flume ノードを起動する必要があります。次のコマンドを入力すると、collector という名前の (-n collector) 物理ノードが起動されます。

$ flume node_nowatch -n collector

マスターの Web ページには、2 つのノード hostcollector が表示されるはずです。[Flume Node] ステータスページは、それぞれ http://localhost:35862http://localhost:35863 にアクセスすると表示されるはずです。ポートのバインド先はインスタンス化の順序によって異なり、最初の物理ノードのバインド先が 35862 に、2 番目の物理ノードのバインド先が 35863 になります。

次に、一括構成フォームを使って、 collector がコレクタの役割を引き受けるよう構成し、 host の方は、コンソールからコレクタにデータを送信するよう設定します。エージェントは、信頼性の高いネットワークシンクである agentSink を使用します。collector ノードのソースは collectorSource として構成し、シンクは console にします。

host : console | agentSink("localhost",35853) ;
collector : collectorSource(35853) | console ;
tiers-console.png

host のコンソール上で何行かデータを入力すると、イベントはコレクタに転送されます。今のところ、転送されたメッセージが collector 上に表示されるまで、いくらか遅延 (15秒程度) があります。これは実際には構成可能な設定で、デフォルト値はイベントのスループットが高くなるように設定されています。Flume の設定を調整する方法については、このガイドの後の方で説明します。

さて、以上でエージェントからコレクタへのイベントフローを作成することができました。

[ティップ] ティップ

agentSink が動作しているかどうかは、シンクの先行書き込みログのディレクトリを覗けばチェックできます。ディレクトリのデフォルトの場所は、 /tmp/flume/<nodename> です。この場所はオペレーティングシステムによって定期的にデータが削除されるので、実働環境ではこのディレクトリの構成プロパティ (flume.agent.logdir) を変更する必要があります。

もう少し興味深い例として、エージェントの構成を少し変えて、ローカルファイルの末尾を追跡したり (tail ソースを使用)、ローカルマシンの syslog データをリスンしたり (syslogTcp ソースまたは syslogUdp ソースを使用し、syslog デーモンの設定を変更) することもできます。また、コレクタも、コンソールに書き込む代わりに、より高度な処理を行うことができる collectorSink に書き込むようにすることができます。 collectorSink は、ディスクや HDFS に書き込みを行い、一定時間ごとにファイルをローテートし、受信確認を管理するシンクです。

tiers-hdfs.png

次に示すのは、syslog メッセージをリスンしてコレクタに転送し、コレクタがローカルディレクトリ /tmp/flume/collected/ にファイルを書き込む構成です。

host : syslogTcp(5140) | agentSink("localhost",35853) ;
collector : collectorSource(35853) | collectorSink("file:///tmp/flume/collected", "syslog");

次に示すのは、上の構成を少し変更して、コレクタが HDFS クラスタに書き込むようにした構成です (HDFS NameNode の名前が namenode の場合)。

host : syslogTcp(5140) | agentSink("localhost",35853) ;
collector : collectorSource(35853) | collectorSink("hdfs://namenode/user/flume/","syslog");
[注記] 注記

HDFS ファイルが適切にクローズされない限り、HDFS ファイルに書き込まれたデータが永続化される保証はありません。このため、コレクタシンクは一定時間ごとにファイルをクローズし、HDFS 上に新しいファイルを作成します。ファイルロール (クローズして新しいファイルをオープンする) 間隔のデフォルト値は 30 秒です。データの書き込みスループットが低い場合 (2MB/秒未満の場合) は、flume-site.xml ファイルの flume.collector.roll.millis プロパティと flume.agent.logdir.retransmit プロパティを変更して、デフォルト値を増やすとよいでしょう。

3.6. まとめ

ここでは、マスターとノードを起動する方法、マスターを介してノードを構成する方法について説明しました。また、多くの構成をまとめて指定する方法、エージェントとコレクタ、さらに、3 層トポロジに基づいて単一のマシン上でエージェントからコレクタへのパイプラインを構築する方法についても説明しました。

4. 完全分散モード

Flume の第一の目標は、数多くのホストからログとデータを収集すること、さまざまなトポロジのクラスタとネットワークに対してスケーラビリティを発揮して、これらをインテリジェントに扱うことです。

Flume をクラスタにデプロイするには、次の手順を実行します。

Flume をクラスタにデプロイする手順

  • 各マシンに Flume をインストールします。
  • マスターにするノードを 1 つ以上選びます。
  • サイト固有のプロパティを使用するための静的構成ファイルを適切に編集します。
  • 少なくとも 1 台 のマシン上で Flume マスターノードを起動します。
  • マシンで Flume ノードを起動します。

以下では、各ノードのマスターを指定するプロパティファイルを手動で構成する方法、およびパラメータのデフォルト値を設定する方法について説明します。その後、大規模システム向けのデータフロー構成、コレクタを追加して処理能力を向上する方法、マスターを複数追加してマスターの信頼性を高める方法について説明します。

4.1. 静的構成ファイル

これまでは、1 台のマシン上でデフォルト構成の設定を使って Flume を実行していました。デフォルト設定の場合、ノードは自動的に localhost 上の標準ポートでマスターを探します。完全分散モードで Flume ノードがマスターを見つけられるようにするには、サイト固有の静的構成設定を指定しなければなりません。

Flume ノードとマスターを対象とするサイト固有の設定は、各マシンの conf/flume-site.xml ファイルのプロパティで構成します。conf/flume-site.xml ファイルが存在しない場合、Flume のコマンド群は、 conf/flume-conf.xml で指定された設定をデフォルトとして使用します。次に示すのは、Flume ノードに対し、 master という名前のマシンにマスターを探しに行くよう指定するプロパティの設定例です。

conf/flume-site.xml

<?xml version="1.0"?>
<?xml-stylesheet type="text/xsl"  href="configuration.xsl"?>

<configuration>
<property>
<name>flume.master.servers</name>
<value>master</value>
</property>
</configuration>

4.1.1. デフォルト値の使用

エージェントとコレクタという役割分担を利用する場合は、 flume-site.xml ファイルに次のような構成プロパティを追加すると、コレクタとして使われるデフォルトのホストをセットアップすることができます。

...
<property>
<name>flume.collector.event.host</name>
<value>collector</value>
<description>This is the host name of the default "remote" collector.
</description>
</property>
  <property>
<name>flume.collector.port</name>
<value>35853</value>
<description>This default tcp port that the collector listens to in order to receive events it is collecting.
</description>
</property>

...

上のように設定すると、引数なしに使われた agentSink では、 flume.collector.event.hostflume.collector.port がデフォルトのターゲットとポートとして使われるようになります。

次に示すのは、複数のエージェントが 1 つのコレクタにデータをプッシュする例です。7 つの Flume ノードがあり、うち 6 つはエージェント層に、1 つはコレクタ層に配置されています。

singleCollector.png

次に示すのは、パラメータをすべて指定した構成です。

agentA : src | agentSink("collector",35853);
agentB : src | agentSink("collector",35853);
agentC : src | agentSink("collector",35853);
agentD : src | agentSink("collector",35853);
agentE : src | agentSink("collector",35853);
agentF : src | agentSink("collector",35853);
collector : collectorSource(35853) | collectorSink("hdfs://namenode/flume/","srcdata");
[注記] 注記

agentSink の送信先を指定するときは、 ターゲットマシンのホスト名とポート を使用してください。ノードのデフォルト名は、そのノードのホスト名です。ただし、論理ノードが複数ある場合は、 論理ノードの名前 ではなく、マシンのホスト名を使用しなければなりません。上の例で、agent[A-F] と collector は、これらの構成が実行されている該当マシンの 物理ホスト名 です。

構成ファイルで設定されているデフォルトポートを使用することを前提とすれば、上の構成は次のように記述できます。

agentA : src | agentSink("collector");
agentB : src | agentSink("collector");
agentC : src | agentSink("collector");
agentD : src | agentSink("collector");
agentE : src | agentSink("collector");
agentF : src | agentSink("collector");
collector : collectorSource | collectorSink("hdfs://namenode/flume/","srcdata");

デフォルトポートとデフォルトのコレクタホストを使用することを前提とすれば、上の構成はさらに次のように記述できます。

agentA : src | agentSink
agentB : src | agentSink
agentC : src | agentSink
agentD : src | agentSink
agentE : src | agentSink
agentF : src | agentSink
collector : collectorSource | collectorSink("hdfs://namenode/flume/","srcdata");
[警告] 警告

デフォルト値を利用すれば、データフローをより簡潔に記述することができますが、ノード間のつながりの細かな点についてはわかりにくくなる可能性があります。

4.2. 複数のコレクタ

複数のコレクタを用意すると、ログ収集のスループットを向上させることができ、コレクタの可用性を高めてイベント配信の適時性を改善することができます。データの収集は並列化が可能です。したがって、多数のエージェントからの負荷も、複数のコレクタ間で分担することができます。

4.2.1. エージェントを複数のコレクタに分離する構成

上で最初に示したデータフローの図は、Flume ノードの典型的なトポロジを表したものです。この図の場合、信頼性の高い配信を行うためには、コレクタが処理を停止したり、エージェントとの接続を切断したりしたときに、エージェントがそれぞれのディスクにイベントをローカル保存する必要があります。その後、エージェントは一定時間ごとにコレクタに再コンタクトを試みますが、コレクタがダウンしているので、分析や下流での処理はブロックされます。

multiCollector.png

上のデータフローの図のように、コレクタを複数用意すれば、1 つのコレクタで障害が発生しても、下流で処理を行うことができます。たとえば、コレクタ B がダウンした場合、エージェント A とエージェント B、エージェント E とエージェント F は、それぞれコレクタ A とコレクタ C を介してイベントの配信を続行できます。一方、エージェント C とエージェント D は、コレクタ B (またはその代替ノード) がオンライン状態に復帰するまで、それぞれのログをキューに入れる必要があります。

次に示すのは、一連のエージェントからの作業を複数のコレクタに分離する構成です。この例では、各コレクタで同一の DFS 出力ディレクトリとファイルプリフィックスを指定しており、ログのすべてを同じ 1 つのディレクトリに集約します。

agentA : src | agentE2ESink("collectorA",35853);
agentB : src | agentE2ESink("collectorA",35853);
agentC : src | agentE2ESink("collectorB",35853);
agentD : src | agentE2ESink("collectorB",35853);
agentE : src | agentE2ESink("collectorC",35853);
agentF : src | agentE2ESink("collectorC",35853);
collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");

4.2.2. フェイルオーバーチェインの手動指定

failoverCollector.png

同一のストレージに書き込むコレクタが複数ある場合、エージェント C とエージェント D にいつまでもログをキューに入れさせるのではなく、これらのエージェントをほかのコレクタにフェイルオーバーさせることができます。たとえば、エージェント C とエージェント D をそれぞれコレクタ A とコレクタ C にフェイルオーバーさせる一方、コレクタ B がオンラインに復帰しているかどうかを一定時間ごとにチェックするといったことができます。

このような設定を指定するには、 フェイルオーバーチェイン 付きエージェントを使用します。単一のコレクタエージェントの場合と同様、フェイルオーバーチェインエージェントにも agentE2EChainagentDFOChain 、および agentBEChain の 3 つの信頼性レベルがあります。

次に示すのは、エンドツーエンドの信頼性を持つエージェント agentE2EChain を複数のコレクタを対象に使って、フェイルオーバーチェインを手動で指定する例です。この例では、 agentA は最初にポート 35853collectorA への送信を試みます。agentA のシンクの 2 番目の引数では、最初のコレクタで障害が発生した場合にフォールバックするコレクタを指定しています。コレクタは任意の数を指定できますが、少なくとも 1 つは指定しなければなりません。

agentA : src | agentE2EChain("collectorA:35853","collectorB:35853");
agentB : src | agentE2EChain("collectorA:35853","collectorC:35853");
agentC : src | agentE2EChain("collectorB:35853","collectorA:35853");
agentD : src | agentE2EChain("collectorB:35853","collectorC:35853");
agentE : src | agentE2EChain("collectorC:35853","collectorA:35853");
agentF : src | agentE2EChain("collectorC:35853","collectorB:35853");
collectorA : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorB : collectorSource(35853) | collectorSink("hdfs://...","src");
collectorC : collectorSource(35853) | collectorSink("hdfs://...","src");
[注記] 注記

ここでは、 agent[A-F] および collector[A-B] はいずれも物理ホスト名です。

コレクタを 1 つだけ指定する場合と同様、ポート番号が指定されていなければ、エージェントはデフォルトで flume.collector.port を使用します。

agentA : src | agentE2EChain("collectorA","collectorB");
agentB : src | agentE2EChain("collectorA","collectorC");
agentC : src | agentE2EChain("collectorB","collectorA");
agentD : src | agentE2EChain("collectorB","collectorC");
agentE : src | agentE2EChain("collectorC","collectorA");
agentF : src | agentE2EChain("collectorC","collectorB"); collectorA : collectorSource | collectorSink("hdfs://...","src");
collectorB : collectorSource | collectorSink("hdfs://...","src");
collectorC : collectorSource | collectorSink("hdfs://...","src");

4.2.3. 自動フェイルオーバーチェイン

[警告] 警告

複数のマスターを使用する場合、現時点では自動フェイルオーバーチェイン機能は動作しません。

Flume では、ノードの構成に基づいてフェイルオーバーチェインを自動的に割り当てるメカニズムが用意されています。コレクタノードは Flume マスターで割り当てられるので、マスターはエージェントをコレクタ間で均等に割り当てようとします。具体的には、障害発生時、各エージェントにはそれぞれ異なるフェイルオーバーチェインが割り当てられます。このしくみによって、あるコレクタで障害が発生した場合に、別のコレクタの負荷が過剰になるのを防いでいます。

ノードがフェイルオーバーチェインを使用するように指定するには、エージェントシンクに autoE2EChainautoDFOChain 、または autoBEChain を使用します。フェイルオーバーチェインはマスターが計算するので、これらのシンクは引数を取りません。

agentA : src | autoE2EChain ;
agentB : src | autoE2EChain ;
agentC : src | autoE2EChain ;
agentD : src | autoE2EChain ;
agentE : src | autoE2EChain ;
agentF : src | autoE2EChain ;
collectorA : autoCollectorSource | collectorSink("hdfs://...", "src");
collectorB : autoCollectorSource | collectorSink("hdfs://...", "src");
collectorC : autoCollectorSource | collectorSink("hdfs://...", "src");

マスターは、システムに現在存在するコレクタに基づいて、エージェントの構成を更新します。システムに新しくコレクタが追加されると、マスターはエージェントのフェイルオーバーチェインを更新して、バランスの再調整を行います。

[注記] 注記

autoCollectorSource をソースに持つノードが存在しない場合、エージェントの自動フェイルオーバーチェインは、 fail("…") チェインを報告します。このチェインは、 autoCollectorSource が指定されるのを待っていることを示します。ノードがマップされていない場合、該当するノードは fail シンクを報告し、そのノードがマップされていないこと (そのノードがホスト/ポートに関連付けられていないこと) を知らせるようになっています。

[ティップ] ティップ

変換後の auto*Chain 構成は、変換された構成の列のノード構成で確認できます。これは、シンクの障害回復動作の宣言的仕様のようなものです。詳細については、このガイドの後の方で取り上げます。今後のリビジョンでは、エージェントやほかのチェインについても変換された後のものが表示されるようになる予定です。

4.3. 論理構成

Flume ノードの数が少ない場合は、ノードを手作業で構成することも可能ですが、ノードの数が増えてくると、オペレーターの負担も重くなります。理想は、オペレーターが特定のマシンに役割を割り当てるだけで済むようにすることです。Flume では、構成管理はマスターによって集中化されているので、ノードのトポロジを的確に作成して、入り組んだデータフローを互いに分離するのに必要なすべての情報は、マスターが保持することになります。

このようなしくみを実現するために導入されたのが、 論理ノード という概念です。そして、論理ノード間のやり取りを管理するために、 論理ソース論理シンク という概念が導入されました。また、ノードのグループ相互間を分離するために、 フロー という概念が導入され、エージェントとコレクタを別個の独立したグループにグループ化できるようにしました。

4.3.1. 論理ノード

論理ノードという抽象化によって、各 JVM インスタンス (物理ノード) は複数の論理ノードを含むことができるようになっています。このため、1 つの JVM インスタンス内で、数多くのソースとシンクの組み合わせを複数の実行スレッドで処理することができます。

各論理ノードは、その物理名、すなわちホスト名とまったく異なった名前を持つことができます。そのため、論理ノードの場合は、新しいノードを実行したり、論理ノードを物理ノードにマップしたり、既存の論理ノードを無効化 (decommission) したりする操作を新たに行う必要があります。

[注記] 注記

以下に示す一連のコマンドは、マスターの [raw command] ページの Web インタフェースを介して実行する場合のコマンド入力例です。これらの操作は、Flume コマンドシェル (このガイドの後の方で説明しています) を使って実行することもできます。コマンドシェルで実行する場合は、Web インタフェースで入力するコマンドの前に exec を付けて入力してください。

たとえば、エージェントとコレクタによるトポロジを採用することは決まっているものの、具体的なマシンの名前はまだわからないとします。その場合、次のようにして物理マシン名を指定せずに論理ノードの構成を指定することができます。

agent1 : _source_ | autoBEChain ;
collector1 : autoCollectorSource | collectorSink("hdfs://....") ;

その後、agent1 マシンの名前が host1 で、collector1 マシンの名前が host2 であることがわかったら、次のようにして map コマンドを実行することで、これらの論理ノードを host1 と host2 上の物理的な Flume インスタンスにマップすることができます。

map host1 agent1
map host2 collector1

この操作を行った後、[Node status] テーブルには、論理ノードごとに新しい行が表示されるはずです。各論理ノードの行には、その論理ノードの実行状態、構成、およびハートビートが表示されます。また、[Physical/Logical Node mapping] テーブルには新しいエントリが表示され、指定された物理ノード上に論理ノードが置かれたことがわかります。ノードのソースとシンクを構成するには、疑似分散モードのところですでに説明した方法とまったく同じ方法を使用できます。

decommission コマンドを使えば、論理ノードを削除することもできます。たとえば、agent1 が不要になり、agent1 を「無効化」したいとします。その場合は、次のコマンドを入力します。

decommission agent1

コマンドを入力すると、スレッドが終了し、論理ノードに関連付けられていた構成、および論理ノードと物理ノードとのマッピングが削除されます。

ある物理ノードから別の物理ノードへ論理ノードを移動することもできます。それには、まず論理ノードのマッピングを削除し、次にその論理ノードを別の物理ノードにマップします。たとえば、collector1 を host2 から host3 に移動するには、まず次のコマンドを入力します。

unmap host2 collector1

これで、論理ノードのマッピングが削除され、collector1 がアクティブな状態はどこにも存在しないことになります。この状態で、map コマンドを使って、collector1 を host3 にマップします。

map host3 collector1
[注記] 注記

論理ノードはテンプレートではありません。したがって、特定の物理ノード上で同一のソース/シンクのペアを複数作成するには、それぞれに対して論理ノードを 1 つ作成する必要があります。多数の論理ノードを管理する場合は、スクリプトを書いて、各種の構成と一意の論理ノード名をスクリプトで生成するようにすると便利です。この場合、ホスト名の一部を論理ノード名に使用するのが一般的です。

4.3.2. 論理ソースと論理シンク

[警告] 警告

複数のマスターを使用する場合、現時点では論理ソースと論理シンクの機能は動作しません。

上の例では、2 つの抽象化を使って、 物理的なホスト名とポートを使用することなく 、図のトポロジを指定することができました。同様に、 論理ソース論理シンク を使えば、実際にマップするまで物理的なマシンについて知らなくても、さまざまなトポロジを作成することができます。

たとえば、データを生産するノードが 2 つあって、これらのノードが消費者にデータを送信するとします。

dataProducer1 : console | logicalSink("dataConsumer") ;
dataProducer2 : console | logicalSink("dataConsumer") ;
dataConsumer : logicalSource | console ;

この例の場合、送信先の引数はノードの 論理名 であって、具体的なホスト名とポートの組み合わせではないことに注意してください。

Flume では、こうした機能を実装するための汎用化されたメカニズムが用意されており、ユーザーが入力した論理構成は、マスターによって物理構成へと 変換 (translate) されます。

たとえば、ユーザーが次のようにして論理ノードを物理ノードにマップしたとします。

map host1 dataProducer1
map host2 dataProducer2
map host3 dataConsumer

この後、マスターがホスト名 (host1、host2、および host3 マシンからのマスターに対するハートビート) を確認すれば、マスターは論理構成を物理的なホスト名とポートに変換するのに十分な情報を入手できたことになります。次に示すのは、logicalSource を rpcSource で置き換え、logicalSink を rpcSink で置き換える変換の一例です。

dataProducer1 : console | rpcSink("host3",56789) ;
dataProducer2 : console | rpcSink("host3",56789) ;
dataConsumer : rpcSource(56789) | console ;

auto エージェントと auto コレクタも、実際には 変換されたソースとシンク の一例です。この場合、auto*Chain シンクと collectorSource は、logicalSink と logicalSource を使った構成に変換され、これがさらに物理的な rpcSource のインスタンスと rpcSink のインスタンスに変換されます。

[ティップ] ティップ

変換は強力で、かつ非常に合理的にできています。たとえば、コレクタが新しく追加されると、これらのコレクタは新たにフェイルオーバーノードになります。コレクタが削除されると、削除されたコレクタは自動的にほかのフェイルオーバーノードで置き換えられます。

4.3.3. フローの分離

[警告] 警告

複数のマスターを使用する場合、現時点では自動フロー分離機能は動作しません。

同一の物理ノードから種類の異なるデータを収集するにはどうすればよいでしょうか。たとえば、同じ物理マシンから httpd のログと syslog のログを収集したいとします。このとき、この物理マシンを含むクラスタで生成された syslog のデータすべてを 1 つのディレクトリツリーに書き込み、同じクラスタで生成された httpd のログは別のディレクトリツリーに書き込むには、どうすればよいでしょうか。

1 つの方法として、すべてのデータにソース情報のタグを付けて、すべて同じパイプにプッシュするやり方があります。この場合、何らかの後処理を行って、データを異なるバケットへと分離します。別の方法として、データの集合を論理的に 2 つに分離した状態をずっと保持し、後処理を行わないやり方もあります。

Flume ではどちらの方法も実行可能ですが、ノードを フロー にグループ化する概念を導入することで、より遅延の少ない後者の方法を実現しています。Flume では、論理ノードを利用することで単一の JVM 上に複数のノードを持つことができるので、生成されるデータの種類ごとにノードを用意することができます。

次に示すのは、こうしたフローの使用例です。システムに 6 つの論理ノードを用意しています。

fooNode1 : fooSrc | autoBEChain ;
barNode1 : barSrc | autoBEChain ;
forNode2 : fooSrc | autoBEChain ;
barNode2 : barSrc | autoBEChain ;
fooConsumer : autoCollectorSource | collectorSink("hdfs://nn/foodir") ;
barConsumer : autoCollectorSource | collectorSink("hdfs://nn/bardir") ;

この例では、foo データと bar データの 2 種類のデータを両方とも生成する物理マシンが 2 つあります。このとき、データを単一のコレクタに送り、このコレクタで foo データと bar データの両方を収集し、それぞれのデータを異なる HDFS ディレクトリに書き込みたいとします。考えられる方法として、次のように論理ノードを物理ノードにマップするやり方があります。

map host1 fooNode1
map host1 barNode1
map host2 fooNode2
map host2 barNode2
map host3 fooConsumer
map host3 barConsumer

図1 Flume フロー: 単一のフロー

single-flow.png

この構成は、上で説明した 2 つの方法のうち、最初の方法に相当する方法です。foo データと bar データは混ざり合います。なぜなら、autoBEChain を変換する際、マスターによって等価とみなされる collectorSource は 2 つ 存在するからです。foo データは barConsumer に送られる可能性があり、bar データも fooConsumer に送られる可能性があります。

本来望んでいたのは、情報のソースを論理的に分離されたデータストリームに分けることでした。Flume では、 フロー と呼ばれるグループ化による抽象化が用意されています。フローは、特定の論理ノードをグループ化して、論理的に異なる種類のデータを分離したままにしておくことができる機能です。

具体的には、フローによって、Flume クラスタ内でデータの種類ごとに異なるフェイルオーバーチェインを用意することができます。auto*Chain ベースのエージェントは、同一のフローグループに属するコレクタだけにデータを送信します。これでデータは分離され、該当するグループ内のノードにのみデータが流れることになります。

現在のところ、構成言語の短縮形でフローを指定することはできません。フローを指定するには、config コマンドに特別な引数を追加して実行する必要があります。

次に示すのは、フローグループの情報を指定せず Flume シェルで入力するコマンドの例です。この例の場合、すべてのノードが同一のフローに属します。

exec config fooNode1 fooSrc autoBEChain
exec config barNode1 barSrc autoBEChain
exec config fooNode2 fooSrc autoBEChain
exec config barNode2 barSrc autoBEChain
exec config fooConsumer autoCollectorSource 'collectorSink("hdfs://nn/foodir")'
exec config barConsumer autoCollectorSource 'collectorSink("hdfs://nn/bardir")'

一方、次に示すのは、ノード名の後にパラメータを追加して、フローを指定するコマンドの例です。この例の場合、flowfoo と flowbar の 2 つのフローがあります。flowfoo には、fooNode1、fooNode2、および fooConsumer が含まれます。flowbar には、barNode1、barNode2、および barConsumer が含まれます。

exec config fooNode1 flowfoo fooSrc autoBEChain
exec config barNode1 flowbar barSrc autoBEChain
exec config fooNode2 flowfoo fooSrc autoBEChain
exec config barNode2 flowbar barSrc autoBEChain
exec config fooConsumer flowfoo autoCollectorSource 'collectorSink("hdfs://nn/foodir")'
exec config barConsumer flowbar autoCollectorSource 'collectorSink("hdfs://nn/bardir")'

図2 Flume フロー: 複数のフロー

multi-flow.png

上のコマンドを実行すると、fooNode1 と fooNode2 からのデータは fooConsumer だけに送られ、barNode1 と barNode2 のデータは barConsumer だけに送られるようになります。この構成では、明示的に接続を指定しない限り、1 つのノードからのデータが、ほかのノードからの別の種類のデータと混ざり合うことはありません。

[ティップ] ティップ

実際の運用では、データの種類ごとに異なるノード名とフロー ID を使用するとよいでしょう。ノード名を再利用した場合、デフォルトでは、ノードの実行がクラッシュときに残ったデータや、以前のソース/シンクの構成バージョンのときの残りのデータも、同じ種類のデータとみなして、障害からの復旧を試みます。

4.3.4. まとめ

ここでは、論理ノード、論理ソース、論理シンク、およびフローについて説明し、これらの抽象化によって、管理作業に関連する次のような問題も適切に解決できることを示しました。

  • 物理ノードごとに用意できる入力ソースは 1 つだけ。
  • 複数の分離されたフローの扱い。
  • 構成をマシン固有にすると、すべての物理ホスト名とポートを知る必要がある。

変換のメカニズムは非常に強力な機能です。この変換機能をメトリクス情報と組み合わせれば、動的な構成変更を自動で行うことも可能になります。たとえば、日単位でのトラフィックや負荷のパターンに合わせて、自動的に新しいコレクタを有効または無効にするといった利用法が考えられます。

4.4. 複数のマスター

[警告] 警告

複数のマスターを使用する場合、現時点では自動フェイルオーバーチェイン機能、自動フロー分離機能、および論理ソースと論理シンクの機能は動作しません。

マスターが実行する仕事は 2 つあります。ひとつは Flume デプロイのすべてのノードを追跡し、構成に変更があったら、これらのノードに構成の変更を通知することです。もうひとつは、 信頼性の高いモード で実行されている Flume フローの終端からの受信確認を追跡し、該当するフローの始点にあるソースが、いつイベントの転送を中止すればよいか、わかるようにすることです。

どちらの仕事も、Flume デプロイの動作には不可欠です。このため、マスターを 1 台のマシンで実行するのはお勧めできません。Flume サービス全体にとって単一障害点 (SPOF) になってしまうからです (詳細については、 障害モード を参照してください[訳注:該当する箇所はありません])。

Flume では、複数のマスターを物理的に独立したノード上で実行し、これらのマスターが互いに協調して、同期化された状態を維持する実行モードをサポートしています。マスターの 1 つに障害が発生した場合、残りのマスターが作業を引き継いで、実行中のすべてのフローの機能を維持することができます。これらの処理はすべて透過的に行われ、構成時に若干の作業が必要になるだけです。ノードの側では、現在のマスターとのコンタクトが失われると、正常に実行されているマスターに自動的にフェイルオーバーします。

4.4.1. スタンドアロンモードと分散モード

Flume のマスターは、次の 2 つのモードのいずれかで動作します。

  • スタンドアロン モード - マスターは 1 台のマシンで実行されます。管理は簡単でセットアップも容易ですが、スケーラビリティと耐障害性の面で難点があります。
  • 分散 モード - マスターは複数のマシン (通常 3 台または 5 台のマシン) で実行されます。このモードでは、多数のフローに対応可能なスケーラビリティを確保でき、耐障害性も良好です。

大規模な実働環境の Flume デプロイでは、分散モードでマスターを実行して、マシンで障害が発生しても (これは必ず起きます)、Flume の可用性に影響が及ばないようにする必要があります。小規模なデプロイでは、分散モードとスタンドアロンモードのどちらがよいかは単純には決められません。分散モードでマスターを実行すれば計算資源もそれだけ多く必要になりますが、これらの資源をノードやほかのサービスに振り向けた方がよい場合もあるからです。また障害が発生しても、ユーザーが介入することで十分タイムリーな復旧が可能になることもあります。分散モードとスタンドアロンモードのどちらでマスターを実行するかは、究極的にはそれぞれのケースと運用上の要件に基づいて決定するのがよいでしょう。

4.4.2. スタンドアロンモードでの実行

Flume マスターが分散モードで起動するかスタンドアロンモードで起動するかは、間接的には、マスターサーバーとして実行するよう構成されているマシンが何台あるかによって決まります。スタンドアロンモードでマスターを実行するには、構成プロパティを 1 つ、具体的には flume.master.servers を設定しなければなりません。

<property>
<name>flume.master.servers</name>
<value>hostA</value>
</property>

flume.master.servers の値には、マスターサーバーになるすべてのマシン名 (IP アドレス) のリストをカンマで区切って指定します。リストに含まれているマシン名が 1 つだけの場合、Flume マスターはスタンドアロンモードで起動します。リストに含まれているマシン名が複数ある場合、Flume マスターは分散モードで起動します。

スタンドアロンモードでは、ほかに必要な構成はありません。マスターに関連するその他のすべての変数については、Flume が適切なデフォルト値を使用します。マスターを起動するには、コマンドプロンプトで次のように入力します。

$ flume master

このコマンドは $FLUME_HOME から実行します。コマンドを実行すると、たくさんのログメッセージが画面に表示されます。マスターが起動した後、適切に動作しているかどうかを確認するには、 http://master-node-ip:35871/ の Web インタフェースにアクセスします。ここで、 master-node-ip はマスターノードの IP アドレス (またはホスト名) です。Web ページが表示されれば、マスターは実行されています。

4.4.3. 分散モードでの実行

Flume マスターを分散モードで実行すると、スタンドアロンモードのときよりも耐障害性が高まり、数百のノードにも対応できるスケーラビリティを発揮できます。

分散モードの Flume マスターの一部として実行するようマシンを構成する作業は、スタンドアロンモードの場合とほぼ同じで簡単です。flume.master.servers を設定する必要があるのはスタンドアロンモードのときと同じですが、今度はマシンのリストを指定します。

<property>
<name>flume.master.servers</name>
<value>masterA,masterB,masterC</value>
</property>

設定する必要のあるもうひとつのプロパティは、すべてのマシンに 共通ではありません 。Flume マスターのすべてのノードで、 flume.master.serverid に一意の値を指定しなければなりません。

masterA. 

<property>
<name>flume.master.serverid</name>
<value>0</value>
</property>

masterB. 

<property>
<name>flume.master.serverid</name>
<value>1</value>
</property>

masterC. 

<property>
<name>flume.master.serverid</name>
<value>2</value>
</property>

各ノードの flume.master.serverid は、 flume.master.servers リスト内でのそのノードのホスト名の添字 (先頭は 0) です。たとえば、 masterB の添字は 1 になります。このプロパティは、Flume マスターの各ノードが、ほかのノードに対して自分を一意に識別できるようにするために使われます。

3 ノードの分散モード Flume マスターを起動するのに必要な構成はこれだけです。この構成を実際に試してみるには、3 台のマシンすべてでマスタープロセスを起動します。

[flume@masterA] flume master

[flume@masterB] flume master

[flume@masterC] flume master

各マスタープロセスは、起動すると、アンサンブルのほかのすべてのノードに対し、コンタクトを試みます。アンサンブルの過半数 (現在の例では 2 つ) のノードがアップしてコンタクト可能になるまでは、構成ストアが起動できず、Flume マスターは構成データを読み書きすることができません。

アンサンブルの現在の状態は、任意の Flume マスターマシンの Web ページで確認することができます。Web ページのデフォルトは、たとえば masterA の場合、http://masterA:35871 です。

4.4.4. 構成ストア

Flume マスターは、そのすべてのデータを 構成ストア に保存します。Flume はプラガブルな構成ストアアーキテクチャを採用しており、次の 2 つの実装をサポートしています。

  • Memory-Backed Config Store (MBCS)。構成をメモリに一時的に保存します。マスターノードで障害が発生して再起動した場合、構成データはすべて失われます。MBCS は、分散モードのマスターでは使用できません。ただし、MBCS は管理が容易で、計算資源をあまり消費しないので、テストや実験に使うと便利です。
  • ZooKeeper-Backed Config Store (ZBCS)。構成を永続的に保存し、複数のマスター間で構成を同期化します。

4.4.5. どちらの構成ストアを使用するべきか

ほとんどのケースでは ZBCS を使用するべきです。ZBCS の方が信頼性と耐障害性に優れており、再起動後も構成は復旧されます。ZBCS は、スタンドアロンモードと分散モードのどちらの Flume マスターでも使用できます。

MBCS は、Flume を試用するケースで、マシンの障害発生時に構成が失われても問題ない場合に適してます。

ZBCS はデフォルトの構成ストアです。どちらの構成ストアを使用するかは、 flume.master.store システムプロパティで決まります。

<property>
<name>flume.master.store</name>
<value>zookeeper</value>
</property>

値に memory が設定された場合、Flume マスターは MBCS を使用します。MBCS はスタンドアロンモードでのみサポートされています。

4.4.6. ZBCS の構成方法

ZBCS を使用するほとんどのデプロイでは、Flume のデフォルト構成を使用できます。ただし、Flume マスターの構成をより細かくコントロールする必要がある場合は、そのためにいくつかのプロパティを使用できます。

ログディレクトリ - flume.master.zk.logdir
ZBCS では、信頼性を確保し、障害発生時に自身の状態を回復できるようにするために、 flume.master.zk.logdir で指定されたディレクトリにすべての更新を継続的に記録します。このディレクトリは、Flume を実行するユーザーから書き込み可能になっていなければならず、起動時にこのディレクトリが存在しなければ作成されます。警告: このディレクトリとこのディレクトリにあるファイルは削除しないでください。ディレクトリやその中のファイルが削除された場合、構成情報はすべて失われます。
ZBCS サーバーポート
分散モード Flume マスターの各マシンは、 flume.master.zk.server.quorum.portflume.master.zk.server.election.port で指定された TCP ポートを使ってほかのすべてのマシンと通信します。デフォルトのポートはそれぞれ 3182 と 3183 です。これらの設定によって、ZBCS がリスンするポートと、アンサンブルのほかのマシンを探すのに使われるポートの両方が決まることに注意してください。
ZBCS クライアントポート - flume.master.zk.client.port
Flume マスタープロセスは、クライアント TCP ポート経由で (同一マシン上、またはほかのマスターサーバー上の) ZooKeeper と通信します。クライアント TCP ポートは、 flume.master.zk.client.port で設定します。デフォルトは 3181 です。

4.4.7. 分散モードでの gossip

Flume マスターサーバーは、相互間で gossip プロトコルも使って情報をやり取りします。各サーバーは一定時間ごとに起動し、新しいデータの送信先となるほかのサーバーを 1 つ選択します。このプロトコルはデフォルトで TCP ポート 57890 を使用しますが、使用するポートは flume.master.gossipport プロパティで指定できます。

<property>
<name>flume.master.gossip.port</name>
<value>57890</value>
</property>

スタンドアロンモードでは、gossip プロトコルを使う必要がないため、このポートは使われません。

4.4.8. 図解: マスターとノードの対話

図3 Flume マスター: スタンドアロンモード

master-zk-standalone.png

図4 Flume マスター: 分散モード

master-zk-internal.png

4.4.9. 複数のマスターサーバーに接続するための Flume ノードの構成

複数のマスターに接続するよう Flume ノードを構成するのに必要なことは、 flume.master.servers プロパティを設定することだけです。

<property>
<name>flume.master.servers</name>
<value>masterA,masterB,masterC</value>
</property>

ノードは、Flume マスターの各マシンのポート flume.master.heartbeat.port を介して接続します。マスターはこのポートでノードのハートビートをリスンします。

1 つのマスターサーバーに障害が発生した場合、ノードは、接続を確立可能な次のサーバーを無作為に選択し、このサーバーに自動的にフェイルオーバーします。

4.5. 外部の ZooKeeper クラスタ

場合によっては、外部で管理されている ZooKeeper サービスを ZBCS で利用したいことがあります。よくある例として、ZooKeeper に依存する複数のサービスを利用している場合 (Flume と HBase など) を挙げることができます。次に示すのは、このような場合の構成例です。zkServer{A,B,C}:2181 は、アンサンブルを実際に構成する ZooKeeper サーバーのホスト名/ポートで置き換えてください。

conf/flume-site.xml

<property>
  <name>flume.master.zk.use.external</name>
  <value>true</value>
</property>

<property>
  <name>flume.master.zk.servers</name>
  <value>zkServerA:2181,zkServerB:2181,zkServerC:2181</value>
</property>

4.6. まとめ

ここでは、完全分散モードでの Flume ノードのインストール、デプロイ、および構成について説明しました。これまでの説明で、Flume を使ってログのストリームを収集できるようになったはずです。

また、ノードを互いにつなげるためのソースとシンクの役割についても説明しました。一連の Flume ノードをセットアップするための操作の基本は理解できたはずです。次に示すのは、新しく登場したソースとシンクです。

Flume の階層化イベントソース

collectorSource[(port)]
コレクタソース。ポート port に転送されてくる agentSink からのデータをリスンします。ポートが指定されなかった場合、ノードは、デフォルトの TCP ポートである 35853 を使用します。

5. Flume とデータソースの統合

Flume のソースインタフェースは、シンプルながら強力な機能を持ち、構造化されていないバイト blob から、構造化メタデータの付いた半構造化 blob、さらに完全に構造化されたデータまで、あらゆる種類のデータのロギングが可能なように設計されています。

ここでは、データを取り込むのに使用できるいくつかの基本的なメカニズムについて説明します。一般的に言って、データを取り込むには 3 つの方法があります。すなわち、Flume にデータを プッシュ するか、Flume にデータを ポーリング させるか、または Flume ないし Flume のコンポーネントをアプリケーションに 埋め込む かのいずれかです。

これら 3 つのメカニズムには、それぞれの操作のセマンティクスに起因するトレードオフがあります。

また、ソースによっては、 ワンショット ソースまたは 連続 ソースになる場合があります。

5.1. プッシュソース

syslogTcp, syslogUdp
syslog および syslog-ng ロギングプロトコルとデータフォーマットに互換性があります。
scribe
scribe ログ収集システムとデータフォーマットに互換性があります。

5.2. ポーリングソース

tail, multitail
追記がないかどうかファイルを監視します。
exec
既存のプログラムを使って独自のデータを抽出する場合に便利です。
poller
Flume ノードそれ自体から情報を収集することができます。

5.3. 埋め込みソース

[警告] 警告

以下の機能は未完成です。

log4j

シンプルクライアントライブラリ

// move this to gathering data from sources

5.4. log4j を介した直接のロギング

Flume には Apache Log4j の統合サポートが組み込まれており、エンドユーザーはコードを変更することなく Flume エージェントに直接ログを書き込むことができます。このサポートは log4j アペンダの形で実装されており、ビルトインアペンダの場合と同様に、アプリケーションの log4j.properties または log4j.xml ファイルで構成することができます。アペンダは Flume の avroSource() を使用し、各 log4j LoggingEvent を、Flume がネイティブに扱うことのできる Flume Avro イベントに変換します。

Flume にログを書き込むよう log4j を構成するには

  1. 必要な jar ファイルがアプリケーションのクラスパス上に存在することを確認します。
  2. log4j 構成ファイルで、 com.cloudera.flume.log4j.appender.FlumeLog4jAvroAppender アペンダを構成します。

Flume Avro アペンダを使用するには、アプリケーションのクラスパス上に次の jar ファイルが存在しなければなりません。

  • flume-log4j-appender-version.jar
  • flume-core-version.jar

Avro の jar ファイルおよびその依存関係も必要です。

必要なすべての依存関係が確実にアプリケーションのクラスパスに含まれるようにする最も簡単な方法は、ユーザーに代わって依存関係を処理してくれる Maven などのビルドシステムを利用することです。Flume の log4j アペンダは Maven プロジェクトとして用意されており、Avro の依存関係も適切に取り込まれるようになっています。

Flume Avro アペンダには、動作を変更するためにユーザーが指定できるオプションがいくつかあります。このうち必ず設定しなければならないパラメータは、Flume avroSource がリスンするポートだけです。Flume Avro アペンダは、Flume エージェントがローカルに実行されていて、ホスト名 localhost で通信できることを前提に動作します。ロギングの呼び出しが失敗したとみなすまでの再接続試行回数も指定できます。

パラメータ

hostname
イベントの送信先となるホスト名または IP です。(デフォルト: localhost)
port
Flume の avroSource リスンするよう構成されているポート。(必須)
reconnectAttempts
例外をスローする前に、avroSource() に接続を試みる最大の回数。値に 0 を指定すると、無限に接続を試みます。(デフォルト: 10)

log4j.properties のサンプル. 

log4j.debug = true
log4j.rootLogger = INFO, flume

log4j.appender.flume = com.cloudera.flume.log4j.appender.FlumeLog4jAvroAppender
log4j.appender.flume.layout = org.apache.log4j.TTCCLayout
log4j.appender.flume.port = 12345
log4j.appender.flume.hostname = localhost
log4j.appender.flume.reconnectAttempts = 10

Flume 構成のサンプル. 

my-app : avroSource(12345) | agentE2ESink("my-app-col", 12346)
my-app-col : collectorSource(12346) | collectorSink("hdfs://...", "my-app-")

log4j.properties のサンプルで指定されているポートと、Flume 構成のサンプルで avroSource() の使用しているポートが同一であることに注意してください。

注記. FlumeLog4jAvroAppender は内部でバッファリングを行いません。その理由は、バッファリングを行うと、Flume ノードがエンドツーエンドの永続性を持つものとして構成されている場合でも、障害が発生した時点でアペンダの内部バッファに入っているイベントが失われる可能性があるからです。

reconnectAttempts パラメータの値に 0 (無限に再試行する) を指定すると、Flume エージェントが利用できなくなったときに、エンドユーザーアプリケーションが確実にブロックされるようにすることができます。この設定は、ログの記録ができなくなるよりサービスを停止した方が望ましい場合、すなわちデータ損失をゼロに抑えなければならない要件を持つユーザーのために用意されています。

5.4.1. Hadoop ジョブのロギング例

5.4.2. Hadoop デーモンを対象としたロギング

6. Flume によって収集されたデータの使用

Flume の第一の目標は、データを収集し、このデータを確実に HDFS に書き込むことです。データが到着するようになれば、次に問題になるのは、データをどこに、どのような形式で保存するかということです。Flume では、プロパティの構成とデータフロー言語によって基本的な出力管理を行うしくみが用意されています。このしくみによって、ユーザーは出力形式や受信データの出力のバケット化をコントロールすることができ、また、Hive や HBase といったほかの HDFS データ消費者との統合も容易になっています。

次に示すのは、ユースケースの例です。

  • Web サーバーを監視するときに、時刻、ページヒット、および使用ブラウザに基づいてログをバケット化したい。
  • 特定のデータノードを追跡するときに、時刻とデータノード名に基づいてログをバケット化したい。
  • Apache フィードからの JIRA チケットのフィードを追跡するときに、プロジェクト ID または特定の人物に基づくグループ化を行いたい。
  • scribe ソースからデータを収集するときに、イベントのカテゴリ情報に基づいて scribe のバケットデータを使いたい。

こうした機能をサポートするため、Flume ではシンプルなデータモデルを使用し、イベントをバケット化するためのしくみを用意するとともに、独自のバケット化判別子を指定して基本的な抽出操作を行えるようにしています。

6.1. Flume イベントのデータモデル

Flume イベントには、次の 6 つの主要フィールドがあります。

  • Unix タイムスタンプ
  • ナノ秒タイムスタンプ
  • プライオリティ
  • ソースホスト
  • 本体
  • 任意の数の属性値ペアを持ったメタデータテーブル

どのイベントも、これらの要素すべてを持つことが保証されています。ただし、本体は長さが 0 のことがあり、メタデータテーブルは空の場合があります。

Unix タイムスタンプはミリ秒単位で測定され、ソースマシン起源の Unix タイムスタンプです。ナノ秒タイムスタンプはマシン固有のナノ秒カウンタで、このタイムスタンプもソースマシン起源です。1 つのマシンを起源とするナノ時間は単調増加することを前提に解釈するのが安全です。すなわち、イベント A のナノ時間が、同じマシンからのイベント B のナノ時間よりも大きければ、イベント A はイベント B よりも前に受け取られたものです[訳注:おそらく原文の誤りで、A と B の発生順序は逆と思われます]。

メッセージのプライオリティが現在とりうる値は、TRACE、DEBUG、INFO、WARN、ERROR、FATAL の 6 つのうちのいずれかです。通常、これらの値は syslog や log4j といったロギングシステムにも用意されています。

ソースホストは、マシンの名前または IP です (hostname の呼び出しが返す任意のもの)。

本体 (body) は、raw ログエントリ本体です。デフォルトでは、イベントごとに最大 32 KB で本体を切り詰めます。これは構成可能な値で、 flume.event.max.size.bytes プロパティを修正すれば変更できます。

最後のメタデータテーブルは、文字列属性名から任意のバイト配列へのマップです。メタデータテーブルを使えば独自のバケット属性を用意することができます。具体的な使い方については、このガイドの「Flume の高度な利用」のところで詳しく説明します。

6.2. 出力のバケット化

イベントのフィールドの値に基づいて、イベントの出力先を特定のディレクトリやファイルにすることができます。このしくみを利用するには、エスケープシーケンスを使ってデータを特定のパスに出力します。

次に示すのは、出力指定の例です。

collectorSink("hdfs://namenode/flume/webdata/%H00/", "%{host}-")

最初の引数は、データを書き込むディレクトリです。2 番目の引数は、イベントを書き込むファイルのファイル名プリフィックスです。たとえば、server1 という名前のマシンから時刻 18:58 に生成されたイベントを受け取るとします。この場合、イベントは、namenode という名前の HDFS NameNode の /flume/webdata/1800/ ディレクトリに、server1-xxx というファイル名を使って書き込まれます。ここで、xxx はファイル名を一意に識別するための追加データです。

出力指定の中の %H は、イベントのデータで見つかったタイムスタンプの時刻を表す文字列で置き換えられます。同様に、%{host} は、イベント のホスト名フィールドの値で置き換えられます。

では、server1 のメッセージが遅延し、19:05 になるまでメッセージが下流に送られなかった場合はどうなるのでしょうか。この場合、イベントのタイムスタンプの値は 18 時台なので、イベントは 1800 のディレクトリに書き込まれます。

表3 イベントデータのエスケープシーケンス

%{host}

ホスト

%{nanos}

ナノ秒

%{priority}

プライオリティ文字列

%{body}

本体

%%

% 文字

%t

Unix 時間 (ミリ秒単位)


日付と時刻によるバケット化は、要求の多い機能なので、日付と時刻に基づくバケット化を細かく制御できるよう、さまざまなエスケープシーケンスが用意されています。

次に示すのは、こうしたエスケープシーケンスを使用した出力指定の例です。

collectorSink("hdfs://namenode/flume/webdata/%Y-%m-%d/%H00/", "web-")

この指定では、日ごとにディレクトリが作成され、その中にさらに各時間のサブディレクトリが作成され、ファイル名にはプリフィックス "web-" が付けられます。

表4 日付と時刻をきめ細かく指定するためのエスケープシーケンス

%a

ロケールの省略形の曜日名 (Mon, Tue, …)

%A

ロケールの完全表記の曜日名 (Monday, Tuesday, …)

%b

ロケールの省略形の月名 (Jan, Feb,…)

%B

ロケールの完全表記の月名 (January, February,…)

%c

ロケールの日付と時刻 (Thu Mar 3 23:05:25 2005)

%d

月内通算日数 (01)

%D

日付。%m/%d/%y と同じ

%H

時 (00..23)

%I

時 (01..12)

%j

年内通算日数 (001..366)

%k

時 ( 0..23)

%l

時 ( 1..12)

%m

月 (01..12)

%M

分 (00..59)

%P

am または pm のロケール

%s

1970-01-01 00:00:00 UTC からの秒数

%S

秒 (00..60)

%y

年の下 2 桁の数字 (00..99)

%Y

年 (2010)

%z

+hhmm 形式の数値によるタイムゾーン (たとえば -0400)


6.3. 出力形式

ファイルの出力先の指定方法は上に説明したとおりです。ここでは、データの出力形式の指定方法について説明します。2 つの方法があります。ひとつは flume-site.xml でデフォルト値を設定する方法で、もうひとつは特定のシンクで出力形式を指定する方法です。

6.3.1. デフォルト出力形式の構成

flume-site.xml ファイルで flume.collector.output.format プロパティを設定すると、デフォルトの出力形式を指定できます。指定できる出力形式には、次のものがあります。

表5 出力形式

avro

Avro ネイティブファイル形式。現在のデフォルトは非圧縮。

avrodata

Avro バイナリ形式のバイナリ符号化データ。

avrojson

Avro によって生成される JSON エンコードデータ。

default

デバッグ用の形式。

json

JSON エンコードデータ。

log4j

CDH 出力パターンで使われるものと似た log4j パターン。

raw

イベント本体のみ。これはファイルをコピーするのとほとんど同じですが、ホスト/タイムスタンプ/ナノ秒など、データを一意に識別するためのメタデータは保存されません。

syslog

syslog ライクなテキスト出力形式。

seqfile

WritableEventKeys キー、および値として WritableEvent を持つバイナリの Hadoop シーケンスファイル形式。


次に示すのは、 flume-site.xml ファイルにプロパティを追加する例です。

<property>
  <name>flume.collector.output.format</name>
  <value>avrojson</value>
  <description>This is the output format for the data written to the
  collector.  There are several formats available:
    avro - Avro Native file format.  Default currently is uncompressed.
    avrodata - this outputs data as an avro binary encoded data
    avrojson - this outputs data as json encoded by avro
    debug - this is a format for debugging
    json - this outputs data as json
    log4j - outputs events in a pattern similar to Hadoop's log4j pattern
    raw - Event body only.  This is most similar to copying a file but
      does not preserve any uniqifying metadata like host/timestamp/nanos.
    seqfile - this is the hadoop sequence file format with
       WritableEventKeys and WritableEvent objects.
    syslog - outputs events in a syslog-like format
  </description>
</property>

6.3.2. 特定のシンクでの出力形式の設定

シンクによっては、オプションで出力形式の引数を取るものがあります。該当するシンクは、 consoletextcustomdfs/formatDfsescapedCustomDfs/escapedFormatDfs 、および collectorSink です。

これらのシンクでは、省略可能な format 引数が用意されています。

collectorSink( "dfsdir","prefix"[, rollmillis[, format]])
text("file"[,format])
formatDfs("hdfs://nn/file" [, format])
escapedFormatDfs("hdfs://nn/file" [, format])

この方法では、出力形式に加えて引数を指定できるので、出力形式をより柔軟にコントロールすることができます。現在のところ、seqfile 出力形式は、シーケンスファイルの内部圧縮 codec を指定するための引数をサポートしています。利用できる codec は Hadoop で利用できるものと同じで、一般には gzipbzip2 が含まれており、これらに加えて lzosnappy などのプラグイン codec も利用できる場合があります。

圧縮 codec は、出力形式に引数を追加することで指定します。たとえば、 seqfile 出力形式は圧縮 codec 引数を受け取ります。したがって、bzip2 codec を使うシーケンスファイルを出力形式に指定する場合、 seqfile("bzip2") という形で指定します。また、formatDfs シンクを使って bzip2 圧縮シーケンスファイルを書き込むには、 formatDfs("hdfs://nn/dir/file", seqfile("bzip2")) と指定します。

[注記] 注記

Flume String を指定する必要のある古い構文 (例: console("avrojson"), console("seqfile")) は推奨されなくなっていますが、今後いくつかのバージョンまではサポートされます。現在推奨されている引数の指定方法は、"function" を使う方法です (例: console(avrojson), console(seqfile("bzip2")))。

6.4. 小さなファイルと高いレイテンシ

0.20.x より前のすべてのバージョンの Hadoop では、HDFS は write-once read-many というセマンティクスを採用しています。このため、HDFS ファイルを確実にフラッシュするには、ファイルをクローズする以外に方法がありません。しかも、いったんファイルをクローズしたら、そのファイルに新しいデータを追記することはできません。こうした事情から、データをすみやかに HDFS に書き出すか、または多数の小さなファイルが作成されるのを容認するか (これは HDFS のスケーラビリティ上のボトルネックになる可能性があります)、どちらを選ぶのかが問題になります。

一方で、システム全体の負荷とシステムが保持するデータを最小限に抑えるには、データが到着すると同時に HDFS にデータをフラッシュするのが理想です。しかし頻繁にフラッシュを行うことは、HDFS に効率的にデータを格納することとは相容れません。なぜなら、頻繁にフラッシュを行えば小さなファイルが数多く作成されることになり、最終的には HDFS NameNode にしわ寄せがいくからです。1 つの妥協策として、「妥当なサイズ」に達したファイル (理想は 1 つの HDFS ブロック、すなわちデフォルトでは 64 MB より大きいサイズのファイル) をコレクタがクローズするような適切なトリガを用意するやり方があります。

収集するデータ量が少ない Flume デプロイでは、理想的な最低ファイルサイズ (1 ブロックサイズ、一般に 64 MB) になるまでに、かなりの時間がかかる可能性があります。たとえば、1 つの Web サーバーが毎秒 10 KB のログ (目安として 1 つのログあたり 100 バイトで 100 ヒット/秒) を生成するとすると、理想的なファイルサイズになるのに約 2 時間 (6400 秒) かかることになります。

このような場合には、多数の小さなファイルが作成されるのを容認する方が望ましいでしょう。ただし、小さなファイルは下流でいくつか問題を引き起こします。たとえば、Hadoop の HDFS のスケーラビリティに制限が生じたり、Hadoop 内部で MapReduce のデフォルトの入力処理メカニズムを使用したときにパフォーマンス上のペナルティが発生したりすることがあります。

以下では、こうした潜在的問題を軽減する 2 つのしくみについて説明します。[訳注:この「説明」は未完成のようです。]

  • 多数の小さなファイルを大きなかたまりにまとめる
  • CombinedFileInputFormat の使用

上に指摘した問題は、ロギングの規模が大きくなれば、それほど大きな問題ではなくなります。上と同じ量のログを数百台のマシンが生成したと仮定すると、64 秒ごとに妥当なファイルサイズに達することになるからです。

Hadoop の将来のバージョンでは、現在オープンされている HDFS ファイルを対象とした flush/sync 操作を用意することで、この問題は軽減されます (すでに Hadoop HDFS 0.21.x 用のパッチの提供が予定されています)。

7. HDFS に書き込まれるファイルの圧縮

Flume は、HDFS に書き込まれるすべてのログファイルに対して、基本的な圧縮機能をサポートしています。圧縮されたファイルには自動的に拡張子が付けられ、通常のログファイルと同じ命名形式とディレクトリ構造が適用されます。

GzipCodec が選択された場合は ".gz" がファイル名に追加され、BZip2Codec が選択された場合は ".bz2" がファイル名に追加されます。

[注記] 注記

シーケンスファイル (seqfile) と Avro データファイル (avrodata) では内部圧縮がサポートされているので、内部圧縮を代わりに使用し、 flume.collector.dfs.compress.codec は指定しないようにする必要があります。

  <property>
    <name>flume.collector.dfs.compress.codec</name>
    <value>None</value>
    <description>Writes formatted data compressed in specified codec to
    dfs. Value is None, GzipCodec, DefaultCodec (deflate), BZip2Codec,
    or any other Codec Hadoop is aware of </description>
  </property>

8. Flume の高度な利用

ここでは、FlumeShell を使って Flume ノードのコントロールを自動化する方法、Flume のデータフロー仕様言語、信頼性メカニズムの内部構造、メタデータの操作方法、およびソースプラグインとシンクプラグインのインストール方法について詳しく説明します。

8.1. Flume コマンドシェル

これまでは、Flume の状態を変更するのに、マスターサーバーのシンプルな (ただし原始的な) Web インタフェースを利用してきました。

Flume では、端末からコマンドを入力し、入力したコマンドを Flume デプロイに対して実行できるシェルも用意されています。

Web インタフェースで使用可能なすべてのコマンドは、Flume シェルでも使用できます。Flume シェルでは、コマンドの実行方法のコントロールや、スクリプトを記述しやすくするための状態チェックといった制御機構もいくつか用意されています。

8.1.1. Flume コマンドシェルの使い方

端末ウィンドウで flume shell を実行すれば、FlumeShell を起動できます。マスターサーバーへの接続を確立するには、 connect コマンドを使用します。

hostname:~/flume$ flume shell
[flume (disconnected)] connect localhost:35873
Connecting to Flume master localhost:35873...
[flume localhost:35873]
hostname:~/flume$ flume shell -c localhost:35873
Connecting to Flume master localhost:35873...
[flume localhost:35873]

Flume シェルのコマンドラインパラメータは、次のとおりです。

usage: FlumeShell [-c <arg>] [-e <arg>] [-q] [-s <arg>]
 -?         Command line usage help
 -c <arg>   Connect to master:port
 -e <arg>   Run a single command
 -q         Run in quiet mode - only print command results
 -s <arg>   Run a FlumeShell script

FlumeShell では、スクリプトを作成して実行できます。具体的には、 -e で単一のコマンドを実行するか、または -s でコマンドスクリプトを実行します。また、次のようにしてパイプで stdin を FlumeShell に渡すこともできます。

echo "connect localhost:35873\ngetconfigs\nquit" | flume shell -q

Flume コマンド. TAB キーを押せば、利用可能なコマンドについてのヒントをいつでも表示できます。コマンドの入力途中で TAB キーを押せば、コマンドを補完できます。

help
シェルで利用可能なコマンドの一覧を表示します。
connect master:port
マシン master のポート port のマスターに接続します。
config logicalnode source sink
ソース source とシンク sink を持つ単一の論理ノード logicalnode を構成します。Flume の構成用の構文によっては、 sourcesink を指定するときに引用符を使う必要があります。
getnodestatus
マスターにとって既知のノードのステータスを表示します。ノードの状態は、HELLO、CONFIGURING、ACTIVE、IDLE、ERROR、DECOMMISSIONED、LOST のいずれかです。ノードが最初に表示された場合、ノードは HELLO 状態です。ノードの構成が現在行われている場合は、CONFIGURING 状態です。イベントがソースからシンクへと送られ始めると、ノードは ACTIVE 状態になります。ノードがソースからデータを取り込み終わると (ソースが "エンドレス" でない場合には)、ノードは IDLE 状態になります。ノードで回復不能なエラーが発生するか、ノードがフラッシュを行わずに終了すると、ノードは ERROR 状態になります。ノードがマスターから削除されると DECOMMISSIONED 状態になり、ノードが "長時間" にわたってマスターから見えなくなると LOST 状態になります。
getconfigs
マスターにとって既知のすべての論理ノードの構成指定を取得して表示します。
getmappings [physical node]
physical node にマップされているすべての論理ノードを表示します。 physical node が省略された場合は、すべてのマッピングを表示します。
exec
マスター上でコマンドを同期的に実行します。コマンドが終了するまでブロックします。
source file
指定されたファイルを読み取り、ファイルで指定されているすべてのコマンドの実行を試みます。
submit
マスター上でコマンドを非同期的に実行します。コマンドを実行するとすぐに戻り、ほかのコマンドをサブミットすることができます。サブミットされた最後のコマンドのコマンド ID が記録されます。
wait ms [cmdid]
このコマンドは、cmdid が SUCCEEDED 状態または FAILED 状態になるまで最大 ms ミリ秒ブロックします。ms に 0 が指定された場合は、無限にブロックすることがあります。コマンドがタイムアウトした場合、シェルは切断されます。このコマンドは、 submit したコマンドと組み合わせて使うと便利です。
waitForNodesActive ms node1 [node2 […]]
このコマンドは、指定されたリストのノードが ACTIVE 状態または CONFIGURING 状態になるまで最大 ms ミリ秒ブロックします。ms に 0 が指定された場合は、無限にブロックすることがあります。
waitForNodesDone ms node1 [node2 […]]
このコマンドは、指定されたリストのノードが IDLE 状態、ERROR 状態、または LOST 状態になるまで最大 ms ミリ秒ブロックします。
quit
シェルを終了します。

exec コマンドと submit コマンド. Web のフォームも FlumeShell も、Flume 内部の同じコマンド処理インフラへのインタフェースです。ここでは FlumeShell について取り上げ、Flume の管理を簡単にするための FlumeShell の使い方について説明します。

これらのコマンドは、マスターで実行されたものとして、入力され、実行されます。コマンドシェルでの書式は次のとおりです。

exec command [arg1 [arg2 [ … ] ] ]

submit command [arg1 [arg2 [ … ] ] ]

空白や英数文字以外を含む複雑な引数は、二重引用符 (") および単一引用符 (') で囲んで指定することができます。二重引用符で囲まれている場合、文字列の本体は、エスケープ解除された Java 文字列です。単一引用符で囲まれている場合は、単一引用符 (') そのものを除いて、任意の文字を含めることができます。

exec で実行するコマンドは、コマンドが完了するまでブロックします。submit で実行するコマンドは非同期的にマスターに送られ、実行されます。wait は基本的に、最近 submit されたコマンドを結合する操作です。

noop
このコマンドはマスターにコンタクトして noop (操作なし) コマンドを発行します。
config logicalnode source sink
このコマンドはノードを構成します。config コマンドとほぼ同じです。
multiconfig flumespec
このコマンドは、一括構成の書式を使ってマスター上で複数のノードを構成します。
unconfig logicalnode
このコマンドは、特定のノードの構成を、 null ソースと null 状態を持つ構成へと変更します。
refresh logicalnode
このコマンドは、論理ノードの現在の構成をリフレッシュします。このコマンドは論理ノードを停止させた後、再起動します。また、このコマンドを実行すると、マスターも再評価を行うため、フェイルオーバーリストが変更されることがあります。
refreshAll logicalnode
すべての論理ノードに対してリフレッシュコマンドをアトミックに発行します。
save filename
現在の構成をマスターのディスクに保存します。
load filename
filename で指定されたファイルに含まれている論理ノードの指定を、現在の構成に追加します。
map physicalnode logicalnode
論理ノード logicalnode と物理ノード physicalnode との間に新しいマッピングを作成します。ノードは null ソースと null シンクを持つ状態で起動し、ハートビートの開始時に、マスターで指定されている内容で構成を更新します。したがって、論理ノード構成がすでに存在していてこれがマップされると、ノードはその論理ノードの構成を取り込みます。
spawn physicalnode logicalnode
spawn コマンドは map コマンドと同義で、現在は推奨されないコマンドになっています。
decommission logicalnode
このコマンドは、指定された論理ノードを論理ノードの構成テーブルから削除し、この論理ノードがインストールされている可能性のあるあらゆる物理ノードから、この論理ノードのマップを解除します。
unmap physicalnode logicalnode
このコマンドは、 logicalnode の割り当てをマシン physicalnode から解除します。論理ノードは、 map コマンドを使って別の物理ノードに再割り当てすることができます。
unmapAll
このコマンドは、すべての論理ノードの割り当てを物理ノードから解除します。論理ノードは、 map コマンドを使って別の物理ノードに再割り当てすることができます。
purge logicalnode
このコマンドは、論理ノードのステータステーブルからエントリを削除します。DECOMMISSIONED ノードまたは LOST ノードを削除するときに使用します。
purgeAll
このコマンドは、論理ノードのステータステーブルから すべての エントリを削除します。DECOMMISSIONED ノードまたは LOST ノードを削除するときに使用します。ACTIVE/IDLE/ERROR 状態のノードも削除されますが、これらのノードは次回のハートビート時に再度追加されることに注意してください。

8.2. Flume のデータフロー仕様言語

Flume ノードの役割 (コレクタ、エージェント) を使うのが、Flume をセットアップして実行する最も簡単な方法です。これらの役割を実現しているソースとシンク (collectorSink、collectorSource、および agentSink) は、実際にはプリミティブなシンクから構成されており、その実体は、役割に応じたデフォルトを持つ 変換されたコンポーネント特殊なシンク および シンクデコレータ によって拡張されたプリミティブなシンクです。Flume では、これらのコンポーネントによって構成を柔軟に行うことができますが、同時に構成を複雑にしています。特殊なシンクとデコレータを組み合わせて使うには、これらのコンポーネントを支えるしくみについての細かな知識が必要ですが、両者を組み合わせることによって多機能なふるまいを的確に表現することができます。

Flume では、ドメイン固有のデータフロー言語を使って、ユーザーが独自のシンク、ソース、およびデコレータを入力することができます。以下では、このデータフロー言語について詳しく説明します。

nodeName      ::=  NodeId
simpleSource  ::=  SourceId args?
simpleSink    ::=  SinkId args?
decoratorSink ::=  DecoId args?

source ::= simpleSource

sink ::=   simpleSink            // single sink
     |     [ sink (, sink)* ]    // fanout sink
     |     { decoratorSink => sink } // decorator sink
     |     decoratorSink1 decoratorSink2 ... decoratorSinkN sink // decorator sink
     |     < sink ? sink >           // failover / choice sink
     |     roll(...) { sink }        // roll sink
     |     collector(...) { sink }   // generic collector sink

logicalNode ::= NodeId : source | sink ;

spec   ::=  (logicalNode)*

8.2.1. 特殊なシンク: ファンアウト、フェイルオーバー、およびロール

シンクには、ファンアウトシンク、フェイルオーバーシンク、およびロールシンク の 3 つの特殊なシンクがあります。

ファンアウトシンクは、あらゆる受信イベントを、自分の子として指定されたすべてのシンクに送信します。ファンアウトシンクは、データを複製したり、メインの信頼性の高いデータフローパスから切り離してデータを処理したりする場合に使用できます。ファンアウトシンクは Unix の tee コマンドと似ており、論理的には、イベントを各サブシンクに送信する AND 演算子のように動作します。

ファンアウトシンクの構文は次のとおりです。

[ console, collectorSink ]

フェイルオーバーシンクは、新しいイベントを追記するときに障害が発生した場合、これに対応するために使用します。フェイルオーバーシンクを使うと、プライマリコレクタに障害が発生したときの代替コレクタを指定したり、プライマリコレクタが復旧するまでデータを保存しておくローカルディスクシンクを指定したりすることができます。フェイルオーバーシンクは例外処理にたとえることができ、論理的には OR 演算子のように動作します。フェイルオーバーが成功した場合、サブシンクのいずれかがイベントを受け取っています。

フェイルオーバーシンクの構文は次のとおりです。

< logicalSink("collector1") ? logicalSink("collector2") >

この場合、たとえばノード "agent1" では、 collector1 で障害が発生した場合 (collector1 への接続がダウンした場合や、 collector1 の HDFS がいっぱいになった場合など) のフェイルオーバー先として collector2 を使用するよう構成できます。

agent1 : source | < logicalSink("collector1") ? logicalSink("collector2") > ;

ロールシンクは、 millis ミリ秒ごとに、サブシンクの新しいインスタンスをオープンして現在のインスタンスをクローズします。ロールは、サブシンクの現在のインスタンスをクローズし、ついで、データを追記するための新しいインスタンスをオープンするアトミックな操作です。異なるロール期間のデータを区別するエスケープシーケンスとして、特別な %{rolltag} 属性が用意されています。この属性を使うと、ロールが行われるたびに一意の名前を持った新しいファイルを作成できます。

ロールシンクの構文は次のとおりです。

roll(millis) sink

これらのシンクを組み合わせて、より多くの機能を持たせることもできます。次に示すのは、console に出力し、フェイルオーバーコレクタノードを持つシンクの例です。

[ console, < logicalSink("collector1") ? logicalSink("collector2") > ]

次に示すのは、コレクタをロールして、1000 ミリ秒ごとに異なる HDFS ファイルに書き込みを行う例です。

roll(1000) [ console, escapedCustomDfs("hdfs://namenode/flume/file-%{rolltag}") ]

8.2.2. シンクデコレータについて

ファンアウトとフェイルオーバーは、システムのどこにメッセージが行くかに影響を与えますが、メッセージ自体には変更を加えません。データフローを流れるイベントにデータを追加したり、イベントをフィルタリングしたりするには、ここで説明する シンクデコレータ を使用できます。

シンクデコレータは、シンクにプロパティを追加することができ、シンクデコレータを経由するデータストリームを変更することができます。たとえば、シンクデコレータを使って、先行書き込みログによって信頼性を向上させたり、バッチ処理と圧縮によってネットワークスループットを改善したり、サンプリングやベンチマークの実行、さらにはライトウェイトな分析を行うこともできます。

次に示すのは、10 個につき 1 個の要素をソース "source" からシンク "sink" に 送信するよう構成された intervalSampler を使った簡単なサンプリングの例です。

flumenode: source | intervalSampler(10) sink;
[注記] 注記

以前は、デコレータとシンクを { deco ⇒ sink } のように囲んでデコレータを指定するより冗長な構文がありました。この古い構文も引き続き有効です。

次に示すのは、100 個のイベントをまとめてバッチ化する例です。

flumenode: source | batch(100) sink;

ファンアウトシンクやフェイルオーバーシンクと同様、デコレータはほかのコンポーネントと組み合わせることができます。次に示すのは、100 個のイベントをまとめてバッチ化し、これを圧縮してからシンクに移動する例です。

flumenode: source | batch(100) gzip sink;

v0.9.3 以降の Flume では、"collectorMagic" 機能のカプセル化をさらに進めた汎用性の高い collector シンクが新しく用意されました。collector シンクでは、Flume の仕様言語を使って任意のシンク (複数のシンク) を指定することができます。たとえば、次のシンク指定は、

collectorSink("xxx","yyy-%{rolltag}", 15000)

次のように記述できます。

collector(15000) { escapedCustomDfs("xxx","yyy-%{rolltag}") }

次に示すのは、新たに表現できるようになった機能の例です。

collector(15000) { [ escapedCustomDfs("xxx","yyy-%{rolltag}"), hbase("aaa", "bbb-%{rolltag}"), elasticSearch("eeee","ffff") ] }

この場合、データは 3 つの宛先すべてに送られ、3 つすべてで処理が成功した場合にのみ、受信確認が送信されます。一部のリカバリデコレータ (stubborn*、insistent*) と組み合わせて使えば、複雑になりがちな障害回復ポリシーを的確に表現することができます。

8.2.3. 高水準ソースおよび高水準シンクの変換

シンクは、Flume の内部で、より単純なデコレータ、フェイルオーバー、ローラの組み合わせに変換され、プロパティが追加されます。これらを適切に組み合わせることによってパイプラインが作成され、このパイプラインによって Flume のさまざまな信頼性のレベルが実現されています。

[注記] 注記

以下では、エージェントの動作について説明していますが、現在のバージョンの Flume では、auto*Chain が公開されているのと同じように、変換についてもその詳細が公開されているわけではありません。Flume の将来のバージョンでは、変換の詳細についても公開される予定です。厳密な変換の内容は、現在まだ開発中です。

たとえば、異なるエージェントシンクを使用する次のようなノードがあったとします。

node1 : tail("foo") | agentE2ESink("bar");
node2 : tail("foo") | agentDFOSink("bar");
node3 : tail("foo") | agentBESink("bar");

変換フェーズでは、上の agentE2ESink などは実際には次のような Flume シンクに変換されます。

node1 : tail("foo") | ackedWriteAhead lazyOpen stubbornAppend logicalSink("bar") ;
node2 : tail("foo") | < lazyOpen stubbornAppend logicalSink("bar") ?  diskFailover insistentOpen lazyOpen stubbornAppend logicalSink("bar") >;
node3 : tail("foo") | lazyOpen  stubbornAppend  logicalSink("bar");

ackedWriteAhead は、実際には複合的なデコレータで、内部ではロールとほかのいくつかの特殊なデコレータを使用しています。このデコレータインタフェースを利用すると、バッチ化と圧縮のオプションを手動で指定することができます。たとえば、100 メッセージごとに gzip で圧縮する場合には、次のように指定します。

node1 : tail("foo") | ackedWriteAhead batch(100) gzip lazyOpen stubbornAppend logicalSink("bar");

collectorSink("xxx","yyy",15000) も、受信確認を処理するための独自のデコレータを使った複合的なシンクです。内部ではローラを使用し、ローラの中で escapedCustomDfsSink を使用しています。

roll(15000) { collectorMagic escapedCustomDfs("xxx", "yyy-%{rolltag}") }

変換が行われるもうひとつの場所は、論理ノードです。ごく少数のノードから始めましょう。

node1 : tail("foo") | { .... => logicalSink("node2") };
node2 : logicalSource | collectorSink("...");

変換メカニズムによって、logicalSource と logicalSink は低水準の物理的な rpcSource と rpcSink に変換されます。node1 がマシン host1 上にあり、node2 がマシン host2 上にあるとしましょう。変換後、ノードの構成は次のようになります。

node1 : tail("foo") | { .... => rpcSink("host2",12345) };
node2 : rpcSource(12345) | collectorSink("..."");

マッピングをスワップして、node2 が host1 上に、node1 が host2 上にあるようにするとします。

# flume shell commands
exec unmapAll
exec map host1 node2
exec map host2 node1

元の構成は、今度は次のように変換されます。

node1 : tail("foo") | { .... => rpcSink("host1",12345) };
node2 : rpcSource(12345) | collectorSink("..."");

8.3. 独自のメタデータ抽出

Flume は raw データを取り込むことができますが、データの一部分だけが必要な場合、ユーザーはデータが経由するノードに基づいてデータに構造を追加し、フィルタリングによってデータの一部を除去して raw データの量を最小限に抑えることができます。

最も単純なデコレータは、 value デコレータです。value デコレータは、属性名、およびイベントのメタデータに追加する値の 2 つの引数を取ります。value デコレータを使うと、ソースに関する一定の情報を付加したり、特定のノードに任意のデータを付加したりすることができます。また、Flume を通るデータがどこから来たかを単純に追跡する目的でも使用できます。

8.3.1. 抽出操作

既知の構造を持つログから値を抽出するために抽出操作を利用することもできます。

1 つの例は、 regex デコレータです。regex デコレータは、正規表現、インデックス、および属性名の 3 つの引数を取ります。ユーザーは属性名を使って、イベントの本体から特定の正規表現に一致するグループを抽出し、指定された属性の値として書き出すことができます。

split デコレータも同様に、正規表現、インデックス、および属性名の 3 つの引数を取ります。split デコレータは正規表現に基づいて本体を分割し、セパレータのインスタンスの後に出現するテキストグループを抽出して、指定された属性の値として書き出します。split デコレータは、Unix ユーティリティの awk を大幅に簡略化したバージョンにたとえることができます。

8.3.2. メタデータのフィルタリングと加工

Flume は不変式を適用して、すでに上流で書き込まれた属性を変更できないようにしています。このようにすることで、データフローのデバッグが簡単になり、デバッグ時の可視性も向上します。

ただし、データの経由するステージが多いと、非常に多くのメタデータがさまざまな場所に運ばれることになります。Flume では、こうした状況にも対応できるよう、いくつか追加の操作を実行できるようになっています。

select 操作を実行できます。select 操作は SQL の select に似ており、関係論理の setProjection 操作を行ってイベントを変更し、特定のメタデータフィールドが転送されるようにします。

指定された属性以外のすべてのメタデータ属性を転送する mask 操作も実行できます。

エスケープメカニズムを使用して、イベントの本体をユーザーがカスタマイズ可能なメッセージに書き換える format デコレータも利用できます。このデコレータは、少量のソースにサマリーデータを出力するときに便利です。たとえば、一定時間ごとに IRC チャンネルにサマリー情報を書き出すといった使い方が考えられます。

8.3.3. 役割に応じたデフォルト

論理ノードに特定の役割を割り当てると、作業の負担を減らすことができます。役割を割り当てる操作は、デフォルトの設定が適用される「自動」指定と考えることができます。現在用意されているのは、エージェントの役割とコレクタの役割の 2 つです。同じ役割を持つ多数のノードが存在することになるので、これらをまとめて層 (tier) と呼びます。たとえば、エージェント層は、エージェントの役割を持つすべてのノードから構成されます。コレクタの役割を持つノードは、コレクタ層に属します。

エージェントの役割とコレクタの役割は、 conf/flume-site.xml ファイルで指定されたデフォルトを持ちます。構成オプションの説明については、 conf/flume-conf.xml ファイルの中の flume.agent.*flume.collector.* というプリフィックスの付いたプロパティを参照してください。

各ノードは自身の状態を保持し、自身の構成を持っています。該当する論理ノードを対象とするデータフロー構成がマスターに存在しない場合、その論理ノードは IDLE 状態のままです。構成が存在する場合、論理ノードはデータフローをインスタンス化して、これをほかのデータフローと並行して動作させるよう試みます。

これは、各マシンが基本的には 1 つの層にのみ存在することを意味します。複雑な構成では、1 台のマシンに複数の論理ノードが存在することがありますが、このような場合、各ノードはさまざまな役割を取りうるので、マシンは複数の層にまたがって存在することになります。

8.3.4. 任意のデータフローと独自のアーキテクチャ

thriftSink と thriftSource を使うと、複数のノードを経由してデータを送信することができます。この場合、受信確認注入デコレータと受信確認チェックデコレータを適切に挿入すれば、信頼性を確保することができます。

8.4. シンク/ソース/デコレータプラグインによる拡張

新しい独自のソース、シンク、およびデコレータをシステムに追加できる実験的なプラグインメカニズムが用意されています。

この機能を使用するには、次の 2 つの手順を実行する必要があります。

  1. まず、新しいプラグインクラスが収められた jar を flume のクラスパスに追加します。プラグインで DLL/so が必要になる場合は、これらのライブラリが LD_LIBRARY_PATH (unix .so の場合) または PATH (windows .dll の場合) に含まれるようにします。
  2. 次に、flume-site.xml ファイルで、新しいソース、シンク、およびデコレータのクラス名を flume.plugin.classes プロパティに追加します。複数のクラスはカンマで区切って指定します。新しいコンポーネントをシステムとデータフロー言語のライブラリに追加する特別な静的メソッドを見つけるために、Java リフレクションが使われます。

サンプルコンポーネントとして、"HelloWorld" ソース、シンク、およびデコレータがすでに「プラグイン」化されています。このプラグインは非常に簡単な処理を行います。具体的には、ソースが 3 秒おきに "hello world!" というテキストを生成し、シンクはイベントを "helloworld.txt" テキストファイルに書き込み、デコレータは渡されたすべてのイベントの前に "hello world!" を追加します。

  1. plugins/helloworld ディレクトリに移動し、ant と入力すると、 helloworld_plugin.jar ファイルが生成されます。
  2. flume-site.xml ファイル (このファイルが存在しない場合は作成します) の flume.plugin.classes プロパティに、"helloworld.HelloWorldSink,helloworld.HelloWorldSource,helloworld.HelloWorldDecorator" を追加します。

    [重要項目] 重要項目

    パッケージに付属の flume-site.xml.template ファイルを使って flume-site.xml を作成する場合は、サンプルテンプレートに含まれているプロパティの例をすべてコメントアウトするか、または削除してください。

    サンプル flume-site.xml の内容. 

    <configuration>
      <property>
        <name>flume.plugin.classes</name>
        <value>helloworld.HelloWorldSink,helloworld.HelloWorldSource,helloworld.HelloWorldDecorator</value>
        <description>Comma separated list of plugins</description>
      </property>
    </configuration>

  3. Flume マスターと少なくとも 1 つの論理ノードを別々の端末で起動します。

    1. 各端末で flume のトップディレクトリに移動します。すぐ下に plugins ディレクトリがあるはずです。
    2. 両方の 端末で FLUME_CLASSPATH に helloworld_plugin.jar を追加します。

      export FLUME_CLASSPATH=`pwd`/plugins/helloworld/helloworld_plugin.jar
    3. 端末 1 で bin/flume master を実行します。
    4. 端末 2 で bin/flume node -n hello1 を実行します。
  4. これで、マスターと hello1 ノードが起動し、プラグインが読み込まれるはずです。

    マスターと hello1 の両方に、次のようなログ出力が表示されるはずです。. 

    10/07/29 17:35:28 INFO conf.SourceFactoryImpl: Found source builder helloWorldSource in helloworld.HelloWorldSource
    10/07/29 17:35:28 INFO conf.SinkFactoryImpl: Found sink builder helloWorldSink in helloworld.HelloWorldSink
    10/07/29 17:35:28 INFO conf.SinkFactoryImpl: Found sink decorator helloWorldDecorator in helloworld.HelloWorldDecorator

    [ティップ] ティップ

    プラグインが読み込まれているかどうかを確認するもうひとつの方法として、 http://localhost:35871/masterext.jsp のページにプラグインが表示されるかどうかをチェックするやり方もあります。

  5. hello1 を構成します。

    [ティップ] ティップ

    最も簡単な方法は、マスターの構成ページをブラウザで開くことです。通常、このページは http://localhost:35871/flumeconfig.jsp にあります。

    1. helloworld ソース/シンクを hello1 ノードにロードします (マスターの Web インタフェースを使う場合は、下のテキストボックスに次の内容を入力し、[送信] をクリックします)。

      hello1: helloWorldSource() | helloWorldSink();
    2. helloworld デコレータも次の要領で試すことができます。

      hello1: helloWorldSource() | { helloWorldDecorator() => helloWorldSink() };

      どちらの場合も、 hello1 は現在の作業ディレクトリに helloworld.txt ファイルを出力します。このファイルには、3 秒おきに "hello world!" の行が出力されます。

8.4.1. Flume 拡張機能のセマンティクス

Flume は、スレッドを使って再構成と複数の論理ノードをサポートしています。ソース、シンク、およびデコレータは、キューを使用してバッファリングを行ったり、クロックスリープを使用して一定時間ごとのロールや再試行を行うことができる拡張機能です。開発者はこれらのブロックする操作を使用します。また、開発者が利用できる内部の並行制御メカニズムも、デッドロックやハングを引き起こす可能性があります。このため、適切な動作をするソース、シンク、またはデコレータを作成するには、守らなければならない規則、テストケースにおいて適用しなければならない規則があります。

[注記] 注記

ここで説明している内容はまだドラフト段階で、網羅的でも完全に正式なものでもありません。今後さらに制限事項が増える可能性があります。[訳注:以下の説明は本当に「ドラフト」段階のようです。]

ソースのセマンティクス. ソースは次の 4 つのメソッドを実装しなければなりません。

  • void open() throws IOException
  • Event next() throws IOException
  • void close() throws IOException
  • ReportEvent getReport()

これらのシグニチャに加え、各メソッドはさまざな RuntimeException をスローすることができます。これらの例外は障害が発生したことを示し、デフォルトでは論理ノードはシャットダウンして ERROR 状態に移行します。これらのエラーメッセージはユーザーに表示されますが、スタックをダンプしなくてもエラーについて対処できる有益な情報をユーザーに示すことが重要です。つまり、NullPointerException は許容されません。NullPointerException は、スタックトレースなしには説明にならず、役にも立ちません。

有効な実行時例外の例を挙げれば、無効な引数が指定されていてソースを開くことができない場合や、無効状態例外 (クローズされたソースに対して next を呼び出す試み) などがあります。

8.4.1.1. シンプルソースのセマンティクス
sourceStatesSimple.png

シンプルソースは、open と close 操作がすばやく行われ、長時間にわたってブロックすることがないことを前提とするソースです。許容される最大の一時停止時間は、10 秒程度 (DNS 参照失敗のデフォルト時間) です。

ソースのコンストラクタは、ブロックしたり、ネットワークコネクタやファイルハンドルなどの IO を必要としたりするリソースを使わずに呼び出せるようになっている必要があります。構成設定に起因するエラーがあって、これをコンストラクタで捕捉できる場合は、IllegalArgumentException をスローする必要があります。

open() は、ソース用のリソースを確保して next() の呼び出しを行えるようにする呼び出しです。シンプルソースの open 呼び出しは、フェイルファストにする必要があります。open 呼び出しは IOException または IllegalStateException などの RuntimeException をスローできます。open は、CLOSED 状態のシンクに対してのみ呼び出すことができるようにする必要があります。シンクが 2 度オープンされた場合、2 回目の呼び出しでは IOException または IllegalStateException をスローする必要があります。

next() は、オープンされたソースからイベントを取得する呼び出しです。ソースがオープンされていない場合は、IllegalStateException をスローする必要があります。next() 呼び出しは、ブロックすることができ、実際にしばしばブロックします。next() がブロックしている場合、 close() の呼び出しは、next() が null を返して正常終了するようにし、 next() のブロックを解除する必要があります。TCP ソケットなどの多くのリソース (および理論上 RPC フレームワーク) は、デフォルトでは、ブロックされたネットワーク読み取り (next() の呼び出しなど) に対し、 close() された時点で例外をスローします。

close() は、ソースの open 呼び出しが確保したリソースを解放する呼び出しです。close() 呼び出しはそれ自体、すべてのリソースが解放されるまでブロックします。このような動作によって、同一スレッドにおける次の open() がリソース競合によって失敗することのないようになっています (たとえば、ポート 12345 のサーバーソケットをクローズする処理は、ポート 12345 が再びバインドできる状態になるまでは戻りません)。

getReport() は ReportEvent を返します。これらの値は、ノードがオープンされているかクローズされているかにかかわらず利用可能である必要があり、 getReport() 呼び出しは、ほかの呼び出しによって (潜在的なロックインバージョンの問題によって) ブロックしないようにする必要があります。取得される値はアトミックに取得されるのが理想ですが、競合実行によるエラーが生じない限り、このような動作は必須ではありません。

ソースが複数回オープンされたり、クローズされたりした場合、値がリセットされるのか、オープン/クローズのサイクルを通じて永続するのかは、ソース側で決定することになっています。

8.4.1.2. バッファ付きソースのセマンティクス
sourceStatesBuffered.png

一部のソースは、インメモリのキュー、またはディスクに永続的に格納されるバッファを持っています。原則としてバッファ付きソースは、 close() 時、新しいデータが入ってくるのを防いで、バッファリングされたデータのフラッシュを試みる必要があります。また、従属スレッドがあれば、close が戻る前に解放しなければなりません。close() 時に一定時間 (現在のデフォルトは 30 秒) 進行がない場合、制御スレッドは Thread.interrupt() を呼び出します。ソースは、InterruptedException を処理することができる必要があり、割り込まれたステータスをコールスタックにパーコレート (委任) する必要があります。

[注記] 注記

v0.9.1 と v0.9.2 では、割り込みは、それが捕捉されたときに処理する必要があります。具体的には、Thread の割り込みフラグを再度立てて (Thread.currentThread().interrupt() を呼び出して)、IOException をスローします。拡張機能の API は今後、IOException か InterruptedException のいずれかをスローするように変わる可能性があります。

open() 変更ありません。

close() の呼び出しは通常、open() の呼び出しまたは next() の呼び出しとは別のスレッドから行われます。close() の呼び出しはリソースが解放されるまでブロックするため、呼び出しから戻る前にバッファのフラッシュを試みる必要があります。たとえば、ネットワークソースにバッファリングされたデータがある場合、新しいデータが入ってくるのを防ぐためにまずネットワーク接続をクローズし、次に、バッファリングされたデータをフラッシュする必要があります。ソースは、InterruptedException を処理できる必要があり、割り込まれたステータスをコールスタックにパーコレート (委任) し、エラーの状態を示す必要があります。

CLOSING 状態にある場合の next() 呼び出しは、バッファが空になるまで値をプルする操作を続行する必要があります。next() 呼び出しがメインドライバスレッドまたは従属ドライバスレッドで行われている場合は、特にこの操作を行うことが重要です。この目的を達成するためのしくみとして、正常終了を示す特別な DONE イベントをキュー/バッファに追加するやり方があります。

getReport() には、ソース内のキューのサイズ、キュー内の要素数などのメトリック情報を含めるのが理想です。

シンクとデコレータのセマンティクス. シンクとデコレータは次の 4 つのメソッドを実装しなければなりません。

  • void open() throws IOException
  • void append(Event e) throws IOException
  • void close() throws IOException
  • ReportEvent getReport()

これらのシグニチャに加え、各メソッドはさまざな RuntimeException をスローすることができます。実行時例外は障害が発生したことを示し、デフォルトでは論理ノードはシャットダウンして ERROR 状態に移行します。これらのエラーメッセージはユーザーに表示されますが、スタックをダンプしなくてもエラーについて対処できる十分な情報をユーザーに示すことが重要です。つまり、NullPointerException は許容されません。NullPointerException は、スタックトレースなしには説明にならず、役にも立ちません。

8.4.1.3. シンプルシンク
sinkStatesSimple.png

シンプルシンクは、open、close、および append 操作がすばやく行われ、長時間にわたってブロックしないことを前提とするソースです。許容される最大の一時停止時間は、10 秒程度 (DNS 参照失敗のデフォルト時間) です。

シンクとデコレータのコンストラクタは、ブロックしたり、ネットワークコネクタやファイルハンドルなどの IO を必要としたりするリソースを使わずに呼び出せるようになっている必要があります。構成設定に起因するエラーがあって、これをコンストラクタで捕捉できる場合は、IllegalArgumentException をスローする必要があります。

open() は、シンクまたはデコレータ用のリソースを確保して append(Event) の呼び出しを行えるようにする呼び出しです。構成設定に起因するエラーがあって、これをコンストラクタ内で IO なしに検出できない場合、open() をフェイルファストにして、IOException、または IllegalStateException や IllegalArgumentException などの RuntimeException をスローする必要があります。open は、CLOSED 状態のシンクに対してのみ呼び出すことができるようにする必要があります。シンクが 2 度オープンされた場合、2 回目の呼び出しでは IOException または IllegalStateException をスローする必要があります。

append() は、イベントを配信する呼び出しです。シンクがオープンされていない場合、この呼び出しは IllegalStateException をスローする必要があります。

内部エラーが原因で通常のデコレータがオープンまたはアペンドに失敗するか、あるいはサブシンクがオープンに失敗した場合、デコレータはリソースを解放し、サブシンクのクローズを試み、例外をスローする必要があります。一部のシンク/デコレータでは、これらのセマンティクスに特別に手を加えていますが、これは慎重に行う必要があります。

close() は、ソースの open 呼び出しが確保したリソースを解放する呼び出しです。open() または next() がブロックしている場合、close() の呼び出しはこれらの呼び出しのブロックを解除して終了させる必要があります。close() は、オープンされたシンクに対して呼び出す必要がありますが、クローズされたシンクについては、例外をスローせずにそのシンクに対して close() が呼び出されることを許容しています (ただし、一般に LOG 警告は生成されます)。

getReport() は ReportEvent を返します。これらの値は、ノードがオープンされているかクローズされているかにかかわらず利用可能である必要があり、 getReport() 呼び出しは、ほかの呼び出しによって (潜在的なロックインバージョンの問題によって) ブロックしないようにする必要があります。取得される値はアトミックに取得されるのが理想ですが、競合実行によるエラーが生じない限り、このような動作は必須ではありません。シンクが複数回オープンされたり、クローズされたりした場合、値がリセットされるのか、オープン/クローズのサイクルを通じて永続するのかは、シンク側で決定することになっています。

8.4.1.4. バッファ付きシンクとデコレータのセマンティクス
sinkStatesBuffered.png

一部のシンクは、メモリ内、またはディスクに永続的に格納されるキューやバッファを持っています。 原則としてバッファ付きシンクは、 close() を要求されたときに、バッファリングされたデータのフラッシュを試みる必要があります。この点については、シンクとデコレータシンクが比較的すばやいクローズを試みる必要があるという要件との間で、バランスを取る必要があります。

open() このシンクはオープンされていないので、一般にバッファリングされたデータは存在しません。ただし、永続データを持ったシンクまたはデコレータに対する open() では、open() の呼び出しの中でデータの回復を試み、これをキューに入れる必要があります。こうしたデータの例として、ネットワークサブシンクがダウンしたときに dfo/wla ログを回復する DFO または WAL ログがあります。

append() 呼び出しは、送信前にデータをバッファリングすることができます (バッチ化デコレータなど)。close() 呼び出しは、クローズを実行する前に、バッファリングされたデータをその (サブ) シンクに追記する試みを行う必要があります。また、従属スレッドがあれば、シャットダウン前にこれを停止する必要があります。

close() 時に一定時間 (現在のデフォルトは 30 秒) 進行がない場合、 close() は割り込まれ、この割り込みによる突然の終了を処理する必要があります。

8.4.1.5. 再試行、スリープ、異常終了
sinkStatesOpening.png

open セマンティクスでの再試行

sinkStatesAppending.png

append セマンティクスでの再試行

一部のデコレータには、再試行とスリープが採り入れられています。再試行とスリープを必要とする開発者は、これらの操作が、open、append、および close に対して適切に動作するようにする必要があります。バッファ付きシンクと組み合わせる場合、close 時のバッファのフラッシュが不可能になることがあります (dead 状態のネットワーク接続に対してデータの送信を試行する WAL など)。このため、これらのシンク/デコレータでは、突然終了してエラーを報告するための手段が必要です。こうしたケースの処理が必要になる操作は 2 つあります。ひとつは無限の再試行、もうひとつは無限/長時間のスリープ/待機/ブロックです。一部のデコレータは、潜在的に無限に再試行するセマンティクスを持っています。たとえば、InsistentOpen、InsistentAppend、および FailoverSink は、 open()append() の呼び出しを無限に試みる可能性があります。これらのデコレータはほかのデコレータでラップすることができますが、これは、ハード終了をパーコレート (委任) して再試行ロジックをバイパスできるようにしておく必要があることを意味します。

そのためには、再試行ロジックを持つあらゆるシンク/デコレータに対し、これらのシンク/デコレータが再試行の前にハード終了をチェックしなければならないようにする必要があります。これらのシンクは (再試行ロジックを持つ場合)、ハード終了割り込みをその上流のデコレータに伝播しなければなりません。

一部のシンクは、長時間、場合によっては無限にバックオフまたはスリープする可能性があります。ラッチを使った待機 (CountDownLatch.await()) やスレッドスリープ (Thread.sleep()) など、スリープ操作または同期化操作を使用するコードでは、割り込みを適切に処理しなければなりません。これらの委譲操作は、一般に再試行操作でのみ使われるので (すなわちエラーが発生した場合)、シンク/デコレータは、割り込みを伝播し、エラーで失敗する動作を行う必要があります。

これらのセマンティクスから、いくつかのことが帰結されます。open、close、および append 操作のロックは慎重に行われなければなりません。スリープまたはブロックを伴う open() 操作 (InsistentOpen、FailoverSink など) がある場合、close の呼び出しで open 操作がシャットダウンされ、open 呼び出しのブロックが解除される動作が理想です。 append()

close のシグナルを受け取りながら、 append() または open() でブロックされているシンクは、妥当な時間の経過後に終了する必要があります。理想は、数回のハートビート以内です (ハートビートのデフォルトは 5 秒なので、30 秒未満が理想です)。終了してバッファが空になったシンクは、通常の成功として戻る必要がります。フラッシュされなかったイベントがあった場合は、例外をスローしてエラーを返す必要があります。従属スレッドがある場合、これらの従属スレッドは close が戻る前に終了される必要があります。

8.5. ソース/シンク・ペア間でのデータ転送レートの制限

Flume は、ソース/シンク・ペア間のデータ転送レートを制限することができます。この機能は、ソース/シンク・ペアに割り当てられるネットワークの帯域幅を、転送されるログの種類に応じて変更したい場合に便利です。たとえば、特定の種類のログをほかのログより高いレートで転送したり、一日のうちの時間帯に応じてログの転送レートを変更したりすることができます。

データ転送レートを制限することが有益なもうひとつの例として、障害発生後のネットワーク (またはコレクタ) の回復を挙げることができます。この場合、エージェントは大量のデータをバックアップしていて、これを送信しようとします。もし転送レートに制限がなければ、エージェントはコレクタ側のすべてのリソースを使い果たしてしまい、場合によってはコレクタをクラッシュさせる可能性があります。

Flume では、choke デコレータと呼ばれる特殊なシンクデコレータを使って、ソース/シンク・ペア間のデータ転送レートを制限することができます。各 choke デコレータには choke-id を割り当てる必要があります。次に示すのは、ノードのソースとシンクの間で choke を使う例です。この chokechoke-id は "Cid" です。

node: source | { choke("Cid") => sink };

choke-id は物理ノードに固有です。nodechoke を使う前に、その node のある物理ノード上で choke-id を登録しなければなりません。物理ノード上で choke-id を登録するには setChokeLimit コマンドを使います。choke-id を登録するときはレートの制限 (KB/秒単位) も割り当てなければなりません。次に示すのは、物理ノード host 上で choke-id "Cid" を登録し、1000 KB/秒の制限を割り当てる例です。

exec setChokeLimit host Cid 1000
[注記] 注記

実行時に setChokeLimit を使って、 choke-id に割り当てられた制限を変更することもできます。

choke-id に対する制限は、その choke-id を使用する複数の choke が合計でデータ転送に使用できるレートを示します。上に示した例では、 choke-id "Cid" を持つ choke を使うソース/シンク・ペアは物理ノード host 上に 1 つしかありません。したがって、このソース/シンク・ペア間でのデータ転送レートは、1000 KB/秒に制限されます。

[注記] 注記

レートを制限する場合に考慮されるのは、イベント本体のサイズだけです。

choke デコレータの具体的な動作は次のとおりです。 choke が接続されているシンクに対して append() が呼び出された場合、(短時間における) 転送データの総量が、該当する choke に対応する choke-id に割り当てられた制限の範囲内に収まっていれば、 append() の呼び出しは通常どおり機能します。制限を超過しているときは、短時間のあいだ append() はブロックされます。

choke を使用するソース/シンク・ペアが複数あって、これらがいずれも同一の choke-id を使用しているとします。さらに、 node1node2 が、同じ物理ノード host 上の 2 つの論理ノードで、 choke-id "Cid" が 1000 KB/秒の制限を指定されて host 上で登録されているとします。

node1: source1 | { choke("Cid") => sink1 };
node2: source2 | { choke("Cid") => sink2 };

この例の場合、 source1-sink1 ペアと source2-sink2 ペアはどちらも同じ choke-id "Cid" を持つ choke を使っているので、これらのソース/シンク・ペア間でやり取りされるデータの総量は、1000 KB/秒に制限されます。Flume は、この制限をソース/シンク・ペア間でどのように分けるかについては関与しませんが、どちらのソース/シンク・ペアでもデータ転送のできない状態が生じないことを保証します。

[注記] 注記

同じ物理ノード上に存在する複数のソース/シンク・ペアが、同一の choke-id を持つ choke を使う場合、これらのソース/シンク・ペア間でレートの制限がどのように分けられるかについての保証はありません。

9. Flume と HDFS のセキュリティ統合

[注記] 注記

このセクションを読む必要があるのは、Kerberos 化された HDFS クラスタを使っている場合だけです。CDH3b2 またはバージョン 0.21.x 以前の Hadoop を実行している場合、このセクションを飛ばして次のセクションに進んでもかまいません。

Flume のデータパスが、"セキュアな" Hadoop および HDFS と対話できる必要があります。Hadoop および HDFS の設計者は、Kerberos V5 のシステムとプロトコルを使って各種クライアントとサービス間の通信を認証する方法を採用しました。Hadoop クライアントに相当するのは、ユーザーや、ユーザーに代わって実行される MapReduce ジョブで、サービスに相当するのは HDFS、MapReduce です。

ここでは、 Flume ノードを、Kerberos 化された HDFS サービスに対するユーザー flume としてセットアップする方法について説明します。ここでは、Flume ノードと Flume マスターの間の通信や Flume のフローの中で Flume ノード間の通信をセキュアにする方法については 取り上げません 。現在の Flume の実装では、分離された個々のフローを異なるユーザーとして記述することは、サポートされていません。

[注記] 注記

ここで説明する内容は、セキュリティ強化ベータの CDH (CDH3b3 以降) と、MIT Kerberos 5 の実装でのみテストされています。

9.1. 基礎

Flume は、特定の Kerberos プリンシパル (ユーザー) としてふるまい、クレデンシャルを必要とします。Kerberos クレデンシャルは、Kerberos 化されたサービスと対話するのに必要となります。

クレデンシャルは 2 つの方法で入手できます。ひとつは、対話ユーザーによって使われる方法です。この方法では、対話式ログインが必要となります。もうひとつは、一般に (Flume デーモンのような) サービスによって使われる方法です。この方法では、特別に保護された keytab と呼ばれるキーテーブルファイルが使われます。

kinit プログラムを対話的に使って Kerberos KDC (Key Distribution Center) にコンタクトする方法は、自分の身元を証明する 1 つの方法です。この方法では、ユーザーがパスワードを入力する必要があります。それには、一般に user@REALM.COM という形式の 2 つの部分からなるプリンシパルを KDC にセットアップする必要があります。kinit を介してログインすると、TGT (Ticket Granting Ticket) が付与され、このチケットをほかのサービスとの認証に使用することができます。

[注記] 注記

このユーザーは NameNode マシン上にもアカウントを持っている必要があります。Hadoop は、アクセスを許可するかどうか判断するときに、この NameNode マシンから得たユーザーとグループの情報を使います。

ユーザーやサービスの認証は、特別に保護された keytab ファイルを使って行うこともできます。keytab ファイルには TGT (Ticket Granting Ticket) が収められており、この TGT を使うことで、Kerberos KDC を介してクライアントとサービスを互いに認証することができます。

[注記] 注記

keytab を使う方法は、「パスワードなし」の ssh 接続に似ています。keytab を使う方法で id_rsa 非公開鍵の代わりになるのは、該当サービスの非公開鍵を含む keytab エントリです。

通常、Flume ノードデーモンは無人で (すなわちサービススクリプト経由で) 起動されるので、keytab を使う方法でログインする必要があります。keytab を使う場合、Hadoop サービスでは、 user/host.com@REALM.COM という形式の 3 つの部分からなるプリンシパルが必要です。このとき、ユーザーには flume を使い、サービスにはマシンのホスト名を使うことを推奨します。Kerberos および Kerberos 化された Hadoop が適切にセットアップされている場合、Flume ノードのプロパティファイル (flume-site.xml) に追加する必要があるパラメータはごくわずかです。

<property>
<name>flume.kerberos.user</name>
<value>flume/host1.com@REALM.COM </value>
<description></description>
</property>

<property>
<name>flume.kerberos.keytab</name>
<value>/etc/flume/conf/keytab.krb5 </value>
<description></description>
</property>

この例では、 flume がユーザーで、 host1.com はサービス、 REALM.COM は Kerberos レルムです。/etc/keytab.krb5 ファイルには、 flume/host1.com@REALM.COM がほかのサービスに対して認証するのに必要なキーが収められています。

Flume と Hadoop は、サービスが実行されているマシンのホスト名に展開されるシンプルなキーワード (_HOST) を用意しています。このキーワードを使えば、同じ flume.kerberos.user プロパティを記述した flume-site.xml ファイルをすべてのマシンで利用できます。

<property>
<name>flume.kerberos.user</name>
<value>flume/_HOST@REALM.COM </value>
<description></description>
</property>

Flume ノードが適切にセットアップされているかどうかは、次のコマンドで確認できます。

flume node_nowatch -1 -n dump -c 'dump: console |  collectorSink("hdfs://kerb-nn/user/flume/%Y%m%D-%H/","testkerb");'

コマンドを実行すると、コンソールに入力したデータが、Kerberos 化された HDFS の kerb-nn という NameNode の /user/flume/YYmmDD-HH/ ディレクトリに書き込まれるはずです。

データが書き込まれない場合、Flume の Hadoop 設定 (core-site.xml と hdfs-site.xml の設定) が、正しく Hadoop の設定を使っているかどうかをチェックする必要があります。

9.2. Kerberos での Flume ユーザーのセットアップ

[注記] 注記

ここで説明する手順は、MIT Kerberos 5 を対象としています。

Kerberos と HDFS と Flume を「適切にセットアップ」するには、いくつか要件があります。

  • 各マシン上に Flume ユーザー用のプリンシパルを用意する必要があります。
  • 各マシン上に」各プリンシパルのキーが収められた keytab を用意する必要があります。

このセットアップ作業の大部分は、 kdamin プログラムを使って行うことができ、 kinitkdestroy 、さらに klist プログラムを使って検証することができます。

9.2.1. Kerberos プリンシパルの管理

まず、 kadmin プログラムを使う権限を持っていて、KDC にプリンシパルを追加できる必要があります。

$ kadmin -p <adminuser> -w <password>

正しい値を指定してコマンドを実行すると、kadmin プロンプトが表示されます。

kadmin:

このプロンプトで、Flume プリンシパルを KDC に追加できます。

kadmin: addprinc flume
WARNING: no policy specified for flume@REALM.COM; defaulting to no policy
Enter password for principal "flume@REALM.COM":
Re-enter password for principal "flume@REALM.COM":
Principal "flume@REALM.COM" created.
kadmin:

HDFS に直接書き込みを行う各 Flume ノードのホストを含むプリンシパルも追加する必要があります。キーは keytab ファイルにエクスポートするので、-randkey オプションを使ってランダムキーを生成できます。

kadmin: addprinc -randkey flume/host.com
WARNING: no policy specified for flume/host.com@REALM.COM; defaulting to no policy
Principal "flume/host.com@REALM.COM" created.
kadmin:
[注記] 注記

Hadoop の Kerberos 実装では、user/host@REALM.COM という 3 つの部分からなるプリンシパルが必要です。通常、ユーザーとして必要なのは、user@REALM.COM 形式のユーザー名だけです。

kinit プログラムを実行し、指定したパスワードを入力すれば、ユーザーが追加されていることを確認できます。また、 klist プログラムを実行すれば、TGT (Ticket Granting Ticket) がロードされていることを確認できます。

$ kinit flume/host.com
Password for flume/host.com@REALM.COM:
$ klist
Ticket cache: FILE:/tmp/krb5cc_1016
Default principal: flume/host.com@REALM

Valid starting     Expires            Service principal
09/02/10 18:59:38  09/03/10 18:59:38  krbtgt/REALM.COM@REALM.COM


Kerberos 4 ticket cache: /tmp/tkt1016
klist: You have no tickets cached
$

Kerberos 4 に関する情報は無視してください。"ログアウト" するには、 kdestroy コマンドを使います。 klist を実行すると、クレデンシャルが破棄されていることを確認できます。

$ kdestroy
$ klist
klist: No credentials cache found (ticket cache FILE:/tmp/krb5cc_1016)


Kerberos 4 ticket cache: /tmp/tkt1016
klist: You have no tickets cached
$

次に、パスワードの入力を不要にして自動的にログインできるよう、keytab ファイルを作成します。

[警告] 警告

keytab ファイルには秘密のクレデンシャルが収められているので、このファイルを保護して適切なユーザーだけが読み取ることができるようにする必要があります。ファイルを作成したら、パーミッションを 0400 (-r--------) に変更し、Flume を実行するユーザーをファイルのオーナーにする必要があります。

次に示すのは、(flume.keytab という名前の) keytab ファイルを生成し、このファイルにユーザー flume/host.com を追加する例です。

kadmin: ktadd -k flume.keytab flume/host.com
[注記] 注記

この手順を実行すると、flume/host.com は手動でログインすることができなくなります。ただし、keytab を使用せず、手動でログインできる Flume ユーザーを用意することはできます。

[警告] 警告

ktadd では、複数のプリンシパルの keytab エントリを 1 つのファイルに追加することができ、1 つの keytab ファイルは複数のキーを持つことができます。ただし、この点はセキュリティ上の弱点で、動作のおかしいマシンからクレデンシャルを取り消すことが困難となることがあります。このリスクの評価については、セキュリティ管理者とよく相談してください。

次のコマンドを実行すると、キーの名前とバージョン (KVNO) を確認することができます。

$ klist -Kk flume.keytab
Keytab name: FILE:flume.keytab
KVNO Principal
---- --------------------------------------------------------------------------
   5 flume/host.com@REALM.COM (0xaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa)
   5 flume/host.com@REALM.COM (0xbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb)
   5 flume/host.com@REALM.COM (0xcccccccccccccccc)
   5 flume/host.com@REALM.COM (0xdddddddddddddddd)

いくつかのエントリ、およびプリンシパル名に続いて、対応するキーが 16進表記で表示されるはずです。

flume@REALM.COM プリンシパルに対して kinit を使えば、対話的な Kerberos ログインを行い、Hadoop コマンドを使って HDFS をブラウズすることができます。

$ kinit flume
Password for flume@REALM.COM:  <--  パスワードを入力.
$ hadoop dfs -ls /user/flume/

10. 付録

10.1. Flume のソースのカタログ

Flume の階層化イベントソース. 以下のソースとシンクは、実際には元の指定から、より具体性の高い低水準の構成に変換されます。一般にこれらのソースとシンクは、properties xml ファイルまたはマスターによって割り当てられた適切なデフォルト引数を持っています。これらのデフォルトは、ユーザーが上書きできます。

collectorSource[(port)]

コレクタソース。ポート port に転送されてくる agentSink からのデータをリスンします。ポートが指定されなかった場合、ノードは、デフォルトの TCP ポートである 35853 を使用します。このソースは自身をマスターに登録し、フェイルオーバーチェインが自動的に決定されるようにします。

autoCollectorSource

自動コレクタソース。論理コレクタを作成します。作成された論理コレクタは、物理ノードに割り当てられると、フェイルオーバーチェインのコレクタのリストに含まれるようになります。このソースは、auto*Chain() シンクに対応するコレクタです。詳細については、「自動フェイルオーバーチェイン」の説明を参照してください。

logicalSource

論理ソース。このソースはマスターから割り当てられたポートを持ち、rpcSink 形式のデータをリスンします。

Flume の基本ソース. 以下のソースは変換されず、一般にすべての引数を必要とします。

null

Null ソースです。オープンとクローズがあり、next() の呼び出しに対しては null (最後のレコード) を返します。

console

標準入力コンソールソースです。対話ユーザーとしてイベントを入力するためのソースで、履歴編集やキーボード編集ショートカットなどの機能が用意されています。flume ノードは flume node_nowatch で起動しなければなりません。watchdog 機能が有効な場合、コンソール入力はできません。

stdin

標準入力ソースです。flume ノードの標準入力データソースにデータをパイプするためのソースです。flume ノードは flume node_nowatch で起動しなければなりません。watchdog 機能が有効な場合、コンソール入力はできません。警告: このソースは対話型のコンソールとして使用できますが、その場合、改行が入力されるまで flume ノードはハングします。

rpcSource(port)

TCP ポート port でリスンするよう構成されたリモートプロシージャコール (RPC) サーバーです。Apache-Thrift と Apache-Avro の両方の RPC フレームワークをサポートします。RPC フレームワークのタイプは、 event.rpc.type プロパティ (THRIFT または AVRO) で指定します。デフォルトは THRIFT です。rpcSink でも同じ RPC フレームワークが使われることに注意してください。

text("filename"[, format])

ワンタイムのテキストファイルソースです。\n で区切られた行が 1 つのイベントになります。

tail("filename"[, startFromEnd=false]{,delim="regex", delimMode="exclude|prev|next"})

Unix の tail ユーティリティに似たソースです。1 行が 1 つのイベント。指定されたファイル全体のイベントを生成し、その後、オープン状態のままデータが到着するのを待ち、ファイル名で指定されたファイルを追跡します (たとえば、"foo" というファイルを tail していて、"foo" の名前が "bar" に変更され、その後 "foo" という名前の新しいファイルが出現すると、新しい "bar" ファイルの読み取りを終了し、"foo" の先頭から読み取りを開始します)。startFromEnd パラメータに false を指定すると、tail はファイルの先頭から読み取り直します。true を指定すると、ファイルの現在の末尾から読み取りを開始します。ファイルの最後の行が改行文字 (\n) で終わっていない場合、 tail ソースは tail がクローズされたときにはじめて、この最後の行をイベントとして送信します。delimdelimMode の詳細については、ファイルの tail についての説明を参照してください。

multitail("filename"[, file2 [,file3 … ] ])

tail に似ていますが、複数のファイルを同時に追跡できます。

tailDir("dirname"[, fileregex=".*"[, startFromEnd=false[, recurseDepth=0]]]{,delim="regex", delimMode="exclude|prev|next"})

ディレクトリ dirname に存在し、指定された fileregex に一致するすべてのファイルの末尾を追跡します。regex 引数では \\" による Java スタイルのエスケープが必要であることに注意してください。たとえば、 \w+ は "\\w+" のように記述する必要があります。新しいファイルが出現すると、そのファイルは追跡対象のファイルのリストに追加されます。新しいディレクトリが指定された場合は、一致するすべてのファイルの読み取りを試みます。startFromEnd パラメータが false の場合、tail は各ファイルの先頭から読み取り直します。true を指定すると、各ファイルの現在の末尾から読み取りを開始します。recurseDepth パラメータが 0 より大きい場合、tailDir はサブディレクトリを再帰的に探索します。recurseDepth パラメータの値は、 dirname 以下で tail の対象となる最大ディレクトリレベルを示します。ゼロ (0) を指定すると、サブディレクトリの再帰的探索を行いません。注意: fileregex はファイル名だけに適用され (ディレクトリは除外されます)、(recurseDepth パラメータに一致する) すべてのディレクトリが再帰的に探索されます。delimdelimMode の詳細については、ファイルの tail についての説明を参照してください。

seqfile("filename")

Hadoop シーケンスファイル形式のファイルから com.cloudera.flume.handlers.hdfs.WriteableEventKey キーと com.cloudera.flume.handlers.hdfs.WriteableEvent の値を読み取ります。便宜上、このソースは、seqfile シンクによって生成されたファイルを読み取ることができます。

syslogUdp(port)

UDP ポート port でリスンする syslog。syslog 互換。

syslogTcp(port)

TCP ポート port でリスンする syslog。syslog-ng 互換。多数の並行接続をリスン、受信できるサーバーです。

syslogTcp1(port)

TCP ポート port でリスンする syslog。syslog-ng 互換。単一の接続に対してのみ使用できるこのソースで、その後はシャットダウンします。

execPeriodic("cmdline", ms)

cmdline で指定された任意のプログラムを実行します。実行したプログラムの出力全体が、生成されるメッセージの本体になります。ms には、次の実行 (および次のイベント) まで待機する時間をミリ秒単位で指定します。プログラムとして望ましいのは、生存期間の短いプログラムです。このソースは、シェルのパイプやリダイレクトの操作を処理しません。こうした処理が必要な場合は、スクリプトを書いて、このスクリプトを cmdline 引数で指定します。

execStream("cmdline")

cmdline で指定された任意のプログラムを実行します。出力の各行が 1 つの新しいイベントになります。プログラムとして望ましいのは、生存期間の長いプログラムです。このソースは、シェルのパイプやリダイレクトの操作を処理しません。こうした処理が必要な場合は、スクリプトを書いて、このスクリプトを cmdline 引数で指定します。

exec("cmdline"[, aggregate=false[, restart=false[,period=0]]])

cmdline で指定された任意のプログラムを実行します。aggregate 引数が true の場合、プログラムからの出力全体が 1 つのイベントとみなされます。それ以外の場合、各行が 1 つの新しいイベントとみなされます。restart 引数が true の場合、プログラムが終了してから period ミリ秒待機した後、プログラムは再実行されます。execStream("foo")exec("foo", false, false, 0) と等価です。execPeriodic("foo", 1000)exec("foo", true, true, 1000) と等価です。

synth(msgCount,msgSize)

サイズが msgSize のランダムなメッセージを msgCount 個だけ生成するソース。このソースは非印字可能文字を生成します。

synthrndsize(msgCount,minSize,maxSize)

minSizemaxSize の間のランダムなサイズのランダムなメッセージを msgCount 個だけ生成するソース。このソースは非印字可能文字を生成します。

nonlsynth(msgCount,msgSize)

サイズが msgSize のランダムなメッセージを msgCount 個だけ生成するソース。すべての '\n' 文字は ' ' 文字に変換されます。このソースは非印字可能文字を生成しますが、ランダムに生成されたすべての \n が変換されるので、レコードセパレータに \n を使うソースでは、サイズの統一されたデータを受け取ることができます。

asciisynth(msgCount,msgSize)

サイズが msgSize のランダムなメッセージを msgCount 個だけ生成するソース。すべての '\n' 文字は ' ' 文字に変換され、すべての非 ASCII 文字は印字可能な ASCII 文字に変換されます。

twitter("username","pw"[,"url"])

(サポート対象外) twitter "spritzer" ストリームからデータを収集するソースです。username は twitter ユーザー名、 pw は該当ユーザーのパスワード、 url はフィード用 URL です。URL が指定されなかった場合、デフォルトで http://stream.twitter.com/1/statuses/sample.jsonurl として使われます。詳細については、 http://apiwiki.twitter.com/Streaming-API-Documentation を参照してください。

irc("server",port, "nick","chan")

(サポート対象外) IRC チャンネルソース。チャンネルに送信された各行が新しいイベントになります。このソースは、 server の TCP ポート port (標準では 6667) に接続を試みます。接続すると、ニックネーム nick を使ってチャンネル chan (#hadoop など) への参加を試みます。

scribe[(+port)]

scribe ソース。Facebook の Scribe 収集システムによって生成されたデータと互換性のあるネットワークソケットを提供します。

report[(periodMillis)]

このソースは、毎 periodMillis ミリ秒ごとにローカル物理ノードをポーリングして報告がないかどうか調べ、報告を新しいイベントに変換します。ノードの報告ページに表示される属性名が含まれ、値は区切りのないバイト配列です。

10.2. Flume のシンクのカタログ

表6 Flume のコレクタ層イベントシンク

collectorSink("fsdir","fsfileprefix"[, rollmillis[, format]])

コレクタシンク。 fsdir は、 hdfs://namenode/path または file:///path などのファイルシステムディレクトリ URI です。fsfileprefix は、出力ファイルのファイル名プリフィックスです。これらのいずれに対しても、「 出力のバケット化 」で説明している、データをバケット化するためのエスケープシーケンスを使用できます。rollmillis には、HDFS ファイルをロール (オープンしてクローズする) 場合の間隔をミリ秒単位で指定します。コレクタによって出力されるデータの形式は、 flume.collector.output.format プロパティで指定します。


表7 Flume のエージェント層イベントシンク

agentSink[("machine"[, port])]

デフォルトは agentE2ESink です。

agentE2ESink[("machine"[, port])]

先行書き込みログ (WAL) とエンドツーエンドの受信確認を使用するエージェントシンク。省略可能な引数では、 collectorSource を示す machine と TCP ポート port を指定します。どちらも指定されなかった場合、 flume.collector.event.host プロパティと flume.collector.port プロパティで指定された値が使われます。

agentDFOSink[("machine"[, port])]

エラー検出時にデータをローカルディスクに保存するディスクフェイルオーバーエージェントシンク。このシンクは machine:port を一定時間ごとにチェックし、 machine:port が復旧した場合にはイベントを再送信します。省略可能な引数では、 collectorSource を示す machine と TCP ポート port を指定します。どちらも指定されなかった場合、 flume.collector.event.host プロパティと flume.collector.port プロパティで指定された値が使われます。

agentBESink[("machine"[, port])]

ベストエフォートエージェントシンク。このソースは障害発生時にはメッセージを破棄し、送信を続行します。省略可能な引数では、 collectorSource を示す machine と TCP ポート port を指定します。どちらも指定されなかった場合、 flume.collector.event.host プロパティと flume.collector.port プロパティで指定された値が使われます。ed.

agentE2EChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])

先行書き込みログ (WAL) とエンドツーエンドの受信確認、およびコレクタフェイルオーバーチェインを使用するエージェントシンク。m1:p1 には、プライマリデフォルトコレクタのマシンとポート (ポートは省略可能) を指定します。障害によってすべてのフェイルオーバーが使い果たされた場合、データはすでにローカルに永続化可能になっているので、このシンクは下流へのデータ送信の試みをバックオフします。省略可能な引数では、フェイルオーバーマシン:ポートのペアのリストをランク順に指定します。プライマリコレクタが応答しない場合には、バックアップコレクタが使われます。プライマリコレクタが復帰したかどうかは一定時間ごとにチェックされます。

agentDFOChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])

まずほかのコレクタへのフェイルオーバーを試みるディスクフェイルオーバーエージェントシンク。m1:p1 には、プライマリデフォルトコレクタのマシンとポート (ポートは省略可能) を指定します。障害によってすべてのフェイルオーバーが使い果たされた場合、このシンクはデータをローカルディスクに保存します。省略可能な引数では、フェイルオーバーマシン:ポートのペアのリストをランク順に指定します。プライマリコレクタが応答しない場合には、バックアップコレクタが使われます。プライマリコレクタが復帰したかどうかは一定時間ごとにチェックされます。

agentBEChain("m1[:_p1_]"[, "m2[:_p2_]"[,…]])

コレクタフェイルオーバーチェインを使用するベストエフォートエージェントシンク。m1:p1 には、プライマリデフォルトコレクタのマシンとポート (ポートは省略可能) を指定します。障害によってすべてのフェイルオーバーが使い果たされた場合、このシンクはデータを破棄します。省略可能な引数では、collector とコレクタの TCP ポート port を指定します。どちらも指定されなかった場合、 flume.collector.event.host プロパティと flume.collector.port プロパティで指定された値が使われます。

autoE2EChain

このシンクは、マスターによって自動的に構成されたフェイルオーバーノードを持つ agentE2EChain です。

autoDFOChain

このシンクは、マスターによって自動的に構成されたフェイルオーバーノードを持つ agentDFOChain です。

autoBEChain

このシンクは、マスターによって自動的に構成されたフェイルオーバーノードを持つ agentBEChain です。


Flume の論理シンク

logicalSink("logicalnode")
このシンクは、論理ノード名に基づいてホストと IP を割り当てられた rpcSink を作成します。ホストと IP の情報はマスターによって保守され、マスターによって自動的に選択されます。

表8 Flume の基本シンク

null

Null シンクです。イベントは破棄されます。

console[(format)]

console シンクです。指定された出力形式 format (省略可能) を使って、プロセスの標準出力にイベントを表示します。

text("txtfile"[,format])

テキストファイルシンクです。指定された出力形式 format (省略可能) を使って、テキストファイル txtfile にイベントを書き込みます。ファイルがすでに存在する場合、このシンクはファイルを上書きしようとします。

seqfile("filename")

seqfile シンクです。Hadoop シーケンスファイル形式のファイルに、com.cloudera.flume.handlers.hdfs.WriteableEventKey キーと com.cloudera.flume.handlers.hdfs.WriteableEvent の値を書き込みます。ファイルがすでに存在する場合、このシンクはファイルを上書きしようとします。

dfs("hdfspath")

Hadoop dfs seqfile シンクです。dfs パスに Flume 固有の Hadoop seqfile 形式で書き込みます。hdfspath では、「 出力のバケット化 」で説明している、データをバケット化するためのエスケープシーケンスを使用できます。

formatDfs("hdfspath"[, format])

Hadoop dfs 形式ファイルシンクです。hdfspath 文字列はエスケープ されません 。dfs パスへの書き込み形式には、指定された format が使われます。

escapedFormatDfs("hdfspath", "file"[, format])

Hadoop dfs 形式ファイルシンクです。hdfspath はエスケープされ、イベントはこの文字列に基づいて特定のディレクトリとファイル名に書き込まれます。dfs パスへの書き込み形式には、指定された format が使われます。hdfspath では、「 出力のバケット化 」で説明している、データをバケット化するためのエスケープシーケンスを使用できます。

customdfs("hdfspath"[, format])

Hadoop dfs 形式ファイルシンクです。hdfspath 文字列はエスケープ されません 。dfs パスへの書き込み形式には、指定された format が使われます。このシンクは推奨されません。代わりに formatDfs を使ってください。

escapedCustomDfs("hdfspath", "file"[, format])

Hadoop dfs 形式ファイルシンクです。hdfspath はエスケープされ、イベントはこの文字列に基づいて特定のディレクトリとファイル名に書き込まれます。dfs パスへの書き込み形式には、指定された format が使われます。hdfspath では、「 出力のバケット化 」で説明している、データをバケット化するためのエスケープシーケンスを使用できます。このシンクは推奨されません。代わりに escapedFormatDfs を使ってください。

rpcSink("host"[, port])

マシン host の TCP ポート port にイベントを送信するよう構成されたリモートプロシージャコール (RPC) シンクです。デフォルトポートは 35861 ですが、 flume.collector.event.port プロパティで上書きできます。Apache-Thrift と Apache-Avro の両方の RPC フレームワークをサポートします。RPC フレームワークのタイプは、 event.rpc.type プロパティ (THRIFT または AVRO) で指定します。デフォルトは THRIFT です。rpcSource でも同じ RPC フレームワークが使われることに注意してください。

syslogTcp("host"[,port])

syslog TCP シンク。ホスト host のポート port に syslog over TCP 形式 (syslog-ng 互換) でデータを書き込みます。デフォルトポートは TCP 514 です。

irc("host",port, "nick", "chan")

(サポート対象外) IRC チャンネルシンク。各イベントが 1 行でチャンネルに送信されます。このシンクは、 server の TCP ポート port に接続を試みます。接続すると、ニックネーム nick を使ってチャンネル chan (#hadoop など) への参加を試みます。


10.3. Flume のシンクデコレータのカタログ

表9 Flume のシンクデコレータ

nullDeco

子のシンクにそのままデータを渡すデコレータです。

writeAhead(…)

先行書き込みデコレータです。送信前にイベントをディスクに書き込むことによって永続性を提供します。このデコレータはバッファリング機構として使用することができ、その場合、受信と送信は異なるスレッドに分離されます。

ackedWriteAhead[(maxmillis)]

イベントに受信確認タグとチェックサムを追加する先行書き込みデコレータです。送信前にイベントをディスクに書き込むことによって永続性を提供します。このデコレータはバッファリング機構として使用することができ、その場合、受信と送信は異なるスレッドに分離されます。このデコレータはイベントのグループを生成して追跡するとともに、ほかのコンポーネントにも通知して受信確認をチェックします。再試行するかどうかのチェックは、maxmillis ミリ秒に達する指数バックオフがある場合に行われます。maxmillis のデフォルト値には flume.agent.logdir.maxage プロパティの値が使われます。

diskFailover[(maxmillis)]

ディスクフェイルオーバーデコレータです。このデコレータに入るイベントは、このデコレータのサブシンクに送信されます。下流でエラーが発生した場合、データはディスクに書き込まれ、これらのディスクバッファリングされたイベントの送信が一定時間ごとに再試行されます。再試行するかどうかのチェックは、maxmillis ミリ秒に達する指数バックオフがある場合に行われます。maxmillis のデフォルト値には flume.agent.logdir.maxage プロパティの値が使われます。

ackInjector

このデコレータは、オープン時に追加の ack グループスタートメッセージを注入し、追記されたイベントに ack タグを付け、追加の ack グループエンドメッセージを注入します。これらのタグには、 ackInjector を通るイベントの本体すべてを対象としたチェックサムが含まれています。

ackChecker

このデコレータは、 ackInjector によって挿入された ack グループのスタート、エンド、およびチェックサムの値を追跡します。グループが到着し、そのチェックサムが正しい場合、このデコレータはほかのコンポーネントに通知を送信します。

lazyOpen

このデコレータは、サブシンクのオープン/クローズ状態を追跡しますが、append が呼び出されるまで、実際にシンクをオープンすることはしません。したがって、このデコレータが append なしにオープンしてクローズされた場合、サブシンクは一度もオープンされません。

insistentOpen[(max[init[,cumulativeMax]],)]

insistentOpen デコレータは、指定されたバックオフのプロパティに従って成功するまで、サブシンクのオープンを何度も試みます。ネットワークサーバーがまだ稼働していない状態でネットワーククライアントを起動するときに便利です。サブシンクのオープンの試みが失敗すると、指数バックオフしてからオープンを再試行します。max は、バックオフごとの最大ミリ秒です (デフォルトは Integer.MAX_VALUE)。init は最初に遭遇したエラーでバックオフするミリ秒の初期値です (デフォルトは 1000)。cumulativeMax は 1 回のエラーで許容される最大バックオフで、これを超えると例外が転送されます (デフォルトは Integer.MAX_VALUE)。このデコレータは、 cumulativeMax ミリ秒後にオープンが成功または失敗するまで、オープンの呼び出しを同期的にブロックすることに注意してください。

stubbornAppend

stubbornAppend デコレータは、通常は append 操作をサブシンクにそのまま渡します。このデコレータは、サブシンクの append メソッドによってトリガされた最初の例外をキャッチすると、サブシンクをクローズしてオープンし、サブシンクへの append 操作をもう一度実行します。2 度目の試みが失敗すると、例外をスローします。このデコレータは、接続が切断されることのあるネットワークシンクと組み合わせて使うと便利です。オープン/クローズの再試行は、接続を再確立するのに十分なことも多いからです。

value("attr","value"{,escape=true|false})

value デコレータは、新しいメタデータ属性 attr を値 value で追加します。エージェントは、あとで分離する目的で、データに特定のタグでマークを付けておくことができます。デフォルトでは、ユーザーが入力した値が添付されます。escape=true を指定すると、値は解釈され、エスケープシーケンスをイベントの属性リストの値で置き換えようとします。

mask("attr1"[,"attr2", …])

mask デコレータは、指定された属性 以外 のすべてのメタデータを通すよう、入力イベントを修正して出力します。

select("attr1"[,"attr2", …])

select デコレータは、指定されたメタデータ属性 だけ を通すよう、入力イベントを修正して出力します。

digest("algorithm","attr", base64="boolean")

digest デコレータは、イベント本体のメッセージダイジェストを計算し、この値を (バイトとして) attr 属性に書き込みます。有効なアルゴリズムは、 java.security.MessageDigest で有効なアルゴリズムで、MD5、SHA-1、SHA-256、SHA-384、および SHA-512 が含まれます。オプションでダイジェスト値を Base64 でエンコードすることができます (デフォルトは false)。

format("pattern")

format デコレータは、エスケープされたバージョンの pattern 引数で本体を置き換え、入力イベントを修正して出力します。チェックサムは本体のデータを基にしているので、このデコレータは、信頼性のないフローや報告を行うフローでのみ使用するべきです。不適切な使い方をすると、メッセージが失われることがあります。

exDate("attr","pattern" [, "prefix" [, "padding"]])

イベントの attr 値から日付と時刻の文字列を解釈します。このとき、pattern を解釈方法の基準として使用します。prefix が指定されなかった場合、日付と時刻の主な値は、"date" をプリフィックスに使って新しいイベント属性に割り当てられます。ゼロのパディングを行うかどうかを指定できる padding 変数が用意されています。デフォルトでは、ゼロのパディングは有効になっています。たとえば、2011-1-1 に対してゼロをパディングすると、2010-01-01 になります。pattern の構築方法の詳細については、 http://download-llnw.oracle.com/javase/1.4.2/docs/api/java/text/SimpleDateFormat.html を参照してください。 attr は、イベントの本体からすでに取り出された日付と時刻の文字列であることに注意してください。exDate がこの文字列をユーザーに代わって本体から取り出すことはありません。出力属性の例は、dateday、datemonth、dateyear、datehr、datemin、datesec などです。ここで、 dateprefix です。

regex("regex",idx,"attr")

regex デコレータは、正規表現 regex を適用し、 idx 番目のキャプチャグループを取り出し、この値を attr 属性に書き込みます。regex では、Java スタイルのエスケープを使用しなければなりません。したがって、正規表現で \d マクロを使う場合は、 "\\d" のようにエスケープして指定する必要があります。

regexAll("regex", "name" [, "name"]*)

正規表現 regex をイベント本体に適用し、見つかったすべてのパターングループを、指定された各 name に割り当てます。

split("regex",idx,"attr")

split デコレータは、正規表現 regex を使って本体をトークン (スプリッタの値は含みません) に分割します。ついで、 idxattr 属性の値として書き込まれます。regex では、Java スタイルのエスケープを使用しなければなりません。したがって、正規表現で \d マクロを使う場合は、 "\\d" のようにエスケープして指定する必要があります。

batch(n,maxlatency)

n 個のイベントをバッファリングし、1 つの集約イベントを送信します。maxlatency ミリ秒が経過すると、現在バッファリングされているすべてのイベントが 1 つの集約イベントとして送信されます。

unbatch

unbatch は、 batch によって生成された集約イベントを受け取り、これを分割して元のイベントを転送します。集約イベントでないイベントは、そのまま転送されます。

gzip

直列化されたイベントを gzip で圧縮します。集約イベントとともに使うと便利です。

gunzip

gzip で圧縮されたイベントを展開します。gzip イベントでないイベントは、そのまま転送されます。

intervalSampler(n)

インターバルサンプラです。イベントは n 番目ごとに転送されます。

probSampler(p)

確率 (probability) サンプラです。各イベントは確率 p (0.0 ≤ p ≤ 1.0) で転送されます。

reservoirSampler(k)

リザーバ (reservoir) サンプラです。フラッシュされると、最大 k 個のイベントが転送されます。k 個以上のイベントがデコレータに入っている場合、正確に k 個のイベントが転送されます。選択される確率は、このデコレータを通るすべてのイベントで同じです。注意: このデコレータは、送信されるイベントの順序を変えます。

delay(ms)

パイプラインの下流にイベントを転送する前に ms ミリ秒の遅延を追加します。このデコレータは、ほかのイベントがパイプラインに入るのをブロックして妨げます。asciisynth ソースとともに使って作業負荷のシミュレーションを行うときに便利です。

choke[(choke-id)]

シンクに入るデータの転送レートを制限します。choke-id は、このデコレータが作成される物理ノード上で setChokeLimit コマンドを使ってあらかじめ登録しておく必要があります。詳細については、「 ソース/シンク・ペア間でのデータ転送レートの制限 」を参照してください。


10.4. Flume の環境変数

ここでは、Flume の動作に影響を与えるいくつかの環境変数について説明します。./bin 内の flume スクリプトは、これらの環境変数を使用します。これらの環境変数の多くは、Flume がデーモンとして実行されるときに使われる flume-daemon.sh スクリプトによって設定されます。

FLUME_PID_DIR

Flume ノードまたは Flume マスターが、デーモンプロセスに対応する pid ファイルを置くディレクトリ。

FLUME_CLASSPATH

Flume の実行時に追加したい独自の Java クラスパス環境変数。値は、通常の Flume が生成する CLASSPATH の前に追加されます。警告: flume スクリプトが実行されると、標準の CLASSPATH は flume スクリプトによって上書きされます。

FLUME_LOG_DIR

Flume ノードまたは Flume マスターによって生成されるデバッグログが書き込まれるディレクトリ。

FLUME_LOGFILE

Flume ノードまたは Flume マスターによって生成されるログファイルのサフィックスを設定します。

FLUME_ROOT_LOGGER

実行コマンドの log4j ロギング設定。デフォルトは "INFO,console"。

ZOOKEEPER_ROOT_LOGGER

マスターの組み込み ZooKeeper サーバーのログの log4j ロギング設定。デフォルトは "ERROR,console"。

WATCHDOOG_ROOT_LOGGER

Flume ノードと Flume マスターをラップする watchdog が生成するログの log4j ロギング設定。デフォルトは "INFO,console"。

FLUME_CONF_DIR

Flume ノードと Flume マスターが使用する flume-site.xml ファイルと flume-conf.xml ファイルが存在するディレクトリ。デフォルトは、 ./conf/flume-conf.xml ファイルが見つかった場合は ./conf 、さもなければ flume-conf.xml/etc/flume/conf/ で見つかった場合は /etc/flume/conf/

HADOOP_HOME

Hadoop jar が存在することが期待されるディレクトリ。指定されなければ、 /usr/lib/hadoop または ./lib/ で見つかった jar を使用します。

FLUME_DEVMODE

この値に "true" が設定されている場合、JSP サーブレットをコンパイルするのに必要な ant jar を含む ./libbuild jar が、CLASSPATH に含まれるようになります。

FLUME_VERBOSE

有効にされると、flume スクリプトは、実行するコマンド行を表示します。

FLUME_VERBOSE_JAVA

FLUME_VERBOSE とともに有効にされると、Flume を実行する JVM に "-verbose" フラグが渡されます。

LD_LIBRARY_PATH

Flume の java.library.path を拡張するのに使うファイルパスをコロンで区切って並べたリスト。この環境変数を使うと、java の参照パスにネイティブライブラリを含めることができます。実行する Hadoop に含まれている Lzo C ライブラリをパスに加える必要がある場合などに使用します。

10.5. flume-site.xml 構成設定

10.6. トラブルシューティング

10.6.1. デフォルトポートはどうなっていますか?

いずれの場合も TCP ポートが使われます。

ノードコレクタポート

flume.collector.port

35853+

ノードステータス web サーバー

flume.node.http.port

35862+

マスターステータス web サーバー

flume.master.http.port

35871

マスターハートビートポート

flume.master.heartbeat.port

35872

マスター admin/shell ポート

flume.master.admin.port

35873

マスター gossip ポート

flume.master.gossip.port

57890

マスター報告ポート

flume.report.server.port

45678

マスター → zk ポート

flume.master.zk.client.port

3181

zk → zk クォーラムポート

flume.master.zk.server.quorum.port

3182

zk → zk 選挙用ポート

flume.master.zk.server.election.port

3183

コレクタでオートチェインを使用する場合、各コレクタソースは、ポート flume.collector.port を起点にポート番号をインクリメントして、空いているポートを自動的に探します。

1 台のマシン上に複数の物理ノードが存在する場合、ノードステータス Web サーバーはポート flume.node.http.port にバインドを試みます。バインドが失敗した場合は、空いているポートが見つかるまで、ポート番号がインクリメントされてバインドが再試行されます。そして、最終的にバインドされたポートが記録されます。

10.6.2. どのバージョンの Hadoop HDFS を使用できますか? 使用するバージョンを変更するにはどうすればよいですか?

現在のところ、HDFS への書き込みに関していくつか制約があります。具体的には、Flume ノードが書き込むことができる Hadoop のバージョンは 1 つだけです。Hadoop の HDFS API はかなり安定していますが、HDFS クライアントの互換性が保証されているのは、同一バージョンの HDFS に対してだけです。Cloudera のテストでは、Hadoop HDFS 0.20.x と HDFS 0.18.x を使用しました。これらは API 互換なので、バージョンを切り替えるのに必要なことは、Hadoop jar を取り替え、別のバージョンの Hadoop に書き込むノードを再起動することだけです。

ただし、このインスタンスの Hadoop を構成して、適切な HDFS NameNode と対話できるようにする必要があります。Hadoop クライアントの設定 (NameNode へのポインタなど) は、Hadoop DataNode やワーカーノードと同じ方法で構成します。すなわち、 conf/core-site.xml を修正して使用します。

デフォルトでは、Flume は /usr/lib/hadoop に Hadoop jar があるかどうかチェックします。この場所に存在しない場合、自身の lib ディレクトリである /usr/lib/flume/lib で見つかった jar をデフォルトで使用します。

10.6.3. Flume ノードが Flume マスターに表示されないのはなぜですか?

マスターにノードが表示されない場合は、ノードが実行されているかどうかをまずチェックする必要があります。

# jps | grep FlumeNode

/usr/lib/flume/logs/flume-flume-node*.log のログもチェックしてください。

よくあるエラーは、マスターにコンタクトできないというエラーメッセージや警告です。マスターにコンタクトできない原因として、ホストの構成ミス、ポートの構成ミス (35872 がデフォルトのハートビートポートです)、ファイアウォールの問題などがあります。

その他のエラーとして考えられるのは、ローカルマシンの先行書き込みログ (WAL) ディレクトリのパーミッションの問題です。このディレクトリは、セットアップ直後では /tmp/flume/agent ディレクトリです。Flume ノードを flume 以外のユーザーで実行したことがある場合 (特に root で実行した場合)、ディレクトリを削除するか、またはディレクトリの中身のパーミッションを変更して、Flume が使用できるようにする必要があります。

10.6.4. Flume ノードの状態がすぐに変わるのはなぜですか?

ノードはデフォルトでは hostname を自分の物理ノード名として起動し、選択します。物理ノード名は一意であることが前提になっています。複数の物理ノードに同じ名前が割り当てられていると、予期しない結果が生じることがあります。

10.6.5. Flume 自体のトラブルシューティングに役立つログはどこにありますか?

Ubuntu に Flume をインストールした場合、ログは /usr/lib/logs/ に書き込まれます。

マスターのログ
/usr/lib/logs/flume-flume-master-host.log
マスターの標準出力
/usr/lib/logs/flume-flume-master-host.out.*
ノードのログ
/usr/lib/logs/flume-flume-node-host.log
ノードの標準出力
/usr/lib/logs/flume-flume-node-host.out.*

10.6.6. ファイルハンドルを使い果たしたためにノードエラーが発生した場合、どう対処すればよいですか?

Linux には、オープンできるファイルの最大数 (328838) と、1 ユーザーがオープンできるファイルの最大数 (デフォルトは 1024) の 2 つの制限があります。ソケットはファイルハンドルなので、オープンできる TCP 接続の数もこの制限を受けます。

Ubuntu では、次の行を /etc/security/limits.conf に追加する必要があります。

<user> hard nofile 10000

該当ユーザーは、次の行を ~/.bash_profile に追加して、制限をハードリミットまで引き上げてやる必要があります。

ulimit -n 10000

10.6.7. ディスクフェイルオーバーまたはログ先行書き込み (WAL) 使用時のエラー

現在のところ、Flume のロギングメカニズムはファイルシステムに依存しています。したがって、Flume を実行しているユーザーが、指定されたロギングディレクトリへの書き込み権限を持っていることを確認しなければなりません。

現在のデフォルトでは、/tmp/flume に書き込むようになっています。実働システムでは、リブート時に自動的に削除されることのないディレクトリを書き込み先に指定する必要があります。

10.6.8. Amazon S3 にデータを書き込むことはできますか?

できます。コレクタシンクや dfs シンクで、プリフィックス s3n:// または s3:// を使えば、S3 バケットに書き込むことができます。

まず、s3 への書き込みをサポートするために、いくつかの jar を CLASSPATH に追加しなければなりません。次に示すのは、Cloudera がテストした jar のセットです (ほかのバージョンでも動作する可能性は高いと思われます)。

  • commons-codec-1.3.jar
  • commons-httpclient-3.0.1.jar
  • jets3t-0.6.1.jar

s3n はネイティブ s3 ファイルシステムを使っており、個々のファイルのサイズに関して制限があります。この方法で書き込まれたファイルは、 s3cmd などのほかのプログラムとも互換性があります。

オーバーレイファイルシステムを使った s3 への書き込みは、ファイルシステムとの対話のために Hadoop を経由する必要があります。

用語集

エージェント
フローのスタート地点に位置する Flume ノードで、外部ソースからのデータをキャプチャし、下流に送る役割を果たします。
コレクタ
フローの終端に位置する Flume ノードで、データをその最終目的地に配信します。
フロー
順番につながった一連のノードで、これらが全体で単一のソースからのデータを最終目的地まで加工して配信します。
マスター
すべてのノードの構成を管理するサービスで、すべてのノードはこのサービスに対して報告を行います。
シンク
すべての処理を終えた後にノードがデータを送る場所です。
ソース
ノードがデータストリームを取得する場所です。

11. バージョン

11.1. 履歴

v0.9.3 2011/2/4
マルチマスターが動作。Microsoft Windows のサポート。JSON サポートを含むメトリクスフレームワーク。ソース/シンク API を更新。変換されないエージェントチェインと複数の論理ノード構成の堅牢性を改善。構成言語構造を更新。ユーザビリティの改善。
v0.9.2 2010/11/15
Hadoop でサポートされているすべての圧縮コーデックをサポート。Avro RPC サポート。柔軟なスループットスロットリング。シェルに有益なエラーメッセージを追加。Thrift RPC サポートのための各種更新。tail と exec でのパフォーマンスと堅牢性を改善。
v0.9.1u1 2010/10/12
Kerberos 化された HDFS への書き込みサポートを追加。Flume クックブック。
v0.9.1 2010/8/9
エラーメッセージとプロパティ構成値の可視性の向上。外部からの最初のコントリビュート。再構成時のハングを修正。プラグイン実装に関するドキュメントを充実。scribe および syslog サポートを更新。出力ファイルの圧縮。
v0.9 2010/6/29
メトリクスおよび報告のフレームワーク、論理ノードと論理名による抽象化、フローごとの WAL/DFO 分離、変換ベースの高水準シンク。オープンソース化と github への最初のプッシュ。
v0.3 2010/3/31
ZooKeeper ベースのマスター/マルチマスター、データプレーンとコントロールプレーンの自動フェイルオーバー。flume シェル。deb/rpm パッケージの作成。
v0.2 2010/1/21
各種の信頼性モード: WAL 2.0、ディスクフェイルオーバー、ベストエフォート。出力ファイルでのエスケープシーケンス/出力のバケット化。シンクとデコレータを多数追加。
v0.1 2009/11/23
最初のインストールデプロイ、ユーザーテスト。
v0.0 2009/9/21
現行アーキテクチャの最初のバージョン (集中化マスター、構成言語、Web インタフェース)。WAL の最初のバージョン。シンプルな可視化、サンプラ、Thrift ベースの rpc。
有史以前 2009/7/21

最初のコミット。設計、実験的実装。最初の実装では、個別のエージェントプログラムとコレクタプログラム、watchdog があった。

     ______
    / ___//_  ______  ____
   / /_/ / / / /    \/ __/
  / __/ / /_/ / / / / __/
 / / /_/\____/_/_/_/\__/
/_/ Distributed Log Collection.