Amazon Kinesis Data Streams
開発者ガイド

Kinesis エージェントを使用して Amazon Kinesis Data Streams への書き込み

Kinesis エージェントはスタンドアロンの Java ソフトウェアアプリケーションで、データを収集して Kinesis Data Streams に送信する簡単な方法を提供します。このエージェントは一連のファイルを継続的に監視し、新しいデータをストリームに送信します。エージェントはファイルのローテーション、チェックポイント、失敗時の再試行を処理します。タイムリーで信頼性の高い簡単な方法で、すべてのデータを提供します。また、ストリーミング処理のモニタリングとトラブルシューティングに役立つ Amazon CloudWatch メトリクスも出力します。

デフォルトでは、レコードは改行文字 ('\n') に基づいて各ファイルから解析されます。しかし、複数行レコードを解析するよう、エージェントを設定することもできます (「エージェントの設定」を参照)。

このエージェントは、ウェブサーバー、ログサーバーおよびデータベースサーバーなど、Linux ベースのサーバー環境にインストールできます。エージェントをインストールした後で、モニタリング用のファイルとデータストリームを指定して設定します。エージェントが設定されると、ファイルから永続的にデータを収集して、ストリームに安全にデータを送信します。

前提条件

  • オペレーティングシステムは Amazon Linux AMI バージョン 2015.09 以降、または Red Hat Enterprise Linux バージョン 7 以降でなければなりません。

  • Amazon EC2 を使用してエージェントを実行している場合は、EC2 インスタンスを起動します。

  • 次のいずれかの方法を使用して、AWS 認証情報を管理します。

    • EC2 インスタンスを起動する際に IAM ロールを指定します。

    • エージェントを設定する際に AWS 認証情報を指定します (awsAccessKeyId および awsSecretAccessKey を参照してください)。

    • /etc/sysconfig/aws-kinesis-agent を編集して、リージョンと AWS アクセスキーを指定します。

    • EC2 インスタンスが他の AWS アカウントにある場合、Kinesis Data Streams サービスへのアクセス権を提供する IAM ロールを作成し、エージェントを設定するときにそのロールを指定します (「assumeRoleARN」と「assumeRoleExternalId」を参照)。前のいずれかの方法を使用して、このロールを引き受ける権限がある、他のアカウントのユーザーの AWS 認証情報を指定します。

  • 指定した IAM ロールまたは AWS 認証情報は、エージェントがストリームにデータを送信するために、Kinesis Data Streams PutRecords 操作を実行する権限が必要です。エージェントの CloudWatch モニタリングを有効にしている場合は、CloudWatch PutMetricData 操作を実行する権限も必要になります。詳細については、「IAM により Amazon Kinesis Data Streams リソースへのアクセスを制御する」、「Amazon CloudWatch で Kinesis Data Streams エージェントの状態をモニタリングする」および「CloudWatch のアクセスコントロール」を参照してください。

エージェントのダウンロードとインストール

最初に、インスタンスに接続します。詳細については、『Linux インスタンス用 Amazon EC2 ユーザーガイド』の「インスタンスへの接続」を参照してください。接続できない場合は、『Linux インスタンス用 Amazon EC2 ユーザーガイド』の「インスタンスへの接続に関するトラブルシューティング」を参照してください。

Amazon Linux AMI を使用してエージェントを設定するには

次のコマンドを使用して、エージェントをダウンロードしてインストールします。

sudo yum install –y aws-kinesis-agent

Red Hat Enterprise Linux を使用してエージェントを設定する

次のコマンドを使用して、エージェントをダウンロードしてインストールします。

sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm

GitHubを使用してエージェントを設定する

  1. awlabs/amazon-kinesis-agent」からエージェントをダウンロードしてください。

  2. ダウンロードしたディレクトリまで移動し、次のコマンドを実行してエージェントをインストールします。

    sudo ./setup --install

エージェントの設定と開始

