Kinesis エージェントを使用した Kinesis Data Firehose への書き込み - Amazon Kinesis Data Firehose

「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」

Kinesis エージェントを使用した Kinesis Data Firehose への書き込み

Amazon Kinesisエージェントはスタンドアロンの Java ソフトウェアアプリケーションで、データを収集して に送信する簡単な方法を提供しますKinesis Data Firehose。このエージェントは一連のファイルを継続的に監視し、新しいデータを Kinesis Data Firehose 配信ストリームに送信します。エージェントはファイルのローテーション、チェックポイント、失敗時の再試行を処理します。タイムリーで信頼性の高い簡単な方法で、すべてのデータを提供します。また、ストリーミング処理のモニタリングとトラブルシューティングに役立つ Amazon CloudWatch メトリクスも出力します。

デフォルトでは、レコードは改行文字 ('\n') に基づいて各ファイルから解析されます。しかし、複数行レコードを解析するよう、エージェントを設定することもできます (「エージェントの設定」を参照)。

このエージェントは、ウェブサーバー、ログサーバーおよびデータベースサーバーなど、Linux ベースのサーバー環境にインストールできます。エージェントをインストールした後で、モニタリング用のファイルとデータの配信ストリームを指定して設定します。エージェントが設定されると、ファイルから永続的にデータを収集して、配信ストリームに安全にデータを送信します。

Prerequisites

  • オペレーティングシステムは、Amazon Linux または Red Hat Enterprise Linux バージョン 7 以降である必要があります。

  • エージェントバージョン 2.0.0 以降は、JRE バージョン 1.8 以降を使用して実行されます。エージェントバージョン 1.1.x は JRE 1.7 以降を使用して実行されます。

  • Amazon EC2 を使用してエージェントを実行している場合は、EC2 インスタンスを起動します。

  • 指定した IAM ロールまたは AWS 認証情報には、エージェントが配信ストリームにデータを送信するために、Kinesis Data Firehose PutRecordBatch オペレーションを実行する権限が必要です。エージェントの CloudWatch モニタリングを有効にしている場合は、CloudWatch PutMetricData オペレーションを実行する権限も必要になります。詳細については、「Amazon Kinesis Data Firehose によるアクセスの制御 」、「Kinesis エージェントの状態のモニタリング」、および「Amazon CloudWatch の認証とアクセスコントロール」を参照してください。

Credentials

次のいずれかの方法を使用して、AWS 認証情報を管理します。

  • カスタム認証情報プロバイダーを作成します。詳細については、「カスタム認証情報プロバイダー」を参照してください。

  • EC2 インスタンスを起動する際に IAM ロールを指定します。

  • エージェントを設定する際に AWS 認証情報を指定します (「エージェントの設定」の設定の表で、awsAccessKeyIdawsSecretAccessKey のエントリを参照してください)。

  • /etc/sysconfig/aws-kinesis-agent を編集して、AWS リージョンと AWS アクセスキーを指定します。

  • EC2 インスタンスが他の AWS アカウントにある場合、Kinesis Data Firehose サービスへのアクセス権を提供する IAM ロールを作成します。エージェントを設定するときに、そのロールを指定します ( assumeRoleARN および を参照)assumeRoleExternalId。前のいずれかの方法を使用して、このロールを引き受ける権限がある、他のアカウントのユーザーの AWS 認証情報を指定します。

カスタム認証情報プロバイダー

カスタム認証情報プロバイダーを作成し、そのクラス名と jar パスを次の構成設定で Kinesis エージェントに渡すことができます。userDefinedCredentialsProvider.classnameおよび userDefinedCredentialsProvider.location 。 これら 2 つの構成設定の説明については、「」を参照してくださいエージェントの設定

カスタム認証情報プロバイダーを作成するには、次の例に示すように、AWSCredentialsProvider インターフェイスを実装するクラスを定義します。

import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; public class YourClassName implements AWSCredentialsProvider { public YourClassName() { } public AWSCredentials getCredentials() { return new BasicAWSCredentials("key1", "key2"); } public void refresh() { } }

このクラスは、引数を取らないコンストラクターを必要とします。

AWS は更新メソッドを定期的に呼び出して、更新された認証情報を取得します。認証情報プロバイダーからその存続期間中に常に異なる認証情報を提供する場合は、このメソッドに認証情報を更新するためのコードを含めます。または、静的な (変わらない) 認証情報を提供する認証情報プロバイダーを必要とする場合は、このメソッドを空のままにすることもできます。

エージェントのダウンロードとインストール

