Kinesis エージェントを使用した Amazon Kinesis Data Streams への書き込み - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis エージェントを使用した Amazon Kinesis Data Streams への書き込み

Kinesis エージェントはスタンドアロンの Java ソフトウェアアプリケーションであり、データを収集して Kinesis Data Streams に送信する簡単な方法です。このエージェントは一連のファイルを継続的に監視し、新しいデータをストリームに送信します。エージェントはファイルのローテーション、チェックポイント、失敗時の再試行を処理します。タイムリーで信頼性の高い簡単な方法で、すべてのデータを提供します。また、ストリーミングプロセスのモニタリングとトラブルシューティングに役立つ Amazon CloudWatch メトリクスも出力します。

デフォルトでは、レコードは改行文字 ('\n') に基づいて各ファイルから解析されます。しかし、複数行レコードを解析するよう、エージェントを設定することもできます (エージェントの設定を参照)。

このエージェントは、ウェブサーバー、ログサーバーおよびデータベースサーバーなど、Linux ベースのサーバー環境にインストールできます。エージェントをインストールした後で、モニタリング用のファイルとデータストリームを指定して設定します。エージェントが設定されると、ファイルから永続的にデータを収集して、ストリームに安全にデータを送信します。

前提条件

  • オペレーティングシステムは Amazon Linux AMI バージョン 2015.09 以降、または Red Hat Enterprise Linux バージョン 7 以降でなければなりません。

  • Amazon EC2 を使用してエージェントを実行している場合は、EC2 インスタンスを起動します。

  • 次のいずれかの方法を使用して AWS 認証情報を管理します。

    • EC2 インスタンスを起動する際に IAM ロールを指定します。

    • エージェントを設定するときに AWS 認証情報を指定します (awsAccessKey「IDawsSecretAccessキー」を参照)。

    • を編集/etc/sysconfig/aws-kinesis-agentして、リージョンと AWS アクセスキーを指定します。

    • EC2 インスタンスが別の AWS アカウントにある場合は、Kinesis Data Streams サービスへのアクセスを提供する IAM ロールを作成し、エージェントを設定するときにそのロールを指定します (assumeRoleARN and assumeRoleExternalId」を参照)。前述の方法のいずれかを使用して、このロールを引き受けるアクセス許可を持つ他のアカウントのユーザーの AWS 認証情報を指定します。

  • 指定する IAM ロールまたは AWS 認証情報には、エージェントがストリームにデータを送信するために Kinesis Data Streams PutRecordsオペレーションを実行するアクセス許可が必要です。エージェント CloudWatch のモニタリングを有効にする場合は、 オペレーションを実行する CloudWatch PutMetricDataアクセス許可も必要です。詳細については、IAM を使用した Amazon Kinesis Data Streams リソースへのアクセスの制御「」、Amazon による Kinesis Data Streams エージェントの状態のモニタリング CloudWatch「」、およびCloudWatch 「アクセスコントロール」を参照してください。

エージェントのダウンロードとインストール

最初に、インスタンスに接続します。詳細については、「Amazon EC2 ユーザーガイド」の「インスタンスに接続する」を参照してください。 Amazon EC2 接続に問題がある場合は、Amazon EC2 ユーザーガイド」の「インスタンスへの接続のトラブルシューティング」を参照してください。

Amazon Linux AMI を使用してエージェントを設定する

次のコマンドを使用して、エージェントをダウンロードしてインストールします。

sudo yum install –y aws-kinesis-agent
Red Hat Enterprise Linux を使用してエージェントを設定する

次のコマンドを使用して、エージェントをダウンロードしてインストールします。

sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
を使用して エージェントを設定するには GitHub
  1. awlabs/amazon-kinesis-agent からエージェントをダウンロードします。

  2. ダウンロードしたディレクトリまで移動し、次のコマンドを実行してエージェントをインストールします。

    sudo ./setup --install
Docker コンテナにエージェントをセットアップするには

Kinesis Agent は、amazonlinux コンテナベースを使ってコンテナで実行することもできます。次の Docker ファイルを使用し、docker build を実行します。

FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]

エージェントの設定と開始

