查詢 AWS WAF 記錄檔 - Amazon Athena

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

查詢 AWS WAF 記錄檔

AWS WAF 是一種 Web 應用程式防火牆,可讓您監視和控制受保護的 Web 應用程式從用戶端接收的 HTTP 和 HTTPS 要求。您可以透過在 Web 存取控制清單 (ACL) 內設定規則來定義如何處理 AWS WAF Web 要求。然後,您可以透過將 Web ACL 與 Web 應用程式建立關聯來保護 Web 應用程式。您可以使用保護的 Web 應用程式資源範例包 AWS WAF 括 Amazon CloudFront 分發、Amazon API Gateway REST API 和應用程式負載平衡器。如需有關的詳細資訊 AWS WAF,請參閱AWS WAFAWS WAF 發人員指南中的。

AWS WAF 記錄檔包括 Web ACL 分析的流量相關資訊,例如從您的 AWS 資源 AWS WAF 接收要求的時間、有關請求的詳細資訊,以及每個要求符合之規則的動作。

您可以設定 AWS WAF Web ACL,將記錄檔發佈到數個目的地之一,您可以在其中查詢和檢視這些目的地。如需設定 Web ACL 記錄和記錄 AWS WAF 檔內容的詳細資訊,請參閱AWS WAF 開發人員指南中的記錄 AWS WAF Web ACL 流量

如需如何將 AWS WAF 日誌彙總到中央資料湖儲存庫並使用 Athena 進行查詢的範例,請參閱 AWS 大數據部落格文章:使用 OpenSearch 服務、Amazon Athena 和 Amazon 分析 AWS WAF 日誌 QuickSight

本主題提供兩個範例 CREATE TABLE 陳述式:一個使用分割區,另一個不使用。

注意

所以本主題中的 CREATE TABLE 陳述式可同時用於 v1 和 v2 AWS WAF 日誌。在 v1 中,webaclid 欄位含有一個 ID。在 v2 中,webaclid 欄位含有一個完整的 ARN。此 CREATE TABLE 陳述式藉由使用 string 資料類型,以不可知的方式處理此內容。

使用分割區投影在 Athena 中建立 AWS WAF S3 日誌的資料表

因為 AWS WAF 記錄檔具有已知的結構,您可以事先指定其資料分割配置,因此您可以使用 Athena 分割區投影功能,縮短查詢執行階段並自動化分割區管理。分割區投影會在新增資料時自動新增分割區。因此您無需使用 ALTER TABLE ADD PARTITION 手動新增分割區。

下列範例CREATE TABLE陳述式會自動在指定日期的 AWS WAF 記錄上使用分割區投影,直到四個不同 AWS 區域為止。本範例中的 PARTITION BY 子句按區域和日期進行分割,但您可以根據自己的要求進行修改。視需要修改欄位,以符合您的日誌輸出。在LOCATIONstorage.location.template子句中,將儲存體和 accountID 預留位置取代為識別日誌之 Amazon S3 儲存貯體位置的 AWS WAF 值。對於 projection.day.range,請用您要使用的開始日期取代 2021/01/01。成功執行查詢之後,您可以查詢資料表。您無須執行 ALTER TABLE ADD PARTITION 就能載入分割區。

CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) PARTITIONED BY ( `region` string, `date` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/region/DOC-EXAMPLE-WEBACL/' TBLPROPERTIES( 'projection.enabled' = 'true', 'projection.region.type' = 'enum', 'projection.region.values' = 'us-east-1,us-west-2,eu-central-1,eu-west-1', 'projection.date.type' = 'date', 'projection.date.range' = '2021/01/01,NOW', 'projection.date.format' = 'yyyy/MM/dd', 'projection.date.interval' = '1', 'projection.date.interval.unit' = 'DAYS', 'storage.location.template' = 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/${region}/DOC-EXAMPLE-WEBACL/${date}/')
注意

範例中LOCATION子句中的路徑格式為標準格式,但可能會根據您實作的 AWS WAF 組態而有所不同。例如,下列範例 AWS WAF 記錄檔路徑適用於 CloudFront 散發:

s3://DOC-EXAMPLE-BUCKET/AWSLogs/12345678910/WAFLogs/cloudfront/cloudfronyt/2022/08/08/17/55/

