AWS CloudTrail ログのクエリ - Amazon Athena

AWS CloudTrail ログのクエリ

AWS CloudTrail は、Amazon Web Services アカウントの AWS API コールとイベントを記録するサービスです。

CloudTrail ログには、コンソールを含めた AWS のサービス に対して発行された、あらゆる API コールの詳細が記載されます。CloudTrail は暗号化されたログファイルを生成して、それらを Amazon S3 に保存します。詳細については、AWS CloudTrailユーザーガイドを参照してください。

注記

アカウント、リージョン、および日付にわたって CloudTrail イベント情報に対して SQL クエリを実行する場合は、CloudTrail Lake の使用を検討してください。CloudTrail Lakeは、企業からの情報を単一の検索可能なイベントデータストアに集約するトレイルを作成する AWS の代替です。Amazon S3 バケットストレージを使用する代わりに、イベントはデータレイクに保存されるため、より豊富で高速なクエリが可能になります。これを使用して、組織、リージョン間、およびカスタム時間範囲内でイベントを検索する SQL クエリを作成できます。CloudTrail Lake クエリは CloudTrail コンソール自体で実行するため、CloudTrail Lake を使用すると Athena は必要ありません。詳細については、CloudTrail Lake のドキュメントを参照してください。

Athena での CloudTrail ログ の使用は、AWS のサービス のアクティビティに関する分析を強化する手段として優れています。たとえば、クエリを使用して傾向を識別したり、アクティビティを属性 (ソース IP アドレスやユーザーなど) でさらに分離したりできます。

一般的な用途は、セキュリティとコンプライアンスのための運用上のアクティビティの分析に CloudTrail ログを使用することです。詳細な例については、「AWS ビッグデータブログ」の記事「AWS CloudTrail と Amazon Athena を使用してセキュリティ、コンプライアンス、運用上のアクティビティを分析する方法」を参照してください。

Athena は、ログファイルの LOCATION を指定して、Amazon S3 からこれらのログファイルを直接クエリするために使用できます。以下の 2 つの方法のいずれかを実行できます。

  • CloudTrail コンソールから直接 CloudTrailログファイルのテーブルを作成する。

  • Athena コンソールで CloudTrail ログファイルのテーブルを手動で作成する。

CloudTrail ログと Athena テーブルについて

テーブルの作成を開始する前に、CloudTrail、そしてそれがどのようにデータを保存するかについての理解を深めておく必要があります。これは、テーブルを CloudTrail コンソールから作成するか Athena から作成するかにかかわらず、必要なテーブルを作成するために役立ちます。

CloudTrail は、ログを JSON テキストファイルとして、圧縮された gzip 形式 (*.json.gzip) に保存します。ログファイルの保存先は、証跡のセットアップ方法、AWS リージョン あるいはログを記録する対象のリージョン、その他の要因によって異なります。

ログの保存先、JSON 構造、およびレコードファイルの内容の詳細については、AWS CloudTrail ユーザーガイドの以下のトピックを参照してください。

ログを収集して Amazon S3 に保存し、AWS Management Console で CloudTrail を有効にします。詳細については、「AWS CloudTrail ユーザーガイド」の「証跡の作成」を参照してください。

ログの保存先の Amazon S3 バケットをメモしておきます。LOCATION 句を、CloudTrail ログの場所と使用するオブジェクトのセットへのパスに置き換えます。例では、特定のアカウントに関するログの LOCATION 値を使用していますが、目的に応じた保存先を指定できます。

以下に例を示します。

  • 複数のアカウントのデータを分析するには、LOCATION を元の設定に戻し、AWSLogs を使用してすべての LOCATION 's3://MyLogFiles/AWSLogs/' を分析対称にします。

  • 特定の日付、アカウント、リージョンのデータを分析するには、LOCATION 's3://MyLogFiles/123456789012/CloudTrail/us-east-1/2016/03/14/'. を使用します。

オブジェクト階層の最上位レベルを使用することで、Athena を使用したクエリの実行時に最大の柔軟性が提供されます。