エージェントを設定して開始するには
  1. (デフォルトのファイルアクセス許可を使用している場合、スーパーユーザーとして) 設定ファイル (/etc/aws-kinesis/agent.json) を開き、編集します。

    この設定ファイルで、エージェントがデータを集めるファイル ("filePattern") とエージェントがデータを送信するストリーム ("kinesisStream") を指定します。ファイル名はパターンで、エージェントはファイルローテーションを確認する点に注意してください。1 秒あたりに一度だけ、ファイルを交替または新しいファイルを作成できます。エージェントはファイル作成タイムスタンプを使用して、どのファイルを追跡してストリームに送信するかを判断します。新規ファイルの作成やファイルの交換を 1 秒あたりに一度以上頻繁に交換すると、エージェントはそれらを適切に区別できません。

    { "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "yourkinesisstream" } ] }
  2. エージェントを手動で開始する:

    sudo service aws-kinesis-agent start
  3. (オプション) システムスタートアップ時にエージェントを開始するように設定します。

    sudo chkconfig aws-kinesis-agent on

エージェントは、システムのサービスとしてバックグラウンドで実行されます。継続的に指定ファイルをモニタリングし、指定されたストリームにデータを送信します。エージェント活動は、/var/log/aws-kinesis-agent/aws-kinesis-agent.log に記録されます。

エージェントの設定

エージェントは、2つの必須設定、filePatternkinesisStream、さらに追加機能として任意の設定をサポートしています。必須およびオプションの設定を /etc/aws-kinesis/agent.json で指定できます。

設定ファイルを変更した場合は、次のコマンドを使用してエージェントを停止および起動する必要があります。

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

または、次のコマンドを使用できます。

sudo service aws-kinesis-agent restart

全般設定は次のとおりです。

構成設定 説明
assumeRoleARN

ユーザーが引き受けるロールの ARN。詳細については、「IAM ユーザーガイド」の「IAM ロールを使用した AWS アカウント間のアクセスの委任」を参照してください。

assumeRoleExternalId

ロールを引き受けることができるユーザーを決定するオプションの ID。詳細については、IAM ユーザーガイド外部 ID の使用方法を参照してください。

awsAccessKeyId

AWS デフォルトの認証情報を上書きする アクセスキー ID。この設定は、他のすべての認証情報プロバイダーに優先されます。

awsSecretAccessKey

AWS デフォルトの認証情報を上書きする シークレットキー。この設定は、他のすべての認証情報プロバイダーに優先されます。

cloudwatch.emitMetrics

設定されている場合、エージェントがメトリクスを に出力 CloudWatch できるようにします (true)。

デフォルト: true

cloudwatch.endpoint

のリージョンエンドポイント CloudWatch。

デフォルト: monitoring.us-east-1.amazonaws.com

kinesis.endpoint

Kinesis Data Streams のリージョンのエンドポイントです。

デフォルト: kinesis.us-east-1.amazonaws.com

フロー設定は次のとおりです。

構成設定 説明
dataProcessingOptions

ストリームに送信される前に解析された各レコードに適用されるの処理オプションの一覧。処理オプションは指定した順序で実行されます。詳細については、エージェントを使用してデータを事前処理するを参照してください。

kinesisStream

[必須] ストリームの名前。

filePattern

〔必須] エージェントによって取得されるために一致する必要があるディレクトリとファイルパターン。このパターンに一致するすべてのファイルは、読み取り権限を aws-kinesis-agent-user に付与する必要があります。ファイルを含むディレクトリには、読み取りと実行権限を aws-kinesis-agent-user に付与する必要があります。

initialPosition

ファイルの解析が開始される最初の位置。有効な値は、START_OF_FILE および END_OF_FILE です。

デフォルト: END_OF_FILE

maxBufferAgeMillis

エージェントがストリームに送信する前にデータをバッファーする最大時間 (ミリ秒)。

値の範囲: 1,000~900,000 (1 秒~15 分)

デフォルト: 60,000 (1 分)

maxBufferSizeBytes

エージェントがストリームに送信する前にデータをバッファーする最大サイズ (バイト)。

値の範囲: 1~4,194,304 (4 MB)

デフォルト: 4,194,304 (4 MB)

maxBufferSizeRecords

エージェントがストリームに送信する前にデータをバッファーするレコードの最大数。

値の範囲: 1 ~ 500

デフォルト: 500

minTimeBetweenFilePollsMillis