最初に、インスタンスに接続します。詳細については、https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/ec2-connect-to-instance-linux.htmlの「Linux インスタンス用 Amazon EC2 ユーザーガイドインスタンスへの接続」を参照してください。接続できない場合は、『https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/TroubleshootingInstancesConnecting.html』の「Linux インスタンス用 Amazon EC2 ユーザーガイドインスタンスへの接続に関するトラブルシューティング」を参照してください。

次に、次のいずれかの方法を使用して、エージェントをインストールします。

  • Amazon Linux リポジトリからエージェントを設定するには

    この方法は Amazon Linux インスタンスにのみ使用できます。次のコマンドを使用します。

    sudo yum install –y aws-kinesis-agent

    エージェント v 2.0.0 以降は、オペレーティングシステムが Amazon Linux 2 (AL2) のコンピュータにインストールされます。このエージェントバージョンには Java 1.8 以降が必要です。必要な Java バージョンがまだ存在しない場合は、エージェントのインストールプロセスでインストールされます。Amazon Linux 2 の詳細については、https://aws.amazon.com/amazon-linux-2/ を参照してください

  • Amazon S3 リポジトリからエージェントを設定するには

    この方法は、Red Hat Enterprise Linux と Amazon Linux 2 インスタンスで機能します。これらのインスタンスは、公開されているリポジトリからエージェントをインストールするためです。次のコマンドを使用して、最新バージョンのエージェントバージョン 2.x.x をダウンロードしてインストールします。

    sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm

    特定バージョンのエージェントをインストールするには、そのバージョン番号をコマンドで指定します。たとえば、次のコマンドは エージェント v 2.0.1 をインストールします。

    sudo yum install –y https://streaming-data-agent.s3.amazonaws.com/aws-kinesis-agent-2.0.1-1.amzn1.noarch.rpm

    Java 1.7 があり、アップグレードしたくない場合は、Java 1.7 と互換性のあるエージェントバージョン 1.x.x をダウンロードできます。たとえば、エージェント v1.1.6 をダウンロードするには、次のコマンドを使用できます。

    sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-1.1.6-1.amzn1.noarch.rpm

    最新のエージェント v1.x.x は、次のコマンドを使用してダウンロードできます。

    sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm
  • リポジトリからエージェントを設定するにはGitHub

    1. まず、エージェントのバージョンに応じて、必要な Java バージョンがインストールされていることを確認します。

    2. エージェントを awslabs/amazon-kinesis-agent リポジトリからダウンロードします。GitHub

    3. ダウンロードしたディレクトリまで移動し、次のコマンドを実行してエージェントをインストールします。

      sudo ./setup --install

エージェントの設定と開始

