AWS CloudTrail ログのクエリ - Amazon Athena

AWS CloudTrail ログのクエリ

AWS CloudTrail は、Amazon Web Services アカウントの AWS API コールとイベントを記録するサービスです。

CloudTrail ログには、コンソールを含めた AWS のサービスに対して行われたあらゆる API コールの詳細が含まれています。CloudTrail は暗号化されたログファイルを生成して、それらを Amazon S3 に保存します。詳細については、AWS CloudTrail ユーザーガイドを参照してください。

CloudTrail ログでの Athena の使用は、AWS のサービスのアクティビティ分析を強化するための優れた手段です。たとえば、クエリを使用して傾向を識別したり、アクティビティを属性 (ソース IP アドレスやユーザーなど) でさらに分離したりできます。

一般的な用途は、セキュリティとコンプライアンスのための運用上のアクティビティの分析に CloudTrail ログを使用することです。詳細な例について詳しくは、AWS ビッグデータブログの記事「AWS CloudTrail と Amazon Athena を使用してセキュリティ、コンプライアンス、運用上のアクティビティを分析する方法」を参照してください。

Athena は、ログファイルの LOCATION を指定して、Amazon S3 からこれらのログファイルを直接クエリするために使用できます。以下の 2 つの方法のいずれかを実行できます。

  • CloudTrail コンソールから直接 CloudTrailログファイルのテーブルを作成する。

  • Athena コンソールで CloudTrail ログファイルのテーブルを手動で作成する。

CloudTrail ログと Athena テーブルについて

テーブルの作成を開始する前に、CloudTrail、そしてそれがどのようにデータを保存するかについての理解を深めておく必要があります。これは、テーブルを CloudTrail コンソールから作成するか Athena から作成するかにかかわらず、必要なテーブルを作成するために役立ちます。

CloudTrail は、ログを JSON テキストファイルとして、圧縮された gzip 形式 (*.json.gzip) に保存します。ログファイルの保存先は、証跡の設定方法、ログを記録する AWS リージョン、その他の要因によって異なります。

ログの保存先、JSON 構造、およびレコードファイルの内容の詳細については、AWS CloudTrail ユーザーガイドの以下のトピックを参照してください。

ログを収集して Amazon S3 に保存し、AWS Management Console で CloudTrail を有効にします。詳細については、AWS CloudTrail ユーザーガイドの「証跡の作成」を参照してください。

ログの保存先の Amazon S3 バケットをメモしておきます。LOCATION 句を、CloudTrail ログの場所と使用するオブジェクトのセットへのパスに置き換えます。この例では、特定のアカウントに関するログの LOCATION 値を使用していますが、目的に応じた保存先を指定できます。

以下に例を示します。

  • 複数のアカウントのデータを分析するには、LOCATION を元の設定に戻し、AWSLogs を使用してすべての LOCATION 's3://MyLogFiles/AWSLogs/' を分析対称にします。

  • 特定の日付、アカウント、リージョンのデータを分析するには、LOCATION 's3://MyLogFiles/123456789012/CloudTrail/us-east-1/2016/03/14/'. を使用します。

オブジェクト階層の最上位レベルを使用することで、Athena を使用したクエリの実行時に最大の柔軟性が提供されます。

CloudTrail ログ用の Athena テーブルを作成するための CloudTrail コンソールの使用

CloudTrail コンソールから直接 CloudTrail ログをクエリするために、パーティション分割されていない Athena テーブルを作成できます。CloudTrail コンソールからの Athena テーブルの作成には、Athena でテーブルを作成するために十分なアクセス許可を持つ IAM ユーザーまたはロールでログインする必要があります。

注記

CloudTrail コンソールを使用して、組織の証跡ログ用の Athena テーブルを作成することはできません。代わりに、Athena コンソールを使用してテーブルを手動で作成し、正しいストレージ場所を指定できるようにします。組織の証跡については、AWS CloudTrail ユーザーガイドの「組織の証跡の作成」を参照してください。