如果您在建立或查詢 AWS WAF 記錄資料表時遇到問題,請確認記錄資料或連絡人的位置 AWS Support。

如需有關分割區投影的詳細資訊,請參閱使用 Amazon Athena 進行分割區投影

為不分割的 AWS WAF 記錄建立資料表

本節說明如何在不進行資料分割或分割區投影的情況下為 AWS WAF 記錄建立資料表。

注意

出於性能和成本原因,我們不建議使用非分區的結構描述進行查詢。如需詳細資訊,請參閱大數據部落格中的 Amazon Athena 十 AWS 大效能調整秘訣

建立 AWS WAF 表格的步驟

  1. 複製下列 DDL 陳述式,並將其貼到 Athena 主控台。視需要修改欄位,以符合您的日誌輸出。修改 Amazon S3 儲存貯體的 LOCATION 以與存放日誌的儲存貯體相對應。

    此查詢會使用 OpenX JSON SerDe

    注意

    SerDe 預期每個 JSON 文件都位於單行文字上,且記錄中欄位之間沒有行終止字元。如果 JSON 文本是漂亮的打印格式,您可能會收到一條錯誤消息,如 HIVE_CURSOR_ERROR:行不是有效的 JSON 對象HIVE_CURSOR_ERROR:: 意外 JsonParseException end-of-input:當您嘗試在創建表後查詢對象的預期關閉標記。如需詳細資訊,請參閱上 GitHub的 OpenX SerDe 文件中的 JSON 資料檔案。

    CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/'
  2. 在 Athena 主控台查詢編輯器中執行 CREATE EXTERNAL TABLE 陳述式。這會註冊 waf_logs 資料表,並讓其中的資料可用於從 Athena 進行查詢。

AWS WAF 記錄檔查詢範例

以下許多範例查詢使用本文件中先前所建立的分割區投影資料表。請根據您的需求修改範例中的資料表名稱、資料欄值及其他變數。若要改善查詢的效能並降低成本,請在篩選條件中新增分割區資料欄。

 

 

 

範例
– 計算包含指定字詞的 Referrer 數量

以下查詢會計算在指定的日期範圍內包含 "amazon" 一詞的 Referrer 數量。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST(httprequest.headers) AS t(header) WHERE "date" >= '2021/03/01' AND "date" < '2021/03/31') SELECT COUNT(*) referer_count FROM test_dataset WHERE LOWER(header.name)='referer' AND header.value LIKE '%amazon%'
範例
– 計算過去 10 天內符合排除規則的所有相符 IP 地址

以下查詢會計算過去 10 天內 IP 地址符合規則群組中排除規則的次數。

WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC
範例
– 依相符的次數對所有計數的受管規則進行分組

如果您在 2022 年 10 月 27 日之前在 Web ACL 組態中將規則群組規則動作設定為「計數」,則在 Web ACL JSON 中將覆寫 AWS WAF 儲存為excludedRules。現在,將規則覆寫為計數的 JSON 設定位於 ruleActionOverrides 設定中。如需詳細資訊,請參閱《AWS WAF 開發人員指南》中的規則群組中的動作覆寫。若要從新的日誌結構擷取「計數」模式下的受管規則,請查詢 ruleGroupList 區段中的 nonTerminatingMatchingRules 而非 excludedRules 欄位,如下列範例所示。

SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.rulegroupid, t.nonTerminatingMatchingRules FROM "waf_logs" CROSS JOIN UNNEST(rulegrouplist) AS t(t) WHERE action <> 'BLOCK' AND cardinality(t.nonTerminatingMatchingRules) > 0 GROUP BY t.nonTerminatingMatchingRules, action, httpsourceid, httprequest.clientip, t.rulegroupid ORDER BY "count" DESC Limit 50
範例
– 依相符的次數對所有計數的自訂規則進行分組

下列查詢會依符合的次數,將所有計數的自訂規則分組。

SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.ruleid, t.action FROM "waf_logs" CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip ORDER BY "count" DESC Limit 50

如需有關自訂規則和受管規則群組的日誌位置的資訊,請參閱《AWS WAF 開發人員指南》中的監控和調校

使用日期和時間

範例
– 以人類看得懂的 ISO 8601 格式傳回時間戳記欄位

