Kinesis エージェントを使用して Amazon Kinesis Data Streams に書き込む - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis エージェントを使用して Amazon Kinesis Data Streams に書き込む

Kinesis エージェントはスタンドアロンの Java ソフトウェアアプリケーションであり、データを収集して Kinesis Data Streams に送信する簡単な方法です。このエージェントは一連のファイルを継続的に監視し、新しいデータをストリームに送信します。エージェントはファイルのローテーション、チェックポイント、失敗時の再試行を処理します。タイムリーで信頼性の高い簡単な方法で、すべてのデータを提供します。また、ストリーミング処理のモニタリングとトラブルシューティングに役立つ Amazon CloudWatch メトリクスも出力します。

デフォルトでは、レコードは改行文字 ('\n') に基づいて各ファイルから解析されます。しかし、複数行レコードを解析するよう、エージェントを設定することもできます (エージェント設定を指定するを参照)。

このエージェントは、ウェブサーバー、ログサーバーおよびデータベースサーバーなど、Linux ベースのサーバー環境にインストールできます。エージェントをインストールした後で、モニタリング用のファイルとデータストリームを指定して設定します。エージェントが設定されると、ファイルから永続的にデータを収集して、ストリームに安全にデータを送信します。

Kinesis Agent の前提条件を完了する

  • オペレーティングシステムは Amazon Linux AMI バージョン 2015.09 以降、または Red Hat Enterprise Linux バージョン 7 以降でなければなりません。

  • Amazon EC2 を使用してエージェントを実行している場合は、EC2 インスタンスを起動します。

  • 次のいずれかの方法を使用して、AWS 認証情報を管理します。

    • EC2 インスタンスを起動する際に IAM ロールを指定します。

    • エージェントを設定する際に AWS 認証情報を指定します (awsAccessKeyId および awsSecretAccessKey を参照してください)。

    • /etc/sysconfig/aws-kinesis-agent を編集して、 リージョンと AWS クセスキーを指定します。

    • EC2 インスタンスが他の AWS アカウントにある場合、Kinesis Data Streams サービスへのアクセス権を付与する IAM ロールを作成し、エージェントを設定するときにそのロールを指定します (assumeRoleARNassumeRoleExternalId を参照)。前のいずれかの方法を使用して、このロールを引き受ける許可がある、他のアカウントのユーザーの AWS 認証情報を指定します。

  • 指定した IAM ロールまたは AWS 認証情報には、Kinesis Data Streams PutRecords オペレーションを実行してエージェントからストリームにデータを送信するための許可が必要です。エージェントの CloudWatch モニタリングを有効にしている場合は、CloudWatch PutMetricData オペレーションを実行する許可も必要になります。詳細については、を使用した Amazon Kinesis Data Streams リソースへのアクセスの制御 IAMAmazon で Kinesis Data Streams エージェントの状態をモニタリングする CloudWatch、およびCloudWatch アクセスコントロールを参照してください。

エージェントをダウンロードしてインストールする

最初に、インスタンスに接続します。詳細については、「Amazon EC2 ユーザーガイド」の「Linux インスタンスへの接続」を参照してください。接続できない場合は、「Amazon EC2 ユーザーガイド」の「インスタンスへの接続に関するトラブルシューティング」を参照してください。

Amazon Linux AMI を使用してエージェントを設定する

次のコマンドを使用して、エージェントをダウンロードしてインストールします。

sudo yum install –y aws-kinesis-agent
Red Hat Enterprise Linux を使用してエージェントを設定する

次のコマンドを使用して、エージェントをダウンロードしてインストールします。

sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
GitHubを使用してエージェントを設定する
  1. エージェントを awlabs/amazon-kinesis-agent からダウンロードします。

  2. ダウンロードしたディレクトリまで移動し、次のコマンドを実行してエージェントをインストールします。

    sudo ./setup --install
Docker コンテナにエージェントをセットアップするには

Kinesis Agent は、amazonlinux コンテナベースを使ってコンテナで実行することもできます。次の Docker ファイルを使用し、docker build を実行します。

FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]

エージェントを設定して開始する