CloudTrail コンソールを使用して CloudTrail 証跡用の Athena テーブルを作成する

  1. https://console.aws.amazon.com/cloudtrail/ で CloudTrail コンソールを開きます。

  2. ナビゲーションペインで [Event history] (イベント履歴) をクリックします。

  3. [Athena テーブルを作成] をクリックします。

    
                        [Create Athena table] (Athena テーブルを作成) をクリックします。
  4. [Storage location] (ストレージの場所) の下矢印を使用して、クエリする証跡のためにログファイルが保存される Amazon S3 バケットを選択します。

    注記

    証跡に関連付けられているバケットの名前を探すには、CloudTrail のナビゲーションペインで [Trails] (証跡) を選択して、証跡の [S3 bucket] (S3 バケット) 列を表示します。バケットの Amazon S3 の場所を確認するには、[S3 bucket] (S3 バケット) 列でバケットのリンクを選択します。これにより、Amazon S3 コンソールで CloudTrail バケットの場所が開きます。

  5. [Create table] (テーブルの作成) をクリックします。Amazon S3 バケットの名前が含まれたデフォルトの名前でテーブルが作成されます。

手動パーティション分割を使用した Athena での CloudTrail ログ用テーブルの作成

Athena コンソールで、CloudTrail ログファイル用のテーブルを手動で作成してから、Athena でクエリを実行することができます。

Athena コンソールを使用して CloudTrail 証跡のための Athena テーブルを作成する

  1. 以下の DDL ステートメントをコピーして Athena コンソールに貼り付けます。このステートメントは、CloudTrail コンソールの [Create a table in Amazon Athena] (Amazon Athena にテーブルを作成) ダイアログボックスにあるものと同じですが、テーブルをパーティション分割する PARTITIONED BY 句を追加します。

  2. ログデータが含まれる保存先の Amazon S3 バケットをポイントするように s3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/ を変更します。

  3. フィールドが正しく表示されていることを確認します。CloudTrail レコード内のフィールドの完全なリストに関する詳細については、「CloudTrail レコードの内容」を参照してください。

    この例では、フィールド requestparametersresponseelements、および additionaleventdata は、クエリでタイプ STRING としてリストされていますが、JSON で使用される STRUCT データ型です。そのため、これらのフィールドからデータを取得するには、JSON_EXTRACT 関数を使用します。詳細については、「」を参照してくださいJSON からのデータの抽出 パフォーマンスを向上させるために、この例ではリージョン、年、月、日に基づいてデータをパーティション分割しています。

    CREATE EXTERNAL TABLE cloudtrail_logs ( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING>>>, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< ARN:STRING, accountId:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING, vpcendpointid STRING ) PARTITIONED BY (region string, year string, month string, day string) ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/';
  4. Athena コンソールでクエリを実行します。

  5. 以下の例のように、ALTER TABLE ADD PARTITION コマンドを使用してパーティションをロードすることで、データをクエリできるようにします。

    ALTER TABLE table_name ADD PARTITION (region='us-east-1', year='2019', month='02', day='01') LOCATION 's3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/us-east-1/2019/02/01/'

パーティション射影を使用した Athena での CloudTrail ログ用テーブルの作成

CloudTrail ログには事前に指定できるパーティションスキームを持つ既知の構造があるため、Athena のパーティション射影機能を使用することで、クエリの実行時間を短縮し、パーティション管理を自動化できます。新しいデータが追加されると、パーティション投影は新しいパーティションを自動で追加します。このため、ALTER TABLE ADD PARTITION を使用してパーティションを手動で追加する必要がなくなります。

次の CREATE TABLE ステートメント例では、指定した日付から現在までの単一の AWS リージョン リージョンの CloudTrail ログで、パーティション射影を自動的に使用しています。LOCATION 句と storage.location.template 句では、bucketaccount-id、および aws-region の各プレースホルダーを、対応する同一の値に置き換えます。projection.timestamp.range では、2020/01/01 を、使用する開始日に置き換えます。クエリが正常に実行されると、テーブルをクエリできます。パーティションをロードするのに、ALTER TABLE ADD PARTITION を実行する必要はありません。

CREATE EXTERNAL TABLE cloudtrail_logs_pp( eventVersion STRING, userIdentity STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, invokedBy: STRING, accessKeyId: STRING, userName: STRING, sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING, creationDate: STRING>, sessionIssuer: STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, userName: STRING>>>, eventTime STRING, eventSource STRING, eventName STRING, awsRegion STRING, sourceIpAddress STRING, userAgent STRING, errorCode STRING, errorMessage STRING, requestParameters STRING, responseElements STRING, additionalEventData STRING, requestId STRING, eventId STRING, readOnly STRING, resources ARRAY<STRUCT< arn: STRING, accountId: STRING, type: STRING>>, eventType STRING, apiVersion STRING, recipientAccountId STRING, serviceEventDetails STRING, sharedEventID STRING, vpcEndpointId STRING ) PARTITIONED BY ( `timestamp` string) ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket/AWSLogs/account-id/CloudTrail/aws-region' TBLPROPERTIES ( 'projection.enabled'='true', 'projection.timestamp.format'='yyyy/MM/dd', 'projection.timestamp.interval'='1', 'projection.timestamp.interval.unit'='DAYS', 'projection.timestamp.range'='2020/01/01,NOW', 'projection.timestamp.type'='date', 'storage.location.template'='s3://bucket/AWSLogs/account-id/CloudTrail/aws-region/${timestamp}')