以下查詢會使用 from_unixtimeto_iso8601 函數,以人類看得懂的 ISO 8601 格式傳回 timestamp 欄位 (例如 2019-12-13T23:40:12.000Z,而非 1576280412771) 查詢也會傳回 HTTP 來源名稱、來源 ID 和請求。

SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs LIMIT 10;
範例
– 傳回過去 24 小時的記錄

以下查詢會使用 WHERE 子句中的篩選條件,傳回過去 24 小時記錄的 HTTP 來源名稱、HTTP 來源 ID 和 HTTP 請求欄位。

SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs WHERE from_unixtime(timestamp/1000) > now() - interval '1' day LIMIT 10;
範例
– 傳回指定日期範圍和 IP 地址的記錄

以下查詢會列出指定用戶端 IP 地址的指定日期範圍內的記錄。

SELECT * FROM waf_logs WHERE httprequest.clientip='53.21.198.66' AND "date" >= '2021/03/01' AND "date" < '2021/03/31'
範例
– 如果是指定的日期範圍,則會計算每隔五分鐘的 IP 地址數量

以下查詢會針對特定日期範圍,計算每隔五分鐘的 IP 地址數量。

WITH test_dataset AS (SELECT format_datetime(from_unixtime((timestamp/1000) - ((minute(from_unixtime(timestamp / 1000))%5) * 60)),'yyyy-MM-dd HH:mm') AS five_minutes_ts, "httprequest"."clientip" FROM waf_logs WHERE "date" >= '2021/03/01' AND "date" < '2021/03/31') SELECT five_minutes_ts,"clientip",count(*) ip_count FROM test_dataset GROUP BY five_minutes_ts,"clientip"
範例
– 計算過去 10 天內的 X-Forwarded-For IP 數量

以下查詢會篩選請求標頭,並計算過去 10 天內的 X-Forwarded-For IP 數量。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST (httprequest.headers) AS t(header) WHERE from_unixtime("timestamp"/1000) > now() - interval '10' DAY) SELECT header.value AS ip, count(*) AS COUNT FROM test_dataset WHERE header.name='X-Forwarded-For' GROUP BY header.value ORDER BY COUNT DESC

如需有關日期和時間函數的詳細資訊,請參閱 Trino 文件中的 Date and time functions and operators (日期和時間函數和運算子)。

使用遭封鎖的請求和地址

範例
– 擷取遭到指定規則類型封鎖的前 100 個 IP 地址

以下查詢會擷取並計算在指定日期範圍內,已遭到 RATE_BASED 終止規則封鎖的前 100 個 IP 地址。

SELECT COUNT(httpRequest.clientIp) as count, httpRequest.clientIp FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' and "date" >= '2021/03/01' AND "date" < '2021/03/31' GROUP BY httpRequest.clientIp ORDER BY count DESC LIMIT 100
範例
– 計算來自指定國家/地區遭到封鎖的請求次數

以下查詢會計算來自愛爾蘭 (IE) 的 IP 地址,並遭 RATE_BASED 終止規則封鎖的請求次數。

SELECT COUNT(httpRequest.country) as count, httpRequest.country FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND httpRequest.country='IE' GROUP BY httpRequest.country ORDER BY count LIMIT 100;
範例
– 計算遭封鎖的請求次數 (依特定屬性分組)

下列查詢會計算要求遭到封鎖的次數,其結果會依 WebACL、 RuleId、用 ClientIP 和 HTTP 要求 URI 分組。

SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
範例
– 計算與特定終止規則 ID 相符的次數。

以下查詢會計算與特定終止規則 ID (WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e') 相符的次數。查詢接著會以 WebACL、Action、ClientIP 和 HTTP Request URI 將結果分組。

SELECT COUNT(*) AS count, webaclid, action, httprequest.clientip, httprequest.uri FROM waf_logs WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e' GROUP BY webaclid, action, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
範例
– 擷取指定日期範圍內遭到封鎖的前 100 個 IP 地址

以下查詢會擷取在指定日期範圍內,已遭到封鎖的前 100 個 IP 地址。該查詢也會列出 IP 地址遭到封鎖的次數。

SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country" FROM waf_logs WHERE "action" = 'BLOCK' and "date" >= '2021/03/01' AND "date" < '2021/03/31' GROUP BY "httprequest"."clientip", "httprequest"."country" ORDER BY "ipcount" DESC limit 100