查詢 AWS CloudTrail 日誌 - Amazon Athena

查詢 AWS CloudTrail 日誌

AWS CloudTrail 是一項服務,可記錄 AWS API 呼叫以及 Amazon Web Services 帳戶的事件。

CloudTrail 日誌包含任何有關您的 AWS 服務 (包括主控台) 所做的 API 呼叫的詳細資訊。CloudTrail 會產生加密的日誌檔案,並將它們存放在 Amazon S3 中。如需詳細資訊,請參閱《AWS CloudTrail 使用者指南》https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-user-guide.html

注意

如果要跨帳戶、區域和日期對 CloudTrail 事件資訊執行 SQL 查詢,請考慮使用 CloudTrail Lake。CloudTrail Lake 是一種 AWS 替代方案,用於建立將企業的資訊彙總為單一可搜尋事件資料存放區的追蹤。其不使用 Amazon S3 儲存貯體儲存,而是將事件儲存在資料湖中,進而允許更豐富、更快的查詢。您可以使用它來建立 SQL 查詢,以便跨組織、區域並在自訂的時間範圍內搜尋事件。因為您在 CloudTrail 主控台本身中執行 CloudTrail Lake 查詢,因此使用 CloudTrail Lake 不需要 Athena。如需詳細資訊,請參閱 CloudTrail Lake 文件。

搭配使用 Athena 與 CloudTrail 日誌是一種功能強大的方式,可增強 AWS 服務 活動的分析。例如,您可以使用查詢來識別趨勢,並依屬性 (例如來源 IP 地址或使用者) 進一步隔離活動。

常見的應用是使用 CloudTrail 日誌來分析營運活動的安全性與合規性。如需詳細範例的資訊,請參閱 AWS 大數據部落格文章:使用 AWS CloudTrail 和 Amazon Athena 來分析安全性、合規性和營運活動

您可以使用 Athena,指定日誌檔案的 LOCATION,直接從 Amazon S3 查詢這些日誌檔案。有以下兩種做法:

  • 從 CloudTrail 主控台直接建立 CloudTrail 日誌檔案的資料表。

  • 在 Athena 主控台中手動建立 CloudTrail 日誌檔案的資料表。

了解 CloudTrail 日誌和 Athena 資料表

開始建立資料表之前,您應該更深入了解 CloudTrail 及其如何存放資料。這可協助您建立所需資料表,不論您的建立方式是透過 CloudTrail 主控台或 Athena。

CloudTrail 將日誌以壓縮的 gzip 格式 (*. json.gzip) 儲存在 JSON 文字檔案中。日誌檔案的位置取決於您如何設定追蹤、AWS 區域 或您正在登入的區域以及其他因素。

如需有關日誌存放位置、JSON 結構、日誌檔案內容的詳細資訊,請參閱《AWS CloudTrail 使用者指南》中的下列主題:

若要收集日誌並儲存到 Amazon S3 中,請從 AWS Management Console 啟用 CloudTrail。如需詳細資訊,請參閱《AWS CloudTrail 使用者指南》中的建立追蹤

請記下您儲存日誌的目的地 Amazon S3 儲存貯體。將 LOCATION 子句取代為 CloudTrail 日誌位置的路徑以及用於執行的物件組。範例中使用特定帳戶日誌的 LOCATION 值,但您可以使用適合您應用程式的具體程度。

例如:

  • 若要分析多個帳戶的資料,您可以復原 LOCATION 指標,使用 LOCATION 's3://MyLogFiles/AWSLogs/' 來指定所有 AWSLogs

  • 若要分析特定日期、帳戶和區域的資料,請使用 LOCATION 's3://MyLogFiles/123456789012/CloudTrail/us-east-1/2016/03/14/'.

當您使用 Athena 進行查詢時,使用物件階層的最高層級可給您最大的靈活性。

使用 CloudTrail 主控台為 CloudTrail 日誌建立 Athena 資料表

您可以建立非分割 Athena 資料表,以直接從 CloudTrail 主控台查詢 CloudTrail 日誌。從 CloudTrail 主控台建立 Athena 資料表時,您需要使用具有足夠許可的 IAM 使用者或角色登入,才能在 Athena 中建立資料表。

注意

您無法使用 CloudTrail 主控台建立組織線索日誌的 Athena 資料表。而是要使用 Athena 主控台手動建立資料表,以便指定正確的儲存位置。如需有關組織線索的資訊,請參閱《AWS CloudTrail 使用者指南》中的為組織建立追蹤