エージェントを設定して開始するには
  1. (デフォルトのファイルアクセス許可を使用している場合、スーパーユーザーとして) 設定ファイル (/etc/aws-kinesis/agent.json) を開き、編集します。

    この設定ファイルで、エージェントがデータを集めるファイル ("filePattern") とエージェントがデータを送信するストリーム ("kinesisStream") を指定します。ファイル名はパターンで、エージェントはファイルローテーションを確認する点に注意してください。1 秒あたりに一度だけ、ファイルを交替または新しいファイルを作成できます。エージェントはファイル作成タイムスタンプを使用して、どのファイルを追跡してストリームに送信するかを判断します。新規ファイルの作成やファイルの交換を 1 秒あたりに一度以上頻繁に交換すると、エージェントはそれらを適切に区別できません。

    { "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "yourkinesisstream" } ] }
  2. エージェントを手動で開始する:

    sudo service aws-kinesis-agent start
  3. (オプション) システムスタートアップ時にエージェントを開始するように設定します。

    sudo chkconfig aws-kinesis-agent on

エージェントは、システムのサービスとしてバックグラウンドで実行されます。継続的に指定ファイルをモニタリングし、指定されたストリームにデータを送信します。エージェント活動は、/var/log/aws-kinesis-agent/aws-kinesis-agent.log に記録されます。

エージェント設定を指定する

エージェントは、2つの必須設定、filePatternkinesisStream、さらに追加機能として任意の設定をサポートしています。必須およびオプションの設定を /etc/aws-kinesis/agent.json で指定できます。

設定ファイルを変更した場合は、次のコマンドを使用してエージェントを停止および起動する必要があります。

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

または、次のコマンドを使用できます。

sudo service aws-kinesis-agent restart

全般設定は次のとおりです。

構成設定 説明
assumeRoleARN

ユーザーが引き受けるロールの ARN。詳細については、IAM ユーザーガイドAWS アカウント間の IAM ロールを使用したアクセスの委任を参照してください。

assumeRoleExternalId

ロールを引き受けることができるユーザーを決定するオプションの ID。詳細については、IAM ユーザーガイド外部 ID の使用方法を参照してください。

awsAccessKeyId

デフォルトの認証情報を上書きする AWS アクセスキー IDです。この設定は、他のすべての認証情報プロバイダーに優先されます。

awsSecretAccessKey

デフォルトの認証情報を上書きする AWS シークレットキーです。この設定は、他のすべての認証情報プロバイダーに優先されます。

cloudwatch.emitMetrics

エージェントがメトリクスを CloudWatch に出力できるようにします (true に設定された場合)。

デフォルト: true

cloudwatch.endpoint

CloudWatch のリージョンのエンドポイントです。

デフォルト: monitoring.us-east-1.amazonaws.com

kinesis.endpoint

Kinesis Data Streams のリージョンのエンドポイントです。

デフォルト: kinesis.us-east-1.amazonaws.com

フロー設定は次のとおりです。

構成設定 説明
dataProcessingOptions

ストリームに送信される前に解析された各レコードに適用されるの処理オプションの一覧。処理オプションは指定した順序で実行されます。詳細については、エージェントを使用してデータを事前処理するを参照してください。

kinesisStream

[必須] ストリームの名前。

filePattern

[必須] エージェントによって取得されるために一致する必要があるディレクトリとファイルパターン。このパターンに一致するすべてのファイルは、読み取り権限を aws-kinesis-agent-user に付与する必要があります。ファイルを含むディレクトリには、読み取りと実行権限を aws-kinesis-agent-user に付与する必要があります。

initialPosition

ファイルの解析が開始される最初の位置。有効な値は、START_OF_FILE および END_OF_FILE です。

デフォルト: END_OF_FILE

maxBufferAgeMillis

エージェントがストリームに送信する前にデータをバッファーする最大時間 (ミリ秒)。

値の範囲: 1,000~900,000 (1 秒~15 分)

デフォルト: 60,000 (1 分)

maxBufferSizeBytes

エージェントがストリームに送信する前にデータをバッファーする最大サイズ (バイト)。

値の範囲: 1~4,194,304 (4 MB)

デフォルト: 4,194,304 (4 MB)

maxBufferSizeRecords

エージェントがストリームに送信する前にデータをバッファーするレコードの最大数。

値の範囲: 1 ~ 500

デフォルト: 500

minTimeBetweenFilePollsMillis

エージェントが新しいデータのモニタリング対象ファイルをポーリングし、解析する時間間隔 (ミリ秒単位)。

値の範囲: 1 以上

デフォルト: 100

multiLineStartPattern

レコードの開始を識別するパターン。レコードは、パターンに一致する 1 行と、それに続くパターンに一致しない行で構成されます。有効な値は正規表現です。デフォルトでは、ログファイルのそれぞれの改行は 1 つのレコードとして解析されます。

partitionKeyOption

パーティションのキーを生成する方法。有効な値は RANDOM (ランダムに生成される整数) と DETERMINISTIC (データから計算されるハッシュ値) です。

デフォルト: RANDOM

skipHeaderLines

モニタリング対象ファイルの始めにエージェントが解析をスキップするの行数。

値の範囲: 0 以上

デフォルト: 0 (ゼロ)

truncatedRecordTerminator

レコードのサイズが Kinesis Data Streams レコードの許容サイズを超えたときに解析されたレコードを切り捨てるために、エージェントが使用する文字列。(1,000 KB)

デフォルト: '\n' (改行)

複数のファイルディレクトリを監視し、複数のストリームに書き込む

複数のフロー設定を指定することによって、エージェントが複数のファイルディレクトリを監視し、複数のストリームにデータを送信するように設定できます。以下の設定例では、エージェントは 2 つのファイルディレクトリをモニタリングし、それぞれ Kinesis ストリームおよび Firehose 配信ストリームにデータを送信します。Kinesis Data Streams と Firehose に異なるエンドポイントを指定できるため、Kinesis ストリームと Firehose 配信ストリームが同じリージョンに存在する必要はありません。

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

Firehose でのエージェントの使用の詳細については、「Kinesis エージェントを使用した Amazon Data Firehose への書き込み」を参照してください。

エージェントを使用してデータを事前処理する

エージェントはストリームにレコードを送信する前に、モニタリング対象ファイルから解析したレコードを事前処理できます。ファイルフローに dataProcessingOptions 設定を追加することで、この機能を有効にできます。1 つ以上の処理オプションを追加でき、また指定されている順序で実行されます。

エージェントは、リストされた次の処理オプションに対応しています。エージェントはオープンソースであるため、処理オプションを開発および拡張できます。Kinesis エージェントからエージェントをダウンロードできます。

処理オプション
SINGLELINE

改行文字、先頭のスペース、末尾のスペースを削除することで、複数行レコードを単一行レコードに変換します。

{ "optionName": "SINGLELINE" }
CSVTOJSON

区切り形式から JSON 形式にレコードを変換します。

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[必須] 各 JSON キー値のペアでキーとして使用されるフィールド名。たとえば、["f1", "f2"] を指定した場合は、レコードv1、v2は {"f1":"v1","f2":"v2"} に変換されます。

delimiter

レコードで区切り記号として使用する文字列。デフォルトはカンマ (,) です。

LOGTOJSON

ログ形式から JSON 形式にレコードを変換します。サポートされているログ形式は、Apache Common LogApache Combined LogApache Error Log、および RFC3164 Syslog です。

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[必須] ログエントリ形式。以下の値を指定できます。

  • COMMONAPACHELOG — Apache Common Log 形式。各ログエントリは、デフォルトで次のパターン%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}になります。

  • COMBINEDAPACHELOG — Apache Combined Log 形式。各ログエントリは、デフォルトで次のパターン%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}になります。

  • APACHEERRORLOG — Apache Error Log 形式。各ログエントリは、デフォルトで次のパターン[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}になります。

  • SYSLOG — FC3164 Syslog 形式。各ログエントリは、デフォルトで次のパターン%{timestamp} %{hostname} %{program}[%{processid}]: %{message}になります。