エージェントを設定して開始するには

  1. (デフォルトのファイルアクセス権限を使用している場合、スーパーユーザーとして)設定ファイル (/etc/aws-kinesis/agent.json) を開き、編集します。

    この設定ファイルで、エージェントがデータを集めるファイル ("filePattern") とエージェントがデータを送信するストリーム ("kinesisStream") を指定します。ファイル名はパターンで、エージェントはファイルローテーションを確認する点に注意してください。1 秒あたりに一度だけ、ファイルを交替または新しいファイルを作成できます。エージェントはファイル作成タイムスタンプを使用して、どのファイルを追跡してストリームに送信するかを判断します。新規ファイルの作成やファイルの交換を 1 秒あたりに一度以上頻繁に交換すると、エージェントはそれらを適切に区別できません。

    { "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "yourkinesisstream" } ] }
  2. エージェントを手動で開始する:

    sudo service aws-kinesis-agent start
  3. (オプション) システムスタートアップ時にエージェントを開始するように設定します。

    sudo chkconfig aws-kinesis-agent on

エージェントは、システムのサービスとしてバックグラウンドで実行されます。継続的に指定ファイルをモニタリングし、指定されたストリームにデータを送信します。エージェント活動は、/var/log/aws-kinesis-agent/aws-kinesis-agent.log に記録されます。

エージェントの設定

エージェントは、2つの必須設定、filePatternkinesisStream、さらに追加機能として任意の設定をサポートしています。必須およびオプションの設定を /etc/aws-kinesis/agent.json で指定できます。

設定ファイルを変更した場合は、次のコマンドを使用してエージェントを停止および起動する必要があります。

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

または、次のコマンドを使用できます。

sudo service aws-kinesis-agent restart

全般設定は次のとおりです。

構成設定 説明
assumeRoleARN

ユーザーが引き受けるロールの ARN。詳細については、IAM ユーザーガイドの「IAM ロールを使用した アカウント間でのアクセスの委任」を参照してください。

assumeRoleExternalId

ロールを引き受けることができるユーザーを決定するオプションの ID。詳細については、IAM ユーザーガイドの「外部 ID を使用する方法」を参照してください。

awsAccessKeyId

デフォルトの認証情報を上書きする AWS アクセスキー IDです。この設定は、他のすべての認証情報プロバイダーに優先されます。

awsSecretAccessKey

デフォルトの認証情報を上書きする AWS シークレットキーです。この設定は、他のすべての認証情報プロバイダーに優先されます。

cloudwatch.emitMetrics

エージェントがメトリクスを CloudWatch に発行できるようにします (true に設定された場合)。

デフォルト: true

cloudwatch.endpoint

CloudWatch のリーションのエンドポイントです。

デフォルト: monitoring.us-east-1.amazonaws.com

kinesis.endpoint

Kinesis Data Streams のリーションのエンドポイントです。

デフォルト: kinesis.us-east-1.amazonaws.com

フロー設定は次のとおりです。

構成設定 説明
dataProcessingOptions

ストリームに送信される前に解析された各レコードに適用されるの処理オプションの一覧。処理オプションは指定した順序で実行されます。詳細については、「エージェントを使用してデータを事前処理する」を参照してください。

kinesisStream

[必須] ストリームの名前。

filePattern

[必須] エージェントによってモニタリングされる必要があるファイルの globこのパターンに一致するすべてのファイルは、エージェントによって自動的に選択され、モニタリングされます。このパターンに一致するすべてのファイルは、読み取り権限を aws-kinesis-agent-user に付与する必要があります。ファイルを含むディレクトリには、読み取りと実行権限を aws-kinesis-agent-user に付与する必要があります。

initialPosition

ファイルの解析が開始される最初の位置。有効な値は、START_OF_FILE および END_OF_FILE です。

デフォルト: END_OF_FILE

maxBufferAgeMillis

エージェントがストリームに送信する前にデータをバッファーする最大時間 (ミリ秒)。

値の範囲: 1,000 ~ 900,000(1 秒~ 15 分)

デフォルト: 60,000(1 分)

maxBufferSizeBytes

エージェントがストリームに送信する前にデータをバッファーする最大サイズ (バイト)。

値の範囲: 1 ~ 4,194,304(4 MB)

デフォルト: 4,194,304(4 MB)

maxBufferSizeRecords

エージェントがストリームに送信する前にデータをバッファーするレコードの最大数。

値の範囲: 1 ~ 500

デフォルト: 500

minTimeBetweenFilePollsMillis

エージェントが新しいデータのモニタリング対象ファイルをポーリングし、解析する時間間隔 (ミリ秒単位)。

値の範囲: 1 以上

デフォルト: 100

multiLineStartPattern

レコードの開始を識別するパターン。レコードは、パターンに一致する 1 行と、それに続くパターンに一致しない行で構成されます。有効な値は正規表現です。デフォルトでは、ログファイルのそれぞれの改行は 1 つのレコードとして解析されます。

partitionKeyOption

パーティションのキーを生成する方法。有効な値は RANDOM (ランダムに生成される整数) と DETERMINISTIC (データから計算されるハッシュ値) です。

デフォルト: RANDOM

skipHeaderLines

モニタリング対象ファイルの始めにエージェントが解析をスキップするの行数。

値の範囲: 0 以上

デフォルト: 0 (ゼロ)

truncatedRecordTerminator

レコードのサイズが Kinesis Data Streams レコードの許容サイズを超えたときに解析されたレコードを切り捨てるために、エージェントが使用する文字列。(1,000 KB)

デフォルト: '\n'(改行)

複数のファイルディレクトリを監視し、複数のストリームに書き込み

複数のフロー設定を指定することによって、エージェントが複数のファイルディレクトリを監視し、複数のストリームにデータを送信するように設定できます。以下の構成例では、エージェントは 2 つのファイルディレクトリ監視し、それぞれ Kinesis ストリームおよび Kinesis Data Firehose 配信ストリームにデータを送信します。Kinesis ストリームと Kinesis Data Firehose 配信ストリームが 同じリージョンにある必要がないように Kinesis Data Streams と Kinesis Data Firehose に異なるエンドポイントを指定できます。

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

Kinesis Data Firehose でエージェントを使用に関する詳細については、「Kinesis エージェントによる Amazon Kinesis Data Firehose への書き込み」を参照してください。

エージェントを使用してデータを事前処理する

エージェントはストリームにレコードを送信する前に、モニタリング対象ファイルから解析したレコードを事前処理できます。ファイルフローに dataProcessingOptions 設定を追加することで、この機能を有効にできます。1 つ以上の処理オプションを追加でき、また指定されている順序で実行されます。

エージェントは、リストされた次の処理オプションに対応しています。エージェントはオープンソースであるため、処理オプションを開発および拡張できます。Kinesis エージェント からエージェントをダウンロードできます。

処理オプション

SINGLELINE

改行文字、先頭のスペース、末尾のスペースを削除することで、複数行レコードを単一行レコードに変換します。

{ "optionName": "SINGLELINE" }
CSVTOJSON

区切り形式から JSON 形式にレコードを変換します。

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[必須] 各 JSON キー値のペアでキーとして使用されるフィールド名。たとえば、["f1", "f2"] を指定した場合は、レコード「v1、v2」は {"f1":"v1","f2":"v2"} に変換されます。

delimiter

レコードで区切り記号として使用する文字列。デフォルトはカンマ (,) です。

LOGTOJSON

ログ形式から JSON 形式にレコードを変換します。サポートされているログ形式は、Apache Common LogApache Combined LogApache Error Log、 および RFC3164 Syslog です。

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[必須] ログエントリ形式。以下の値を指定できます。

  • COMMONAPACHELOG — Apache Common Log 形式各ログエントリは、デフォルトで次のパターン「%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}」になります。

  • COMBINEDAPACHELOG — The Apache Combined Log 形式。各ログエントリは、デフォルトで次のパターン「%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}」になります。

  • APACHEERRORLOG — Apache Error Log 形式。各ログエントリは、デフォルトで次のパターン「[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}」になります。

  • SYSLOG — RFC3164 Syslog 形式。各ログエントリは、デフォルトで次のパターン「%{timestamp} %{hostname} %{program}[%{processid}]: %{message}」になります。