若要使用 CloudTrail 主控台為 CloudTrail 線索建立 Athena 資料表

  1. 透過 https://console.aws.amazon.com/cloudtrail/ 開啟 CloudTrail 主控台。

  2. 在導覽窗格中,選擇 Event history (事件歷史記錄)。

  3. 選擇 Create Athena table (建立 Athena 資料表)。

    
                        選擇 Create Athena table (建立 Athena 資料表)
  4. 對於 Storage location (儲存位置),請使用向下箭頭選取存放日誌檔案的 Amazon S3 儲存貯體,以供查詢線索。

    注意

    若要尋找與線索相關聯的儲存貯體名稱,請選擇 CloudTrail 導覽窗格中的 Trails (線索),然後檢視該線索的 S3 bucket (S3 儲存貯體) 資料欄。若要查看儲存貯體的 Amazon S3 位置,請在 S3 bucket (S3 儲存貯體) 資料欄中選擇儲存貯體的連結。這會開啟 Amazon S3 主控台,以前往 CloudTrail 儲存貯體位置。

  5. 選擇 Create Table (建立資料表)。建立的資料表會使用包含 Amazon S3 儲存貯體名稱的預設名稱。

使用手動分割在 Athena 中建立 CloudTrail 日誌的資料表

您可以在 Athena 主控台中手動建立 CloudTrail 日誌檔案的資料表,然後在 Athena 中執行查詢。

若要使用 Athena 主控台為 CloudTrail 線索建立 Athena 資料表

  1. 複製下列 DDL 陳述式,並將其貼到 Athena 主控台。該陳述式與 CloudTrail 主控台 Create a table in Amazon Athena (在 Amazon Athena 中建立資料表) 對話方塊中的陳述式相同,但新增一個 PARTITIONED BY 子句來分割資料表。

  2. 修改 s3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/ 來指向包含日誌資料的 Amazon S3 儲存貯體。

  3. 確認列出的欄位正確。如需有關 CloudTrail 記錄中欄位的完整清單的詳細資訊,請參閱 CloudTrail 記錄內容

    在這個範例中,欄位 requestparametersresponseelementsadditionaleventdata 在查詢中列為類型 STRING,但在 JSON 中使用時為 STRUCT 資料類型。因此,若要取得這些欄位的資料,請使用 JSON_EXTRACT 函數。如需詳細資訊,請參閱從 JSON 擷取資料。為了改善效能,本範例會依區域、年份、月份和日來分割資料。

    CREATE EXTERNAL TABLE cloudtrail_logs ( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING>>>, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< ARN:STRING, accountId:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING ) PARTITIONED BY (region string, year string, month string, day string) ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/';
  4. 在 Athena 主控台中執行查詢。

  5. 使用 ALTER TABLE ADD PARTITION 命令載入分割區,以便進行查詢,如下列範例所示。

    ALTER TABLE table_name ADD PARTITION (region='us-east-1', year='2019', month='02', day='01') LOCATION 's3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/us-east-1/2019/02/01/'

使用分割區投影在 Athena 中建立 CloudTrail 日誌的資料表

由於 CloudTrail 日誌具有您可以事先指定其分割區配置的已知結構,您可以使用 Athena 分割區投影功能來減少查詢執行時間並自動化分割區管理。分割區投影會在新增資料時自動新增分割區。因此您無需使用 ALTER TABLE ADD PARTITION 手動新增分割區。

以下 CREATE TABLE 陳述式範例會從指定日期到現在,對單一 AWS 區域 的 CloudTrail 日誌自動使用分割區投影。在 LOCATIONstorage.location.template 子句中,請以對應的相同值取代儲存貯體account-idaws-region 預留位置。對於 projection.timestamp.range,請用您要使用的開始日期取代 2020/01/01。成功執行查詢之後,您可以查詢資料表。您無須執行 ALTER TABLE ADD PARTITION 就能載入分割區。

CREATE EXTERNAL TABLE cloudtrail_logs_pp( eventVersion STRING, userIdentity STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, invokedBy: STRING, accessKeyId: STRING, userName: STRING, sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING, creationDate: STRING>, sessionIssuer: STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, userName: STRING>>>, eventTime STRING, eventSource STRING, eventName STRING, awsRegion STRING, sourceIpAddress STRING, userAgent STRING, errorCode STRING, errorMessage STRING, requestParameters STRING, responseElements STRING, additionalEventData STRING, requestId STRING, eventId STRING, readOnly STRING, resources ARRAY<STRUCT< arn: STRING, accountId: STRING, type: STRING>>, eventType STRING, apiVersion STRING, recipientAccountId STRING, serviceEventDetails STRING, sharedEventID STRING, vpcendpointid STRING ) PARTITIONED BY ( `timestamp` string) ROW FORMAT SERDE 'com.amazon.emr.hive.serde.CloudTrailSerde' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket/AWSLogs/account-id/CloudTrail/aws-region' TBLPROPERTIES ( 'projection.enabled'='true', 'projection.timestamp.format'='yyyy/MM/dd', 'projection.timestamp.interval'='1', 'projection.timestamp.interval.unit'='DAYS', 'projection.timestamp.range'='2020/01/01,NOW', 'projection.timestamp.type'='date', 'storage.location.template'='s3://bucket/AWSLogs/account-id/CloudTrail/aws-region/${timestamp}')