matchPattern

ログエントリから値を取得するために使用する正規表現パターン。この設定は、ログエントリが定義されたログ形式の一つとして存在していない場合に使用されます。この設定を使用する場合は、customFieldNames を指定する必要があります。

customFieldNames

JSON キー値のペアでキーとして使用されるカスタムフィールド名。matchPattern から抽出した値のフィールド名を定義するために、または事前定義されたログ形式のデフォルトのフィールド名を上書きするために、この設定を使用できます。

例 : LOGTOJSON 設定

JSON形式に変換された Apache Common Log エントリの LOGTOJSON 設定の一つの例を次に示します。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

変換前:

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

変換後:

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
例 : カスタムフィールドがある LOGTOJSON 設定

こちらは LOGTOJSON 設定の別の例です。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

この設定では、前の例からの同じ Apache Common Log エントリは、次のように JSON 形式に変換されます。

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
例 : Apache Common Log エントリの変換

次のフロー設定は Apache Common Log エントリを JSON 形式の単一行レコードに変換します。

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
例 : 複数行レコードの変換

次のフロー設定は、最初の行が[SEQUENCE=で開始している複数行レコードを解析します。各レコードはまず単一行レコードに変換されます。次に、値はタブの区切り記号に基づいたレコードから取得されます。取得された値は指定された customFieldNames 値にマッピングされ、JSON 形式の単一行レコードを形成します。

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }
例 : 一致パターンで LOGTOJSON 設定