エージェントが新しいデータのモニタリング対象ファイルをポーリングし、解析する時間間隔 (ミリ秒単位)。

値の範囲: 1 以上

デフォルト: 100

multiLineStartPattern

レコードの開始を識別するパターン。レコードは、パターンに一致する 1 行と、それに続くパターンに一致しない行で構成されます。有効な値は正規表現です。デフォルトでは、ログファイルのそれぞれの改行は 1 つのレコードとして解析されます。

partitionKeyOption

パーティションのキーを生成する方法。有効な値は RANDOM (ランダムに生成される整数) と DETERMINISTIC (データから計算されるハッシュ値) です。

デフォルト: RANDOM

skipHeaderLines

モニタリング対象ファイルの始めにエージェントが解析をスキップするの行数。

値の範囲: 0 以上

デフォルト: 0 (ゼロ)

truncatedRecordTerminator

レコードのサイズが Kinesis Data Streams レコードの許容サイズを超えたときに解析されたレコードを切り捨てるために、エージェントが使用する文字列。(1,000 KB)

デフォルト: '\n' (改行)

複数のファイルディレクトリを監視し、複数のストリームに書き込み

複数のフロー設定を指定することによって、エージェントが複数のファイルディレクトリを監視し、複数のストリームにデータを送信するように設定できます。次の設定例では、エージェントは 2 つのファイルディレクトリをモニタリングし、それぞれ Kinesis ストリームと Firehose 配信ストリームにデータを送信します。Kinesis Data Streams と Firehose に異なるエンドポイントを指定して、Kinesis ストリームと Firehose 配信ストリームが同じリージョンに存在する必要がないようにできます。

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

Firehose で エージェントを使用する方法の詳細については、「Kinesis Agent を使用した Amazon Data Firehose への書き込み」を参照してください。

エージェントを使用してデータを事前処理する

エージェントはストリームにレコードを送信する前に、モニタリング対象ファイルから解析したレコードを事前処理できます。ファイルフローに dataProcessingOptions 設定を追加することで、この機能を有効にできます。1 つ以上の処理オプションを追加でき、また指定されている順序で実行されます。

エージェントは、リストされた次の処理オプションに対応しています。エージェントはオープンソースであるため、処理オプションを開発および拡張できます。Kinesis エージェントからエージェントをダウンロードできます。

処理オプション
SINGLELINE

改行文字、先頭のスペース、末尾のスペースを削除することで、複数行レコードを単一行レコードに変換します。

{ "optionName": "SINGLELINE" }
CSVTOJSON

区切り形式から JSON 形式にレコードを変換します。

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[必須] 各 JSON キー値のペアでキーとして使用されるフィールド名。たとえば、["f1", "f2"] を指定した場合は、レコードv1、v2は {"f1":"v1","f2":"v2"} に変換されます。

delimiter

レコードで区切り記号として使用する文字列。デフォルトはカンマ (,) です。

LOGTOJSON

ログ形式から JSON 形式にレコードを変換します。サポートされているログ形式は、Apache Common LogApache Combined LogApache Error Log、および RFC3164 Syslog です。

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[必須] ログエントリ形式。以下の値を指定できます。

  • COMMONAPACHELOG — Apache Common Log 形式。各ログエントリは、デフォルトで次のパターン%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}になります。

  • COMBINEDAPACHELOG — Apache Combined Log 形式。各ログエントリは、デフォルトで次のパターン%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}になります。

  • APACHEERRORLOG — Apache Error Log 形式。各ログエントリは、デフォルトで次のパターン[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}になります。

  • SYSLOG — FC3164 Syslog 形式。各ログエントリは、デフォルトで次のパターン%{timestamp} %{hostname} %{program}[%{processid}]: %{message}になります。

matchPattern

ログエントリから値を取得するために使用する正規表現パターン。この設定は、ログエントリが定義されたログ形式の一つとして存在していない場合に使用されます。この設定を使用する場合は、customFieldNames を指定する必要があります。

customFieldNames

JSON キー値のペアでキーとして使用されるカスタムフィールド名。matchPattern から抽出した値のフィールド名を定義するために、または事前定義されたログ形式のデフォルトのフィールド名を上書きするために、この設定を使用できます。

例 : LOGTOJSON 設定

