AWS CloudTrail ログのクエリ
AWS CloudTrail は、Amazon Web Services アカウントの AWS API コールとイベントを記録するサービスです。
CloudTrail ログには、コンソールを含めた AWS のサービス に対して発行された、あらゆる API コールの詳細が記載されます。CloudTrail は暗号化されたログファイルを生成して、それらを Amazon S3 に保存します。詳細については、AWS CloudTrailユーザーガイドを参照してください。
注記
アカウント、リージョン、および日付にわたって CloudTrail イベント情報に対して SQL クエリを実行する場合は、CloudTrail Lake の使用を検討してください。CloudTrail Lakeは、企業からの情報を単一の検索可能なイベントデータストアに集約するトレイルを作成する AWS の代替です。Amazon S3 バケットストレージを使用する代わりに、イベントはデータレイクに保存されるため、より豊富で高速なクエリが可能になります。これを使用して、組織、リージョン間、およびカスタム時間範囲内でイベントを検索する SQL クエリを作成できます。CloudTrail Lake クエリは CloudTrail コンソール自体で実行するため、CloudTrail Lake を使用すると Athena は必要ありません。詳細については、CloudTrail Lake のドキュメントを参照してください。
Athena での CloudTrail ログ の使用は、AWS のサービス のアクティビティに関する分析を強化する手段として優れています。たとえば、クエリを使用して傾向を識別したり、アクティビティを属性 (ソース IP アドレスやユーザーなど) でさらに分離したりできます。
一般的な用途は、セキュリティとコンプライアンスのための運用上のアクティビティの分析に CloudTrail ログを使用することです。詳細な例については、「AWS ビッグデータブログ」の記事「AWS CloudTrail と Amazon Athena を使用してセキュリティ、コンプライアンス、運用上のアクティビティを分析する方法
Athena は、ログファイルの LOCATION
を指定して、Amazon S3 からこれらのログファイルを直接クエリするために使用できます。以下の 2 つの方法のいずれかを実行できます。
-
CloudTrail コンソールから直接 CloudTrailログファイルのテーブルを作成する。
-
Athena コンソールで CloudTrail ログファイルのテーブルを手動で作成する。
トピック
CloudTrail ログと Athena テーブルについて
テーブルの作成を開始する前に、CloudTrail、そしてそれがどのようにデータを保存するかについての理解を深めておく必要があります。これは、テーブルを CloudTrail コンソールから作成するか Athena から作成するかにかかわらず、必要なテーブルを作成するために役立ちます。
CloudTrail は、ログを JSON テキストファイルとして、圧縮された gzip 形式 (*.json.gzip) に保存します。ログファイルの保存先は、証跡のセットアップ方法、AWS リージョン あるいはログを記録する対象のリージョン、その他の要因によって異なります。
ログの保存先、JSON 構造、およびレコードファイルの内容の詳細については、AWS CloudTrail ユーザーガイドの以下のトピックを参照してください。
ログを収集して Amazon S3 に保存し、AWS Management Console で CloudTrail を有効にします。詳細については、「AWS CloudTrail ユーザーガイド」の「証跡の作成」を参照してください。
ログの保存先の Amazon S3 バケットをメモしておきます。LOCATION
句を、CloudTrail ログの場所と使用するオブジェクトのセットへのパスに置き換えます。例では、特定のアカウントに関するログの LOCATION
値を使用していますが、目的に応じた保存先を指定できます。
以下に例を示します。
-
複数のアカウントのデータを分析するには、
LOCATION
を元の設定に戻し、AWSLogs
を使用してすべてのLOCATION 's3://MyLogFiles/AWSLogs/'
を分析対称にします。 -
特定の日付、アカウント、リージョンのデータを分析するには、
LOCATION 's3://MyLogFiles/123456789012/CloudTrail/us-east-1/2016/03/14/'.
を使用します。
オブジェクト階層の最上位レベルを使用することで、Athena を使用したクエリの実行時に最大の柔軟性が提供されます。
CloudTrail ログ用の Athena テーブルを作成するための CloudTrail コンソールの使用
CloudTrail コンソールから直接 CloudTrail ログをクエリするために、パーティションされていない Athena テーブルを作成できます。CloudTrail コンソールからの Athena テーブルの作成には、Athena でテーブルを作成するために十分なアクセス許可を持つロールでログインする必要があります。
注記
CloudTrail コンソールを使用して、組織の証跡ログ用の Athena テーブルを作成することはできません。代わりに、Athena コンソールを使用してテーブルを手動で作成し、正しいストレージ場所を指定できるようにします。組織の証跡については、AWS CloudTrail ユーザーガイドの「組織の証跡の作成」を参照してください。
-
Athena のためのアクセス許可のセットアップについては、「セットアップ」を参照してください。
-
パーティションを含むテーブルの作成については、「Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する」を参照してください。
CloudTrail コンソールを使用して CloudTrail 証跡用の Athena テーブルを作成する
CloudTrail コンソール (https://console.aws.amazon.com/cloudtrail/
) を開きます。 -
ナビゲーションペインで [Event history (イベント履歴)] を選択します。
-
[Athena テーブルを作成] をクリックします。
-
[Storage location] (ストレージの場所) の下矢印を使用して、クエリする証跡のためにログファイルが保存される Amazon S3 バケットを選択します。
注記
証跡に関連付けられているバケットの名前を探すには、CloudTrail のナビゲーションペインで [Trails] (証跡) を選択して、証跡の [S3 bucket] (S3 バケット) 列を表示します。バケットの Amazon S3 の場所を確認するには、[S3 bucket] (S3 バケット) 列でバケットのリンクを選択します。これにより、Amazon S3 コンソールで CloudTrail バケットの場所が開きます。
-
[Create table] (テーブルの作成) を選択します。Amazon S3 バケットの名前が含まれたデフォルトの名前でテーブルが作成されます。
Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する
Athena コンソールで、CloudTrail ログファイル用のテーブルを手動で作成してから、Athena でクエリを実行することができます。
Athena コンソールを使用して CloudTrail 証跡のための Athena テーブルを作成する
-
以下の DDL ステートメントを Athena コンソールクエリエディタにコピーして貼り付けます。
CREATE EXTERNAL TABLE cloudtrail_logs ( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING>, ec2RoleDelivery:string, webIdFederationData:map<string,string> > >, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< arn:STRING, accountid:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING, vpcendpointid STRING, tlsDetails struct< tlsVersion:string, cipherSuite:string, clientProvidedHostHeader:string> ) PARTITIONED BY (region string, year string, month string, day string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://
CloudTrail_bucket_name
/AWSLogs/Account_ID
/CloudTrail/'; -
(オプション) テーブルに必要のないフィールドをすべて削除します。特定の列のセットのみを読み込む必要がある場合は、テーブル定義で他の列を除外できます。
-
ログデータが含まれる保存先の Amazon S3 バケットをポイントするように
s3://
を変更します。CloudTrail_bucket_name
/AWSLogs/Account_ID/
CloudTrail/ -
フィールドが正しく表示されていることを確認します。CloudTrail レコード内のフィールドの完全なリストに関する詳細については、「CloudTrail レコードの内容」を参照してください。
次の例では Hive JSON SerDe を使用しています。この例では、フィールド
requestparameters
、responseelements
、およびadditionaleventdata
は、クエリでタイプSTRING
としてリストされていますが、JSON で使用されるSTRUCT
データ型です。そのため、これらのフィールドからデータを取得するには、JSON_EXTRACT
関数を使用します。詳細については、「JSON からのデータの抽出」を参照してください。パフォーマンスを向上させるために、この例では、AWS リージョン、年、月、日に基づいてデータをパーティションしています。 -
Athena コンソールで
CREATE TABLE
ステートメントを実行します。 -
以下の例のように、ALTER TABLE ADD PARTITION コマンドを使用してパーティションをロードすることで、データをクエリできるようにします。
ALTER TABLE
table_name
ADD PARTITION (region='us-east-1', year='2019', month='02', day='01') LOCATION 's3://cloudtrail_bucket_name
/AWSLogs/Account_ID
/CloudTrail/us-east-1/2019/02/01/
'
手動パーティショニングを使用して組織全体の証跡用のテーブルを作成する
Athena で組織全体の CloudTrail ログファイル用のテーブルを作成するには、「Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する」(Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する) の手順に従います。ただし、次の手順に記載されている変更を加えてください。
組織全体の CloudTrail ログ用の Athena テーブルを作成するには
-
次の例のように、
CREATE TABLE
ステートメントで、組織 ID が含まれるようにLOCATION
句を変更します。LOCATION 's3://
cloudtrail_bucket_name
/AWSLogs/organization_id
/Account_ID
/CloudTrail/' -
次の例のように、
PARTITIONED BY
句で、アカウント ID のエントリを文字列として追加します。PARTITIONED BY (account string, region string, year string, month string, day string)
次の例は、結合された結果を示しています。
... PARTITIONED BY (account string, region string, year string, month string, day string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://
cloudtrail_bucket_name
/AWSLogs/organization_id
/Account_ID
/CloudTrail/' -
次の例のように、
ALTER TABLE
ステートメントのADD PARTITION
句には、アカウント ID を含めます。ALTER TABLE table_name ADD PARTITION (account='
111122223333
', region='us-east-1', year='2022', month='08', day='08') -
次の例のように、
ALTER TABLE
ステートメントのLOCATION
句には、追加する組織 ID、アカウント ID、パーティションを含めます。LOCATION 's3://
cloudtrail_bucket_name
/AWSLogs/organization_id
/Account_ID
/CloudTrail/us-east-1/2022/08/08/'次の例では、
ALTER TABLE
ステートメントは結合された結果を示します。ALTER TABLE table_name ADD PARTITION (account='
111122223333
', region='us-east-1', year='2022', month='08', day='08') LOCATION 's3://cloudtrail_bucket_name
/AWSLogs/organization_id
/111122223333
/CloudTrail/us-east-1/2022/08/08/'
パーティション射影を使用した Athena での CloudTrail ログ用テーブルの作成
CloudTrail ログには事前に指定できるパーティションスキームを持つ既知の構造があるため、Athena のパーティション射影機能を使用することで、クエリランタイムを短縮し、パーティション管理を自動化できます。新しいデータが追加されると、パーティション射影は新しいパーティションを自動で追加します。このため、ALTER TABLE ADD PARTITION
を使用してパーティションを手動で追加する必要がなくなります。
次の CREATE TABLE
ステートメント例では、指定した日付から現在までの単一の AWS リージョン リージョンの CloudTrail ログで、パーティション射影を自動的に使用しています。LOCATION
句と storage.location.template
句では、bucket
、account-id
、および aws-region
の各プレースホルダーを、対応する同等の値に置き換えます。projection.timestamp.range
では、2020
/01
/01
を使用を開始する日に置き換えます。クエリが正常に実行されると、テーブルをクエリできます。パーティションをロードするのに、ALTER TABLE ADD PARTITION
を実行する必要はありません。
CREATE EXTERNAL TABLE cloudtrail_logs_pp( eventVersion STRING, userIdentity STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, invokedBy: STRING, accessKeyId: STRING, userName: STRING, sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING, creationDate: STRING>, sessionIssuer: STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, userName: STRING>, ec2RoleDelivery:string, webIdFederationData:map<string,string> > >, eventTime STRING, eventSource STRING, eventName STRING, awsRegion STRING, sourceIpAddress STRING, userAgent STRING, errorCode STRING, errorMessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestId STRING, eventId STRING, readOnly STRING, resources ARRAY<STRUCT< arn: STRING, accountId: STRING, type: STRING>>, eventType STRING, apiVersion STRING, recipientAccountId STRING, serviceEventDetails STRING, sharedEventID STRING, vpcendpointid STRING, tlsDetails struct< tlsVersion:string, cipherSuite:string, clientProvidedHostHeader:string> ) PARTITIONED BY ( `timestamp` string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://
bucket
/AWSLogs/account-id
/CloudTrail/aws-region
' TBLPROPERTIES ( 'projection.enabled'='true', 'projection.timestamp.format'='yyyy/MM/dd', 'projection.timestamp.interval'='1', 'projection.timestamp.interval.unit'='DAYS', 'projection.timestamp.range'='2020
/01
/01
,NOW', 'projection.timestamp.type'='date', 'storage.location.template'='s3://bucket
/AWSLogs/account-id
/CloudTrail/aws-region
/${timestamp}')
パーティション射影の詳細については、「Amazon Athena でのパーティション射影」を参照してください。
ネストされたフィールドのクエリ
userIdentity
および resources
フィールドはネストされたデータ型であるため、それらのクエリには特別な処理が必要です。
userIdentity
オブジェクトは、ネストされた STRUCT
型で構成されています。以下の例にあるように、これらは、フィールドを区切るためにドットを使用してクエリすることができます。
SELECT eventsource, eventname, useridentity.sessioncontext.attributes.creationdate, useridentity.sessioncontext.sessionissuer.arn FROM cloudtrail_logs WHERE useridentity.sessioncontext.sessionissuer.arn IS NOT NULL ORDER BY eventsource, eventname LIMIT 10
resources
フィールドは STRUCT
オブジェクトの配列です。これらの配列では、CROSS JOIN UNNEST
を使用して配列のネストを解除し、そのオブジェクトをクエリできるようにします。
以下の例は、リソース ARN が example/datafile.txt
で終わるすべての行を返します。読みやすさのために、replacearn:aws:s3:::
サブストリングを削除します。
SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as s3_resource, eventname, eventtime, useragent FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE unnested.resources_entry.ARN LIKE '%example/datafile.txt' ORDER BY eventtime
以下の例は、DeleteBucket
のイベントをクエリします。このクエリは、バケットの名前とバケットが属するアカウント ID を resources
オブジェクトから抽出します。
SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as deleted_bucket, eventtime AS time_deleted, useridentity.username, unnested.resources_entry.accountid as bucket_acct_id FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE eventname = 'DeleteBucket' ORDER BY eventtime
ネスト解除の詳細については、「配列のフィルタ処理」を参照してください。
サンプルクエリ
以下の例は、CloudTrail イベントログ向けに作成されたテーブルから、匿名化された (署名されていない) すべてのリクエストを返すクエリの一部を示しています。このクエリは、useridentity.accountid
が匿名で useridentity.arn
が指定されていないリクエストを選択します。
SELECT * FROM
cloudtrail_logs
WHERE eventsource = 's3.amazonaws.com' AND eventname in ('GetObject') AND useridentity.accountid = 'anonymous' AND useridentity.arn IS NULL AND requestparameters LIKE '%[your bucket name ]%';
詳細については、「AWS Big·Data·Blog」(ビッグデータブログ) の記事「AWS CloudTrail と Amazon Athena を使用して、セキュリティ、コンプライアンス、運用上のアクティビティを分析する
CloudTrail ログのクエリに関するヒント
CloudTrail ログデータを探索するには、以下のヒントを参考にしてください。
-
ログをクエリする前に、ログテーブルが「Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する」に示している内容と同じになっていることを確認します。これが最初のテーブルではない場合は、コマンド
DROP TABLE cloudtrail_logs
を使用して既存のテーブルを削除します。 -
既存のテーブルを削除した後、再作成します。詳細については、「Athena で手動パーティショニングを使用して CloudTrail ログ用のテーブルを作成する」を参照してください。
Athena クエリのフィールドが正しく表示されていることを確認します。CloudTrail レコード内のフィールドの完全なリストについては、「CloudTrail レコードの内容」を参照してください。
クエリ内のフィールドが JSON 形式 (
STRUCT
など) である場合は、JSON からデータを抽出します。詳細については、「JSON からのデータの抽出」を参照してください。CloudTrail テーブルに対してクエリを発行するためのいくつかの提案事項:
-
最初に、どの ユーザーが、どの送信元 IP アドレスから API オペレーションを呼び出したかを確認します。
-
次の基本的な SQL クエリをテンプレートとして使用します。このクエリを Athena コンソールに貼り付けて実行します。
SELECT useridentity.arn, eventname, sourceipaddress, eventtime FROM cloudtrail_logs LIMIT 100;
-
クエリを修正して、データをさらに詳しく調べます。
-
パフォーマンスを強化するには、
LIMIT
句を追加して、指定したサブセットの行のみが返るようにします。