パーティション射影の詳細については、「Amazon Athena でのパーティション射影」を参照してください。

ネストされたフィールドのクエリ

userIdentity および resources フィールドはネストされたデータ型であるため、それらのクエリには特別な処理が必要です。

userIdentity オブジェクトは、ネストされた STRUCT 型で構成されています。以下の例にあるように、これらは、フィールドを区切るためにドットを使用してクエリすることができます。

SELECT eventsource, eventname, useridentity.sessioncontext.attributes.creationdate, useridentity.sessioncontext.sessionissuer.arn FROM cloudtrail_logs WHERE useridentity.sessioncontext.sessionissuer.arn IS NOT NULL ORDER BY eventsource, eventname LIMIT 10

resources フィールドは STRUCT オブジェクトの配列です。これらの配列では、CROSS JOIN UNNEST を使用して配列のネストを解除し、そのオブジェクトをクエリできるようにします。

以下の例は、リソース ARN が example/datafile.txt で終わるすべての行を返します。読みやすさのために、replace 関数が ARN から初期の arn:aws:s3::: サブストリングを削除します。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as s3_resource, eventname, eventtime, useragent FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE unnested.resources_entry.ARN LIKE '%example/datafile.txt' ORDER BY eventtime

以下の例は、DeleteBucket のイベントをクエリします。このクエリは、バケットの名前とバケットが属するアカウント ID を resources オブジェクトから抽出します。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as deleted_bucket, eventtime AS time_deleted, useridentity.username, unnested.resources_entry.accountid as bucket_acct_id FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE eventname = 'DeleteBucket' ORDER BY eventtime

ネスト解除の詳細については、「配列のフィルタ処理」を参照してください。

クエリの例

以下の例は、CloudTrail イベントログ向けに作成されたテーブルから、匿名化された (署名されていない) すべてのリクエストを返すクエリの一部を示しています。このクエリは、useridentity.accountid が匿名で useridentity.arn が指定されていないリクエストを選択します。

SELECT * FROM cloudtrail_logs WHERE eventsource = 's3.amazonaws.com' AND eventname in ('GetObject') AND useridentity.accountid LIKE '%ANONYMOUS%' AND useridentity.arn IS NULL AND requestparameters LIKE '%[your bucket name ]%';

詳細については、AWS ビッグデータブログの記事「AWS CloudTrail と Amazon Athena を使用して、セキュリティ、コンプライアンス、運用上のアクティビティを分析する」を参照してください。

CloudTrail ログのクエリに関するヒント

CloudTrail ログデータを探索するには、以下のヒントを参考にしてください。

  • ログをクエリする前に、ログテーブルが「手動パーティション分割を使用した Athena での CloudTrail ログ用テーブルの作成」に示している内容と同じになっていることを確認します。これが最初のテーブルではない場合は、コマンド DROP TABLE cloudtrail_logs; を使用して既存のテーブルを削除します。

  • 既存のテーブルを削除した後、再作成します。詳細については、「CloudTrail ログのテーブルの作成」を参照してください。

    Athena クエリのフィールドが正しく表示されていることを確認します。CloudTrail レコード内のフィールドの完全なリストについては、「CloudTrail レコードの内容」を参照してください。

    クエリ内のフィールドが JSON 形式 (STRUCT など) である場合は、JSON からデータを抽出します。詳細については、「JSON からのデータの抽出」を参照してください。

    これで CloudTrail テーブルに対してクエリを発行する準備ができました。

  • 最初に、どの IAM ユーザーが、どの送信元 IP アドレスから API オペレーションを呼び出したかを確認します。

  • 次の基本的な SQL クエリをテンプレートとして使用します。このクエリを Athena コンソールに貼り付けて実行します。

    SELECT useridentity.arn, eventname, sourceipaddress, eventtime FROM cloudtrail_logs LIMIT 100;
  • さらに、前述のクエリを変更してデータを探索します。

  • パフォーマンスを強化するには、LIMIT 句を追加して、指定したサブセットの行のみが返るようにします。