如需有關分割區投影的詳細資訊,請參閱使用 Amazon Athena 進行分割區投影

查詢巢狀欄位

由於 userIdentityresources 欄位是巢狀資料類型,查詢它們需要特殊的處理。

userIdentity 物件由巢狀 STRUCT 類型組成。如以下範例所示,可以使用點來分隔欄位,以查詢這些類型:

SELECT eventsource, eventname, useridentity.sessioncontext.attributes.creationdate, useridentity.sessioncontext.sessionissuer.arn FROM cloudtrail_logs WHERE useridentity.sessioncontext.sessionissuer.arn IS NOT NULL ORDER BY eventsource, eventname LIMIT 10

resources 欄位是 STRUCT 物件的陣列。請對這些陣列使用 CROSS JOIN UNNEST,解除陣列巢狀,以便於查詢其物件。

以下範例會傳回資源 ARN 以 example/datafile.txt 結尾的所有資料行。為了可讀性,replace 函數會從 ARN 移除初始 arn:aws:s3::: 子字串。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as s3_resource, eventname, eventtime, useragent FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE unnested.resources_entry.ARN LIKE '%example/datafile.txt' ORDER BY eventtime

下列範例為 DeleteBucket 事件的查詢範例。該查詢會從 resources 物件擷取儲存貯體的名稱及儲存貯體隸屬的帳戶 ID。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as deleted_bucket, eventtime AS time_deleted, useridentity.username, unnested.resources_entry.accountid as bucket_acct_id FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE eventname = 'DeleteBucket' ORDER BY eventtime

如需有關解除巢狀的詳細資訊,請參閱篩選陣列

查詢範例

以下範例顯示一個查詢的片段,該查詢從為 CloudTrail 事件日誌建立的資料表傳回所有匿名 (未簽署的) 請求。此查詢選擇 useridentity.accountid 是匿名且未指定 useridentity.arn 的那些請求:

SELECT * FROM cloudtrail_logs WHERE eventsource = 's3.amazonaws.com' AND eventname in ('GetObject') AND useridentity.accountid LIKE '%ANONYMOUS%' AND useridentity.arn IS NULL AND requestparameters LIKE '%[your bucket name ]%';

如需詳細資訊,請參閱 AWS 大數據部落格文章:使用 AWS CloudTrail 和 Amazon Athena 來分析安全性、合規性和營運活動

查詢 CloudTrail 日誌的秘訣

若要探索 CloudTrail 日誌資料,請使用以下秘訣:

  • 查詢日誌之前,確認您的日誌資料表看起來和 使用手動分割在 Athena 中建立 CloudTrail 日誌的資料表 中的相同。如果它不是第一個資料表,使用下列命令刪除現有的資料表:DROP TABLE cloudtrail_logs;

  • 捨棄現有資料表後,再重新建立。如需詳細資訊,請參閱 使用手動分割在 Athena 中建立 CloudTrail 日誌的資料表

    確認 Athena 查詢中列出的欄位正確。如需有關 CloudTrail 記錄中欄位的完整清單的資訊,請參閱 CloudTrail 記錄內容

    如果您的查詢包含 JSON 格式的欄位,例如 STRUCT,請從 JSON 擷取資料。如需詳細資訊,請參閱 從 JSON 擷取資料

    現在可以開始對 CloudTrail 資料表發出查詢。

  • 首先,查看哪些 IAM 使用者呼叫了哪些 API 操作,以及從哪些來源 IP 地址進行呼叫。

  • 以下列基本 SQL 查詢當做您的範本。將查詢貼到 Athena 主控台然後執行。

    SELECT useridentity.arn, eventname, sourceipaddress, eventtime FROM cloudtrail_logs LIMIT 100;
  • 修改舊的查詢,進一步探索您的資料。

  • 為了改善效能,加入 LIMIT 子句來傳回資料列的指定部分。