AWS CloudTrail ログのクエリ - Amazon Athena

「翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。」

AWS CloudTrail ログのクエリ

AWS CloudTrail は、AWS アカウントの AWS API コールとイベントを記録するサービスです。

CloudTrail ログには、AWS のサービス (コンソールなど) に対する API コールの詳細が含まれています。CloudTrail は暗号化されたログファイルを生成し、Amazon S3 に保存します。詳細については、AWS CloudTrail ユーザーガイドを参照してください。

CloudTrail ログで Athena を使用すると、AWS サービスのアクティビティ分析が大幅に強化されます。たとえば、クエリを使用して傾向を識別したり、アクティビティを属性 (ソース IP アドレスやユーザーなど) でさらに分離したりできます。

CloudTrail ログの一般的な用途として、運用上のアクティビティを分析してセキュリティやコンプライアンスに役立てることができます。詳細な例については、AWS Big Data Blog の記事「AWS CloudTrail と Amazon Athena を使用してセキュリティ、コンプライアンス、運用上のアクティビティを分析する方法」を参照してください。

Athena では、保存先の Amazon S3 を LOCATION で指定し、そこでログファイルを直接クエリできます。これには以下の 2 つの方法があります。

  • CloudTrail コンソールから直接 CloudTrail ログファイルのテーブルを作成する。

  • Athena コンソールで CloudTrail ログファイルのテーブルを手動で作成する。

CloudTrail ログと Athena テーブルについて

テーブルの作成を開始する前に、CloudTrail の詳細およびデータの保存方法についてもう少し理解しておく必要があります。これは、テーブルを CloudTrail コンソールから作成するにしても Athena から作成するにしても、必要なテーブルを作成する役に立ちます。

CloudTrail は、ログを JSON テキストファイルとして gzip 圧縮形式 (*.json.gzip) で保存します。ログファイルの保存先は、証跡、ログを記録する AWS リージョン、その他の要因の設定方法に応じて異なります。

ログの保存場所、JSON 構造、およびレコードファイルの内容の詳細については、AWS CloudTrail ユーザーガイドの以下のトピックを参照してください。

ログを収集して Amazon S3 に保存するには、CloudTrail から AWS マネジメントコンソール を有効にします。詳細については、AWS ユーザーガイドの「CloudTrail証跡の作成」を参照してください。

ログを保存する送信先 Amazon S3 バケットをメモしておきます。LOCATION 句を CloudTrail の保存先および作業対象のオブジェクトセットへのパスに置き換えてください。例では、特定のアカウントに関するログの LOCATION 値を使用していますが、目的に応じた保存先を指定できます。