matchPattern

ログエントリから値を取得するために使用する正規表現パターン。この設定は、ログエントリが定義されたログ形式の一つとして存在していない場合に使用されます。この設定を使用する場合は、customFieldNames を指定する必要があります。

customFieldNames

JSON キー値のペアでキーとして使用されるカスタムフィールド名。matchPattern から抽出した値のフィールド名を定義するために、または事前定義されたログ形式のデフォルトのフィールド名を上書きするために、この設定を使用できます。

例 : LOGTOJSON 設定

JSON形式に変換された Apache Common Log エントリの LOGTOJSON 設定の一つの例を次に示します。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

変換前:

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

変換後:

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}

例 : カスタムフィールドがある LOGTOJSON 設定

こちらは LOGTOJSON 設定の別の例です。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

この設定では、前の例からの同じ Apache Common Log エントリは、次のように JSON 形式に変換されます。

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}

例 : Apache Common Log エントリの変換

次のフロー設定は Apache Common Log エントリを JSON 形式の単一行レコードに変換します。

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }

例 : 複数行レコードの変換

次のフロー設定は、最初の行が「[SEQUENCE=」で開始している複数行レコードを解析します。各レコードはまず単一行レコードに変換されます。次に、値はタブの区切り記号に基づいたレコードから取得されます。取得された値は指定された customFieldNames 値にマッピングされ、JSON 形式の単一行レコードを形成します。

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }

例 : 一致パターンで LOGTOJSON 設定

こちらは、最後のフィールド (バイト) が省略された JSON 形式に変換された Apache Common Log エントリの LOGTOJSON 設定の一例です。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

変換前:

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

変換後:

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

エージェント CLI コマンド

システムスタートアップ時のエージェントの自動的開始:

sudo chkconfig aws-kinesis-agent on

エージェントのステータスの確認:

sudo service aws-kinesis-agent status

エージェントの停止:

sudo service aws-kinesis-agent stop

この場所からエージェントのログファイルを読む:

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

エージェントのアンインストール:

sudo yum remove aws-kinesis-agent