こちらは、最後のフィールド (バイト) が省略された JSON 形式に変換された Apache Common Log エントリの LOGTOJSON 設定の一例です。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

変換前:

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

変換後:

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

エージェント CLI コマンドを使用する

システムスタートアップ時のエージェントの自動的開始:

sudo chkconfig aws-kinesis-agent on

エージェントのステータスの確認:

sudo service aws-kinesis-agent status

エージェントの停止:

sudo service aws-kinesis-agent stop

この場所からエージェントのログファイルを読む:

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

エージェントのアンインストール:

sudo yum remove aws-kinesis-agent

よくある質問

Windows 用の Kinesis Agent はありますか?

Windows 用 Kinesis Agent は Linux プラットフォーム用 Kinesis Agent とは異なるソフトウェアです。

Kinesis Agent の速度が低下したり、RecordSendErrors が増加したりするのはなぜですか?

通常、これは Kinesis のスロットリングが原因です。Kinesis Data Streams の WriteProvisionedThroughputExceeded メトリクス、または Firehose 配信ストリームの ThrottledRecords メトリクスをチェックします。これらのメトリクスが 0 を超えている場合は、ストリームの上限を引き上げる必要があることを意味します。詳細については、「Kinesis Data Stream limits」と「Amazon Firehose Delivery Streams」を参照してください。

スロットリングが原因ではないことがわかったら、Kinesis Agent が大量の小規模ファイルをテーリングするように設定されているかどうかを確認してください。Kinesis Agent が新しいファイルをテーリングするときには遅延が発生するため、少量の大きなファイルをテーリングするようにします。ログファイルを大きなファイルに統合してみてください。

java.lang.OutOfMemoryError の例外が発生するのはなぜですか?

Kinesis Agent に、現在のワークロードを処理するための十分なメモリがないためです。/usr/bin/start-aws-kinesis-agentJAVA_START_HEAPJAVA_MAX_HEAP を増やしてエージェントを再起動してみてください。

IllegalStateException : connection pool shut down の例外が発生するのはなぜですか?

Kinesis エージェントに、現在のワークロードを処理するための十分な接続がないためです。/etc/aws-kinesis/agent.json の一般的なエージェント設定で maxConnectionsmaxSendingThreads を増やしてみてください。これらのフィールドのデフォルト値は、使用可能なランタイムプロセッサの 12 倍です。高度なエージェント設定については、「AgentConfiguration.java」を参照してください。

Kinesis Agent に関する別の問題をデバッグする方法を教えてください。

DEBUG レベルログは /etc/aws-kinesis/log4j.xml で有効にできます。

Kinesis Agent はどのように設定するとよいですか?

maxBufferSizeBytes の値が小さいほど、Kinesis Agent がデータを送信する頻度が高くなります。そのため、レコードの配信時間が短縮されますが、Kinesis への 1 秒あたりのリクエスト数も増えます。

Kinesis Agent が重複レコードを送信するのはなぜですか?

これはファイルテーリングの設定ミスが原因です。各 fileFlow’s filePattern が それぞれ 1 つのファイルのみと一致するようにします。また、使用されている logrotate モードが copytruncate モードになっている場合にも発生することがあります。重複を避けるため、モードをデフォルトか作成モードに変更してみてください。重複レコードの処理に関する詳細は、「Handling Duplicate Records」を参照してください。