CloudTrail ログ用の Athena テーブルを作成するための CloudTrail コンソールの使用

CloudTrail コンソールから直接 CloudTrail ログをクエリするために、パーティションされていない Athena テーブルを作成できます。CloudTrail コンソールからの Athena テーブルの作成には、Athena でテーブルを作成するために十分なアクセス許可を持つロールでログインする必要があります。

注記

CloudTrail コンソールを使用して、組織の証跡ログ用の Athena テーブルを作成することはできません。代わりに、Athena コンソールを使用してテーブルを手動で作成し、正しいストレージ場所を指定できるようにします。組織の証跡については、AWS CloudTrail ユーザーガイドの「組織の証跡の作成」を参照してください。

CloudTrail コンソールを使用して CloudTrail 証跡用の Athena テーブルを作成する
  1. CloudTrail コンソール (https://console.aws.amazon.com/cloudtrail/) を開きます。

  2. ナビゲーションペインで [Event history (イベント履歴)] を選択します。

  3. [Athena テーブルを作成] をクリックします。

    
                        [Create Athena table] (Athena テーブルを作成) をクリックします。
  4. [Storage location] (ストレージの場所) の下矢印を使用して、クエリする証跡のためにログファイルが保存される Amazon S3 バケットを選択します。

    注記

    証跡に関連付けられているバケットの名前を探すには、CloudTrail のナビゲーションペインで [Trails] (証跡) を選択して、証跡の [S3 bucket] (S3 バケット) 列を表示します。バケットの Amazon S3 の場所を確認するには、[S3 bucket] (S3 バケット) 列でバケットのリンクを選択します。これにより、Amazon S3 コンソールで CloudTrail バケットの場所が開きます。

  5. [Create table] (テーブルの作成) を選択します。Amazon S3 バケットの名前が含まれたデフォルトの名前でテーブルが作成されます。

Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する

Athena コンソールで、CloudTrail ログファイル用のテーブルを手動で作成してから、Athena でクエリを実行することができます。

Athena コンソールを使用して CloudTrail 証跡のための Athena テーブルを作成する
  1. 以下の DDL ステートメントを Athena コンソールクエリエディタにコピーして貼り付けます。

    CREATE EXTERNAL TABLE cloudtrail_logs ( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING>, ec2RoleDelivery:string, webIdFederationData:map<string,string> > >, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< arn:STRING, accountid:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING, vpcendpointid STRING, tlsDetails struct< tlsVersion:string, cipherSuite:string, clientProvidedHostHeader:string> ) PARTITIONED BY (region string, year string, month string, day string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/';
  2. (オプション) テーブルに必要のないフィールドをすべて削除します。特定の列のセットのみを読み込む必要がある場合は、テーブル定義で他の列を除外できます。

  3. ログデータが含まれる保存先の Amazon S3 バケットをポイントするように s3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/ を変更します。

  4. フィールドが正しく表示されていることを確認します。CloudTrail レコード内のフィールドの完全なリストに関する詳細については、「CloudTrail レコードの内容」を参照してください。

    次の例では Hive JSON SerDe を使用しています。この例では、フィールド requestparametersresponseelements、および additionaleventdata は、クエリでタイプ STRING としてリストされていますが、JSON で使用される STRUCT データ型です。そのため、これらのフィールドからデータを取得するには、JSON_EXTRACT 関数を使用します。詳細については、「JSON からのデータの抽出」を参照してください。パフォーマンスを向上させるために、この例では、AWS リージョン、年、月、日に基づいてデータをパーティションしています。

  5. Athena コンソールで CREATE TABLE ステートメントを実行します。

  6. 以下の例のように、ALTER TABLE ADD PARTITION コマンドを使用してパーティションをロードすることで、データをクエリできるようにします。

    ALTER TABLE table_name ADD PARTITION (region='us-east-1', year='2019', month='02', day='01') LOCATION 's3://cloudtrail_bucket_name/AWSLogs/Account_ID/CloudTrail/us-east-1/2019/02/01/'

手動パーティショニングを使用して組織全体の証跡用のテーブルを作成する

Athena で組織全体の CloudTrail ログファイル用のテーブルを作成するには、「Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する」(Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する) の手順に従います。ただし、次の手順に記載されている変更を加えてください。

組織全体の CloudTrail ログ用の Athena テーブルを作成するには
  1. 次の例のように、CREATE TABLE ステートメントで、組織 ID が含まれるように LOCATION 句を変更します。

    LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/Account_ID/CloudTrail/'
  2. 次の例のように、PARTITIONED BY 句で、アカウント ID のエントリを文字列として追加します。

    PARTITIONED BY (account string, region string, year string, month string, day string)

    次の例は、結合された結果を示しています。

    ... PARTITIONED BY (account string, region string, year string, month string, day string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/Account_ID/CloudTrail/'
  3. 次の例のように、ALTER TABLE ステートメントの ADD PARTITION 句には、アカウント ID を含めます。

    ALTER TABLE table_name ADD PARTITION (account='111122223333', region='us-east-1', year='2022', month='08', day='08')
  4. 次の例のように、ALTER TABLE ステートメントの LOCATION 句には、追加する組織 ID、アカウント ID、パーティションを含めます。

    LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/Account_ID/CloudTrail/us-east-1/2022/08/08/'

    次の例では、ALTER TABLE ステートメントは結合された結果を示します。

    ALTER TABLE table_name ADD PARTITION (account='111122223333', region='us-east-1', year='2022', month='08', day='08') LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/111122223333/CloudTrail/us-east-1/2022/08/08/'

パーティション射影を使用した Athena での CloudTrail ログ用テーブルの作成

CloudTrail ログには事前に指定できるパーティションスキームを持つ既知の構造があるため、Athena のパーティション射影機能を使用することで、クエリランタイムを短縮し、パーティション管理を自動化できます。新しいデータが追加されると、パーティション射影は新しいパーティションを自動で追加します。このため、ALTER TABLE ADD PARTITION を使用してパーティションを手動で追加する必要がなくなります。

次の CREATE TABLE ステートメント例では、指定した日付から現在までの単一の AWS リージョン リージョンの CloudTrail ログで、パーティション射影を自動的に使用しています。LOCATION 句と storage.location.template 句では、bucketaccount-id、および aws-region の各プレースホルダーを、対応する同等の値に置き換えます。projection.timestamp.range では、2020/01/01 を使用を開始する日に置き換えます。クエリが正常に実行されると、テーブルをクエリできます。パーティションをロードするのに、ALTER TABLE ADD PARTITION を実行する必要はありません。

CREATE EXTERNAL TABLE cloudtrail_logs_pp( eventVersion STRING, userIdentity STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, invokedBy: STRING, accessKeyId: STRING, userName: STRING, sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING, creationDate: STRING>, sessionIssuer: STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, userName: STRING>, ec2RoleDelivery:string, webIdFederationData:map<string,string> > >, eventTime STRING, eventSource STRING, eventName STRING, awsRegion STRING, sourceIpAddress STRING, userAgent STRING, errorCode STRING, errorMessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestId STRING, eventId STRING, readOnly STRING, resources ARRAY<STRUCT< arn: STRING, accountId: STRING, type: STRING>>, eventType STRING, apiVersion STRING, recipientAccountId STRING, serviceEventDetails STRING, sharedEventID STRING, vpcendpointid STRING, tlsDetails struct< tlsVersion:string, cipherSuite:string, clientProvidedHostHeader:string> ) PARTITIONED BY ( `timestamp` string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket/AWSLogs/account-id/CloudTrail/aws-region' TBLPROPERTIES ( 'projection.enabled'='true', 'projection.timestamp.format'='yyyy/MM/dd', 'projection.timestamp.interval'='1', 'projection.timestamp.interval.unit'='DAYS', 'projection.timestamp.range'='2020/01/01,NOW', 'projection.timestamp.type'='date', 'storage.location.template'='s3://bucket/AWSLogs/account-id/CloudTrail/aws-region/${timestamp}')

パーティション射影の詳細については、「Amazon Athena でのパーティション射影」を参照してください。

ネストされたフィールドのクエリ

userIdentity および resources フィールドはネストされたデータ型であるため、それらのクエリには特別な処理が必要です。

userIdentity オブジェクトは、ネストされた STRUCT 型で構成されています。以下の例にあるように、これらは、フィールドを区切るためにドットを使用してクエリすることができます。

SELECT eventsource, eventname, useridentity.sessioncontext.attributes.creationdate, useridentity.sessioncontext.sessionissuer.arn FROM cloudtrail_logs WHERE useridentity.sessioncontext.sessionissuer.arn IS NOT NULL ORDER BY eventsource, eventname LIMIT 10

resources フィールドは STRUCT オブジェクトの配列です。これらの配列では、CROSS JOIN UNNEST を使用して配列のネストを解除し、そのオブジェクトをクエリできるようにします。

以下の例は、リソース ARN が example/datafile.txt で終わるすべての行を返します。読みやすさのために、replace 関数が ARN から初期の arn:aws:s3::: サブストリングを削除します。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as s3_resource, eventname, eventtime, useragent FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE unnested.resources_entry.ARN LIKE '%example/datafile.txt' ORDER BY eventtime

以下の例は、DeleteBucket のイベントをクエリします。このクエリは、バケットの名前とバケットが属するアカウント ID を resources オブジェクトから抽出します。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as deleted_bucket, eventtime AS time_deleted, useridentity.username, unnested.resources_entry.accountid as bucket_acct_id FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE eventname = 'DeleteBucket' ORDER BY eventtime

ネスト解除の詳細については、「配列のフィルタ処理」を参照してください。

サンプルクエリ

以下の例は、CloudTrail イベントログ向けに作成されたテーブルから、匿名化された (署名されていない) すべてのリクエストを返すクエリの一部を示しています。このクエリは、useridentity.accountid が匿名で useridentity.arn が指定されていないリクエストを選択します。

SELECT * FROM cloudtrail_logs WHERE eventsource = 's3.amazonaws.com' AND eventname in ('GetObject') AND useridentity.accountid = 'anonymous' AND useridentity.arn IS NULL AND requestparameters LIKE '%[your bucket name ]%';

詳細については、「AWS Big·Data·Blog」(ビッグデータブログ) の記事「AWS CloudTrail と Amazon Athena を使用して、セキュリティ、コンプライアンス、運用上のアクティビティを分析する」を参照してください。

CloudTrail ログのクエリに関するヒント

CloudTrail ログデータを探索するには、以下のヒントを参考にしてください。

  • ログをクエリする前に、ログテーブルが「Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する」に示している内容と同じになっていることを確認します。これが最初のテーブルではない場合は、コマンド DROP TABLE cloudtrail_logs を使用して既存のテーブルを削除します。

  • 既存のテーブルを削除した後、再作成します。詳細については、「Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する」を参照してください。

    Athena クエリのフィールドが正しく表示されていることを確認します。CloudTrail レコード内のフィールドの完全なリストについては、「CloudTrail レコードの内容」を参照してください。

    クエリ内のフィールドが JSON 形式 (STRUCT など) である場合は、JSON からデータを抽出します。詳細については、「JSON からのデータの抽出」を参照してください。

    CloudTrail テーブルに対してクエリを発行するためのいくつかの提案事項:

  • 最初に、どの ユーザーが、どの送信元 IP アドレスから API オペレーションを呼び出したかを確認します。

  • 次の基本的な SQL クエリをテンプレートとして使用します。このクエリを Athena コンソールに貼り付けて実行します。

    SELECT useridentity.arn, eventname, sourceipaddress, eventtime FROM cloudtrail_logs LIMIT 100;
  • クエリを修正して、データをさらに詳しく調べます。

  • パフォーマンスを強化するには、LIMIT 句を追加して、指定したサブセットの行のみが返るようにします。