エージェントを設定して開始するには

  1. (デフォルトのファイルアクセス許可を使用している場合、スーパーユーザーとして)設定ファイル (/etc/aws-kinesis/agent.json) を開き、編集します。

    この設定ファイルで、エージェントがデータを集めるファイル ("filePattern") とエージェントがデータを送信する配信ストリーム ("deliveryStream") の名前を指定します。ファイル名はパターンで、エージェントはファイルローテーションを確認します。1 秒あたりに一度だけ、ファイルを交替または新しいファイルを作成できます。エージェントはファイル作成タイムスタンプを使用して、追跡し、配信ストリームに送信するファイルを判断します。新しいファイルを作成したり、1 秒に 1 回を超える頻度でファイルをローテーションしたりしても、エージェント間で適切に区別することはできません。

    { "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "yourdeliverystream" } ] }

    デフォルトの AWS リージョンは ですus-east-1。 別のリージョンを使用している場合は、設定ファイルに設定を追加し、リージョンのエンドポイントを指定します。firehose.endpoint詳細については、エージェントの設定 を参照してください。

  2. エージェントを手動で開始する:

    sudo service aws-kinesis-agent start
  3. (オプション) システムスタートアップ時にエージェントを開始するように設定します。

    sudo chkconfig aws-kinesis-agent on

エージェントは、システムのサービスとしてバックグラウンドで実行されます。継続的に指定ファイルをモニタリングし、指定された配信ストリームにデータを送信します。エージェント活動は、/var/log/aws-kinesis-agent/aws-kinesis-agent.log に記録されます。

エージェントの設定

エージェントは、2 つの必須設定、filePatterndeliveryStream、さらに追加機能としてオプションの設定をサポートしています。必須およびオプションの設定を /etc/aws-kinesis/agent.json で指定できます。

設定ファイルを変更した場合は、次のコマンドを使用してエージェントを停止および起動する必要があります。

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

または、次のコマンドを使用できます。

sudo service aws-kinesis-agent restart

全般設定は次のとおりです。

構成設定  説明
assumeRoleARN

ユーザーが引き受けるロールの Amazon リソースネーム (ARN)。詳細については、次のガイドの「AWS アカウント間の IAM ロールを使用したアクセスの委任」を参照してくださいIAM ユーザーガイド

assumeRoleExternalId

ロールを引き受けることができるユーザーを決定するオプションの ID。詳細については、次のガイドの「外部 ID を使用する方法」を参照してくださいIAM ユーザーガイド

awsAccessKeyId

デフォルトの認証情報を上書きする AWS アクセスキー IDです。この設定は、他のすべての認証情報プロバイダーに優先されます。

awsSecretAccessKey

デフォルトの認証情報を上書きする AWS シークレットキーです。この設定は、他のすべての認証情報プロバイダーに優先されます。

cloudwatch.emitMetrics

エージェントがメトリクスを CloudWatch に発行できるようにします (true に設定された場合)。

デフォルト: true

cloudwatch.endpoint

CloudWatch のリージョンのエンドポイントです。

デフォルト:monitoring.us-east-1.amazonaws.com

firehose.endpoint

Kinesis Data Firehose のリージョンのエンドポイントです。

デフォルト:firehose.us-east-1.amazonaws.com

sts.endpoint

AWS Security Token Service のリージョンのエンドポイント。

デフォルト:https://sts.amazonaws.com

userDefinedCredentialsProvider.classname カスタム認証情報プロバイダーを定義する場合、この設定を使用してその完全修飾クラス名を指定します。クラス名の末尾に .class を含めないでください。
userDefinedCredentialsProvider.location カスタム認証情報プロバイダーを定義する場合、この設定を使用して、カスタム認証情報プロバイダーが含まれている jar の絶対パスを指定します。エージェントは、次の場所で jar ファイルを探します。/usr/share/aws-kinesis-agent/lib/

フロー設定は次のとおりです。

構成設定  説明
aggregatedRecordSizeBytes

1 回のオペレーションでエージェントがレコードを集計し配信ストリームに配置するには、この設定を指定します。エージェントが配信ストリームに配置する前の集約レコードのサイズに設定します。

デフォルト: 0 (集約なし)

dataProcessingOptions

配信ストリームに送信される前に解析された各レコードに適用される処理オプションの一覧。処理オプションは指定した順序で実行されます。詳細については、エージェントを使用してデータを事前処理する を参照してください。

deliveryStream

[必須] 配信ストリームの名前。

filePattern

[必須] エージェントによってモニタリングされる必要があるファイルの glob このパターンに一致するすべてのファイルは、エージェントによって自動的に選択され、モニタリングされます。このパターンに一致するすべてのファイルで、読み取りアクセス許可を に付与aws-kinesis-agent-userします。 ファイルを含むディレクトリで、読み取りアクセス許可と実行アクセス許可を に付与aws-kinesis-agent-userします。

重要

エージェントは、このパターンに一致するファイルをすべて取得します。エージェントが意図しないレコードを取得しないように、このパターンは慎重に選択してください。

initialPosition

ファイルの解析が開始される最初の位置。有効な値は、START_OF_FILE および END_OF_FILE です。

デフォルト:END_OF_FILE

maxBufferAgeMillis

エージェントが配信ストリームに送信する前にデータをバッファする最大時間 (ミリ秒)。

値の範囲: 1,000~900,000 (1 秒~15 分)

デフォルト: 60,000(1 分)

maxBufferSizeBytes

エージェントが配信ストリームに送信する前にデータをバッファする最大サイズ (バイト)。

値の範囲: –14,194,304(4 MB)

デフォルト値: 4,194,304(4 MB)

maxBufferSizeRecords

エージェントが配信ストリームに送信する前にデータをバッファするレコードの最大数。

値の範囲: 1–~500

デフォルト値: 500

minTimeBetweenFilePollsMillis

エージェントが新しいデータのモニタリング対象ファイルをポーリングし、解析する時間間隔 (ミリ秒単位)。

値の範囲: 1 以上

デフォルト値: 100

multiLineStartPattern

レコードの開始を識別するパターン。レコードは、パターンに一致する 1 行と、それに続くパターンに一致しない行で構成されます。有効な値は正規表現です。デフォルトでは、ログファイルのそれぞれの改行は 1 つのレコードとして解析されます。

skipHeaderLines

モニタリング対象ファイルの始めにエージェントが解析をスキップするの行数。

値の範囲: 0 以上

デフォルト: 0 (ゼロ)

truncatedRecordTerminator

レコードのサイズが Kinesis Data Firehose レコードの許容サイズを超えたときに解析されたレコードを切り捨てるために、エージェントが使用する文字列。(1,000 KB)

デフォルト値: '\n'(改行)

複数のファイルディレクトリを監視し、複数のストリームに書き込み

複数のフロー設定を指定することによって、エージェントが複数のファイルディレクトリを監視し、複数のストリームにデータを送信するように設定できます。以下の構成例では、エージェントは 2 つのファイルディレクトリ監視し、それぞれ Kinesis データストリームおよび Kinesis Data Firehose 配信ストリームにデータを送信します。Kinesis Data Streams と Kinesis Data Firehose に異なるエンドポイントを指定できるため、データストリームと Kinesis Data Firehose 配信ストリームが同じリージョンに存在する必要はありません。

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

Amazon Kinesis Data Streams でのエージェントの使用の詳細については、「Kinesis エージェントによる Amazon Kinesis Data Streams への書き込み」を参照してください。

エージェントを使用してデータを事前処理する

エージェントは配信ストリームにレコードを送信する前に、モニタリング対象ファイルから解析したレコードを事前処理できます。ファイルフローに dataProcessingOptions 設定を追加することで、この機能を有効にできます。処理オプションを 1 つ以上追加することができます。また、指定の順序で実行されます。

エージェントは、次の処理オプションに対応しています。エージェントはオープンソースであるため、処理オプションを開発および拡張できます。Kinesis エージェントからエージェントをダウンロードできます。

処理オプション

SINGLELINE

改行文字、先頭のスペース、末尾のスペースを削除することで、複数行レコードを単一行レコードに変換します。

{ "optionName": "SINGLELINE" }
CSVTOJSON

区切り形式から JSON 形式にレコードを変換します。

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[必須] 各 JSON キー値のペアでキーとして使用されるフィールド名。たとえば、["f1", "f2"] を指定した場合は、レコード「v1、v2」は {"f1":"v1","f2":"v2"} に変換されます。

delimiter

レコードで区切り記号として使用する文字列。デフォルトはカンマ (,) です。

LOGTOJSON

ログ形式から JSON 形式にレコードを変換します。サポートされているログ形式は 、 Apache Common Log 、 、および Apache Combined Log Apache Error Logです。RFC3164 Syslog

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[必須] ログエントリ形式。以下の値のいずれかになります。

  • COMMONAPACHELOG — Apache Common Log 形式。各ログエントリは、デフォルトで次のパターンになります。"%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}

  • COMBINEDAPACHELOG — Apache Combined Log 形式。各ログエントリは、デフォルトで次のパターンになります。"%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}

  • APACHEERRORLOG — Apache Error Log 形式。各ログエントリは、デフォルトで次のパターンになります。"[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}

  • SYSLOG — RFC3164 Syslog 形式。各ログエントリは、デフォルトで次のパターンになります。"%{timestamp} %{hostname} %{program}[%{processid}]: %{message}

matchPattern

指定されたログ形式のデフォルトパターンを上書きします。カスタム形式を使用する場合は、この設定を使用してログエンティティから値を抽出します。matchPattern を指定する場合は、customFieldNames も指定する必要があります。

customFieldNames

JSON キー値のペアでキーとして使用されるカスタムフィールド名。matchPattern から抽出した値のフィールド名を定義するために、または事前定義されたログ形式のデフォルトのフィールド名を上書きするために、この設定を使用できます。

例 : LOGTOJSON 設定

JSON形式に変換された Apache Common Log エントリの LOGTOJSON 設定の一つの例を次に示します。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

変換前:

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

変換後:

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}