JSON形式に変換された Apache Common Log エントリの LOGTOJSON 設定の一つの例を次に示します。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

変換前:

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

変換後:

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
例 : カスタムフィールドがある LOGTOJSON 設定

こちらは LOGTOJSON 設定の別の例です。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

この設定では、前の例からの同じ Apache Common Log エントリは、次のように JSON 形式に変換されます。

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
例 : Apache Common Log エントリの変換

次のフロー設定は Apache Common Log エントリを JSON 形式の単一行レコードに変換します。

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
例 : 複数行レコードの変換

次のフロー設定は、最初の行が[SEQUENCE=で開始している複数行レコードを解析します。各レコードはまず単一行レコードに変換されます。次に、値はタブの区切り記号に基づいたレコードから取得されます。取得された値は指定された customFieldNames 値にマッピングされ、JSON 形式の単一行レコードを形成します。

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }
例 : 一致パターンで LOGTOJSON 設定

こちらは、最後のフィールド (バイト) が省略された JSON 形式に変換された Apache Common Log エントリの LOGTOJSON 設定の一例です。

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

変換前:

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

変換後:

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

エージェント CLI コマンド

システムスタートアップ時のエージェントの自動的開始:

sudo chkconfig aws-kinesis-agent on

エージェントのステータスの確認:

sudo service aws-kinesis-agent status

エージェントの停止:

sudo service aws-kinesis-agent stop

この場所からエージェントのログファイルを読む:

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

エージェントのアンインストール:

sudo yum remove aws-kinesis-agent

よくある質問

Windows 用の Kinesis Agent はありますか?

Windows 用 Kinesis Agent は Linux プラットフォーム用 Kinesis Agent とは異なるソフトウェアです。

Kinesis Agent の速度が低下したり、RecordSendErrors が増加したりするのはなぜですか?

通常、これは Kinesis のスロットリングが原因です。Kinesis Data Streams の WriteProvisionedThroughputExceededメトリクスまたは Firehose 配信ストリームの ThrottledRecordsメトリクスを確認します。これらのメトリクスが 0 を超えている場合は、ストリームの上限を引き上げる必要があることを意味します。詳細については、「Kinesis Data Stream limits」と「Amazon Firehose Delivery Streams」を参照してください。

スロットリングが原因ではないことがわかったら、Kinesis Agent が大量の小規模ファイルをテーリングするように設定されているかどうかを確認してください。Kinesis Agent が新しいファイルをテーリングするときには遅延が発生するため、少量の大きなファイルをテーリングするようにします。ログファイルを大きなファイルに統合してみてください。

java.lang.OutOfMemoryError の例外が発生するのはなぜですか?

Kinesis Agent に、現在のワークロードを処理するための十分なメモリがないためです。/usr/bin/start-aws-kinesis-agentJAVA_START_HEAPJAVA_MAX_HEAP を増やしてエージェントを再起動してみてください。

IllegalStateException : connection pool shut down の例外が発生するのはなぜですか?

Kinesis エージェントに、現在のワークロードを処理するための十分な接続がないためです。/etc/aws-kinesis/agent.json の一般的なエージェント設定で maxConnectionsmaxSendingThreads を増やしてみてください。これらのフィールドのデフォルト値は、使用可能なランタイムプロセッサの 12 倍です。エージェント設定の詳細については、AgentConfiguration「.java」を参照してください。

Kinesis Agent に関する別の問題をデバッグする方法を教えてください。

DEBUG レベルログは /etc/aws-kinesis/log4j.xml で有効にできます。

Kinesis Agent はどのように設定するとよいですか?

maxBufferSizeBytes の値が小さいほど、Kinesis Agent がデータを送信する頻度が高くなります。そのため、レコードの配信時間が短縮されますが、Kinesis への 1 秒あたりのリクエスト数も増えます。

Kinesis Agent が重複レコードを送信するのはなぜですか?

これはファイルテーリングの設定ミスが原因です。各 fileFlow’s filePattern が それぞれ 1 つのファイルのみと一致するようにします。また、使用されている logrotate モードが copytruncate モードになっている場合にも発生することがあります。重複を避けるため、モードをデフォルトか作成モードに変更してみてください。重複レコードの処理に関する詳細は、「Handling Duplicate Records」を参照してください。