次に例を示します。

  • 複数のアカウントのデータを分析するには、LOCATION を元の設定に戻し、LOCATION 's3://MyLogFiles/AWSLogs/ を使用してすべての AWSLogs を分析対称にします。

  • 特定の日付、アカウント、リージョンのデータを分析するには、LOCATION `s3://MyLogFiles/123456789012/CloudTrail/us-east-1/2016/03/14/'. を使用します。

オブジェクト階層の最上位レベルを使用すると、Athena でのクエリを最も柔軟に実行できます。

CloudTrail コンソールを使用した CloudTrail ログの Athena テーブルの作成

コンソールから直接 Athena ログをクエリするために、パーティション化されていない CloudTrail テーブルを作成できます。CloudTrailCloudTrail コンソールから Athena テーブルを作成するには、Athena でテーブルを作成するのに十分なアクセス許可を持つ IAM ユーザーまたはロールでログインする必要があります。

CloudTrail コンソールを使用して CloudTrail 証跡の Athena テーブルを作成するには

  1. https://console.aws.amazon.com/cloudtrail/ にある CloudTrail コンソールを開きます。

  2. ナビゲーションペインで [Event history (イベント履歴)] を選択します。

  3. 次のいずれかを行ってください。

    • 新しい CloudTrail コンソールを使用している場合は、[Create Athena table (Athena テーブルの作成)] を選択します。

      
                                [Create Athena table (Athena テーブルの作成)] を選択します。
    • 古い CloudTrail コンソールを使用している場合は、[ で高度なクエリを実行Amazon Athena] を選択します。

      
                                [ で高度なクエリを実行するAmazon Athena] を選択します。
  4. [保存場所] で、下矢印を使用し、ログファイルが保存され証跡がクエリされる Amazon S3 バケットを選択します。

    注記

    証跡に関連付けられているバケットの名前を確認するには、 ナビゲーションペインで [証跡CloudTrail] を選択し、証跡の [S3 バケット] 列を表示します。バケットの Amazon S3 の場所を確認するには、[S3 バケット] 列でバケットのリンクを選択します。これにより、Amazon S3 バケットの場所を指定する CloudTrail コンソールが開きます。

  5. [Create table] を選択します。Amazon S3 バケットの名前を含むデフォルト名が付いたテーブルが作成されます。

手動パーティション処理を使用して CloudTrail で Athena ログのテーブルを作成する

Athena コンソールで、CloudTrail ログファイルのテーブルを手動で作成し、Athena でクエリを実行できます。

Athena コンソールを使用して CloudTrail 証跡の Athena テーブルを作成するには

  1. 次の DDL ステートメントをコピーして Athena コンソール内に貼り付けます。このステートメントは、CloudTrail コンソールの [Amazon Athena にテーブルを作成] ダイアログボックスにあるステートメントと同じですが、テーブルをパーティション化する PARTITIONED BY 句を追加します。

  2. ログデータの保存先の Amazon S3 バケットを参照するように s3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/ を変更します。

  3. フィールドが正しく表示されていることを確認します。レコードのフィールドの完全リストに関する詳細については、「CloudTrailレコードの内容」を参照してください。CloudTrail

    この例では、フィールド requestparametersresponseelements、および additionaleventdata は、クエリでタイプ STRING としてリストされていますが、JSON で使用される STRUCT データ型です。そのため、これらのフィールドからデータを取得するには、JSON_EXTRACT 関数を使用します。詳細については、「JSON からのデータの抽出」を参照してください。パフォーマンスを向上させるために、この例ではリージョン、年、月、日に基づいてデータをパーティション分割しています。

    CREATE EXTERNAL TABLE cloudtrail_logs ( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING>>>, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< ARN:STRING, accountId:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING, vpcendpointid STRING ) PARTITIONED BY (region string, year string, month string, day string) ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/';
  4. Athena コンソールでクエリを実行します。

  5. 以下の例のように、ALTER TABLE ADD PARTITION コマンドを使用してパーティションをロードすることで、データをクエリできるようにします。

    ALTER TABLE table_name ADD PARTITION (region='us-east-1', year='2019', month='02', day='01') LOCATION 's3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/us-east-1/2019/02/01/'

パーティション射影を使用して CloudTrail で Athena ログのテーブルを作成する

ログには、パーティションスキームを事前に指定できる既知の構造があるため、CloudTrail パーティション射影機能を使用して、クエリランタイムを減らし、パーティション管理を自動化できます。Athenaパーティション射影では、新しいデータが追加されると自動的に新しいパーティションが追加されます。これにより、ALTER TABLE ADD PARTITION を使用してパーティションを手動で追加する必要がなくなります。

次の CREATE TABLE ステートメントの例では、1 つの CloudTrail リージョンについて存在するまで、指定された日付の AWS ログのパーティション射影を自動的に使用します。および LOCATION 句で、storage.location.template を置き換えます。bucket, account-id, および aws-region 対応する同じ値を持つ プレースホルダー。の projection.timestamp.range2020/01/01 使用する開始日の 。クエリが正常に実行されたら、テーブルに対してクエリを実行できます。パーティションをロードするために ALTER TABLE ADD PARTITION を実行する必要はありません。

CREATE EXTERNAL TABLE cloudtrail_logs_pp( eventVersion STRING, userIdentity STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, invokedBy: STRING, accessKeyId: STRING, userName: STRING, sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING, creationDate: STRING>, sessionIssuer: STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, userName: STRING>>>, eventTime STRING, eventSource STRING, eventName STRING, awsRegion STRING, sourceIpAddress STRING, userAgent STRING, errorCode STRING, errorMessage STRING, requestParameters STRING, responseElements STRING, additionalEventData STRING, requestId STRING, eventId STRING, readOnly STRING, resources ARRAY<STRUCT< arn: STRING, accountId: STRING, type: STRING>>, eventType STRING, apiVersion STRING, recipientAccountId STRING, serviceEventDetails STRING, sharedEventID STRING, vpcEndpointId STRING ) PARTITIONED BY ( `timestamp` string) ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket/AWSLogs/account-id/CloudTrail/aws-region' TBLPROPERTIES ( 'projection.enabled'='true', 'projection.timestamp.format'='yyyy/MM/dd', 'projection.timestamp.interval'='1', 'projection.timestamp.interval.unit'='DAYS', 'projection.timestamp.range'='2020/01/01,NOW', 'projection.timestamp.type'='date', 'storage.location.template'='s3://bucket/AWSLogs/account-id/CloudTrail/aws-region/${timestamp}')

パーティション射影の詳細については、「Amazon Athena を使用したパーティション射影」を参照してください。

ネストされたフィールドのクエリ

フィールドと userIdentity フィールドはネストされたデータ型であるため、フィールドに対してクエリを実行するには特別な処理が必要です。resources

オブジェクトは、ネストされた userIdentity 型で構成されます。STRUCT以下の例のように、フィールドを区切るためにドットを使用してこれらをクエリできます。

SELECT eventsource, eventname, useridentity.sessioncontext.attributes.creationdate, useridentity.sessioncontext.sessionissuer.arn FROM cloudtrail_logs WHERE useridentity.sessioncontext.sessionissuer.arn IS NOT NULL ORDER BY eventsource, eventname LIMIT 10

フィールドは、resources オブジェクトの配列です。STRUCTこれらの配列で、CROSS JOIN UNNEST を使用して配列をネスト解除し、そのオブジェクトをクエリできるようにします。

次の例では、リソース ARN が example/datafile.txt で終わるすべての行が返されます。 読みやすくするために、replace 関数は ARN から最初の arn:aws:s3::: サブストリングを削除します。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as s3_resource, eventname, eventtime, useragent FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE unnested.resources_entry.ARN LIKE '%example/datafile.txt' ORDER BY eventtime

次の例では、DeleteBucket イベントをクエリします。クエリは、バケットの名前とバケットが属するアカウント ID を resources オブジェクトから抽出します。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as deleted_bucket, eventtime AS time_deleted, useridentity.username, unnested.resources_entry.accountid as bucket_acct_id FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE eventname = 'DeleteBucket' ORDER BY eventtime

ネスト解除の詳細については、「配列のフィルタ処理」を参照してください。

クエリの例

次の例は、CloudTrail イベントログ用に作成されたテーブルからすべての匿名 (署名されていない) リクエストを返すクエリの一部を示しています。このクエリは、useridentity.accountid が匿名、および useridentity.arn が指定されていないリクエストを選択します。

SELECT * FROM cloudtrail_logs WHERE eventsource = 's3.amazonaws.com' AND eventname in ('GetObject') AND useridentity.accountid LIKE '%ANONYMOUS%' AND useridentity.arn IS NULL AND requestparameters LIKE '%[your bucket name ]%';

詳細については、AWS Big Data Blog の記事「AWS CloudTrail と Amazon Athena を使用してセキュリティ、コンプライアンス、運用上のアクティビティを分析する方法」を参照してください。

CloudTrail ログのクエリに関するヒント

CloudTrail ログデータを探索するには、以下のヒントを参考にしてください。

  • ログをクエリする前に、ログテーブルが「手動パーティション処理を使用して CloudTrail で Athena ログのテーブルを作成する」に示している内容と同じになっていることを確認します。これが最初のテーブルではない場合は、次のコマンドを使用して既存のテーブルを削除します。DROP TABLE cloudtrail_logs;

  • 既存のテーブルを削除した後、再作成します。詳細については、「 ログのテーブルの作成CloudTrail」を参照してください。

    Athena クエリのフィールドが正しく表示されていることを確認します。レコードのフィールドの完全リストについては、「CloudTrail レコードの内容CloudTrail」を参照してください。

    クエリ内のフィールドが JSON 形式 (STRUCT など) である場合は、JSON からデータを抽出します。詳細については、「JSON からのデータの抽出」を参照してください。

    これで CloudTrail テーブルに対してクエリを発行する準備完了です。

  • 最初に、どの IAM ユーザーが、どの送信元 IP アドレスから API オペレーションを呼び出したかを確認します。

  • 次の基本的な SQL クエリをテンプレートとして使用します。このクエリを Athena コンソールに貼り付けて実行します。

    SELECT useridentity.arn, eventname, sourceipaddress, eventtime FROM cloudtrail_logs LIMIT 100;
  • さらに、前述のクエリを変更してデータを探索します。

  • パフォーマンスを強化するには、LIMIT 句を追加して、指定したサブセットの行のみが返るようにします。