例 : カスタムフィールドがある LOGTOJSON 設定

こちらは LOGTOJSON 設定の別の例です。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

この設定では、前の例からの同じ Apache Common Log エントリは、次のように JSON 形式に変換されます。

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}

例 : Apache Common Log エントリの変換

次のフロー設定は Apache Common Log エントリを JSON 形式の単一行レコードに変換します。

{ "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "my-delivery-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }

例 : 複数行レコードの変換

次のフロー設定は、最初の行が「[SEQUENCE=」で開始している複数行レコードを解析します。各レコードはまず単一行レコードに変換されます。次に、値はタブの区切り記号に基づいたレコードから取得されます。取得された値は指定された customFieldNames 値にマッピングされ、JSON 形式の単一行レコードを形成します。

{ "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "my-delivery-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }

例 : 一致パターンで LOGTOJSON 設定

こちらは、最後のフィールド (バイト) が省略された JSON 形式に変換された Apache Common Log エントリの LOGTOJSON 設定の一例です。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

変換前:

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

変換後:

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

エージェント CLI コマンド

システムスタートアップ時のエージェントの自動的開始:

sudo chkconfig aws-kinesis-agent on

エージェントのステータスの確認:

sudo service aws-kinesis-agent status

エージェントの停止:

sudo service aws-kinesis-agent stop

この場所からエージェントのログファイルを読む:

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

エージェントのアンインストール:

sudo yum remove aws-kinesis-agent