2016年5月10日火曜日

Apache NiFi 初めて使う

Apache nifi


Windows 
 run-nifi.batをダブルクリックするだけ
Linux/OSX
 bin/nifi.sh startで起動しよう。Apacheの様に。サービスとして起動することもできるよ。

君のパソコンにサーバが立ってるから
http://localhost:8080/nifi
でアクセス
してみよう、こんな画面が出るよ、これをcanvasと呼んでいるらしい
早速dataflowの元になるprocessorを作ってみましょう
の左上にあるリサイクルマークみたいなのを、画面上にドラッグしる
「add processor」と出るので、ここではおとなしく言われた通りに検索窓に"local”と入れてナロウダウン
Getfileを選んでみよう



これが初プロセッサー
右クリックして"configure"を選ぼう
ディレクトリの設定を終えて、続いてLogAttributeプロセッサーを同じ手順で召喚する。

二つ並んだよ。Getfileプロセサにカーソルを持ってくると
こんなアイコンが出るから、これをLogAttributeプロセサに持って行こう。
こんな線で繋がるよ
ここでConnectionの設定ができるよ



ここまでで、こんな画面になるはずだよ
しかし、LogAttributeは"The LogAttribute Processor, however, is now invalid because its successRelationship has not been connected to anything"というエラーが出ているはず。
LogAttributeに行って、何もわからなくてもAuto terminate relationshipsの"success"をチェックしよう
これでデータを流す準備ができたよ

右クリックでスタートしてみよう!


・でこんなことができるの?
データ変換系
CompressContent: コンテンツの圧縮解凍
ConvertCharacterSet: 文字エンコード変換
EncryptContent: コンテンツの暗号化復号化
ReplaceText: 正規表現でテキストコンテンツを変換
TransformXml: Apply an XMLコンテンツにXSLTを適用


ルーティングやフロー調整
ControlRate: フローのデータ量をレートで調整
DetectDuplicate: クライテリアにしたがって重複したフローファイルを検知。HashContentと併用してみよう。
DistributeLoad: ラウンドロビン等データのロードバランシングを行う。
MonitorActivity: 一定時間来たらノーティフィケーションしたりする。
RouteOnAttribute: フローファイルのルーティングをする
ScanAttribute: ユーザーのディクショナリにしたがってフローファイルの属性をスキャン
RouteOnContent: 正規表現にしたがってフローファイルを検索、分岐
ScanContent: フローファイル中のコンテンツに特定語句が含まれるか検索。(テキストもバイナリもいけるらしい)
ValidateXml: XMLスキーマにしたがったXMLコンテンツのバリデーション。


データベースアクセス
ConvertJSONToSQL: JSONドキュメントをPutSQLプロセサが理解できるようなSQLのインサート、アップデート文に変換
ExecuteSQL: SQL文を実行、結果はAvroフォーマットで
PutSQL: フローファイルで指定した内容にしたがってデータベースを更新

属性展開
EvaluateJsonPath: JSONPath表現にしたがってJSONコンテンツを評価
EvaluateXPath: XpathにしたがってXMLコンテンツを評価
EvaluateXQuery: Xクエリ(以下略)
ExtractText: 正規表現(
HashAttribute: concatenation(connectionの事?)にしたがって属性のハッシュ化
HashContent: フローファイルにしたがってハッシュ値を属性で管理
IdentifyMimeType: MIMEでファイル判定
UpdateAttribute: フローファイルの属性を追加変更。統計データに使うとか、オリジナル属性を作るとか


システム制御
ExecuteProcess: OSのコマンド動かす。プロセスのStdOut(標準出力は)出力フローファイルになる。入力を使ったコマンドにはExecuteStreamCommandで対応しよう。
ExecuteStreamCommand: 標準入力(StdIn)にしたがってOSのコナン度を動かす。ソースプロセッサーとして使いたいならExecuteProcessを読もう


データ入力闘魂注入
GetFile: ローカルディスク(とかネットワークディスク)からNifiにファイルを移動。※このプロセッサーはコピーでなく移動を目的としている(元のファイルは削除される)
GetFTP: FTPからNifiへファイルダウンロード。※このプロセッサーはコピーでなく(以下略
GetSFTP: SFTPから(
GetJMSQueue: JMSキューからメッセージをダウンロードして、JMSメッセージをベースにフローファイルを生成
GetJMSTopic: JMSトピックからメッセージダウンロードして、JMSメッセージをベースにフローファイルを生成
GetHTTP: HTTP経由でコンテンツをNiFiにゲット。
ListenHTTP: サーバとして外からのHTTPリクエストを待つ。
ListenUDP: UDPパケットを待つ
GetHDFS: HDFSをモニタリング。HDFSにファイルができたらNiFiにそれを取り込む
ListHDFS / FetchHDFS: HDFS中のディレクトリを監視しフローファイルをして送信
FetchS3Object: AmazonのS3にあるコンテンツにフェッチする。
GetKafka: Apache Kafkaからメッセージを取得。
GetMongo: MongoDBに対してクエリを実行し結果をFlowFileに
GetTwitter: Allows Users to register a filter to listen to the Twitter "garden hose" or Enterprise endpoint, creating a FlowFile for each tweet that is received.

データ放出 データ送信系
PutEmail: Eメール送信
PutFile: ローカルファイルにファイル出力
PutFTP: FTPサーバにファイルをコピー
PutSFTP: SFTP
PutJMS: JMSメッセージとしてフローファイルをおくる
PutSQL: FlowFile中のSQL DDLを実行
PutKafka: Kafcaにメッセージ送信
PutMongo: mongodbにフローファイルの内容をインサートまたはアップデート

分割と統合
SplitText: テキストを含んだFlowFileを分割します。
SplitJson: 配列やノード化してるJasonファイルを要素ごとにFlowFileに分割します
SplitXml: XMLメッセージを複数のFlowFileに分割します。
UnpackContent: ZIPやらTARの解凍して解凍したものを一つのFlowFileとして扱います。
MergeContent: 複数のFlowFileを一つにマージします。
SegmentContent: 並列データ送信などのためにデータの分割を行います。
SplitContent: FowFileを複数のファイルに分割します。

HTTP通信
GetHTTP: HTTPからGETでコンテンツ取得
ListenHTTP: サーバとして外からのHTTPリクエストを待つ。
InvokeHTTP: GetHTTPやPostHTTPより柔軟にHTTPリクエストを送信する。
PostHTTP: ListenHTTPと併用してNiFi間通信したりするためにPOSTでデータ送受信する。
HandleHttpRequest / HandleHttpResponse: 

AWS
FetchS3Object:S3にあるコンテンツにフェッチ
PutS3Object: FlowFileのコンテンツをS3に書き込み
PutSNS: Amason SNSにFlowFileのコンテンツを送信
GetSQS: SQSからデータを取得してFlowfileのコンテンツとして書き込み
PutSQS: SQSにFlowFileのコンテンツを送信
DeleteSQS: