AWS WAF ログのクエリ - Amazon Athena

AWS WAF ログのクエリ

AWS WAF ログには、AWS WAF が AWS リソースからリクエストを受け取った時間、そのリクエストの詳細、および各リクエストが一致したルールのアクションなど、ウェブ ACL によって分析されたトラフィックに関する情報が含まれます。

AWS WAF ログのアクセスロギングを有効にして、それらを Amazon S3 に保存することができます。これらのログを保存する Amazon S3 バケットをメモしておくと、それらの Athena テーブルを作成して、Athena でクエリすることができます。

AWS WAF ログの有効化とログレコード構造の詳細については、AWS WAF 開発者ガイドの「Logging web ACL traffic information」を参照してください。

AWS WAF ログを中央データレイクリポジトリに集約して、それらを Athena でクエリする方法の例については、AWS ビッグデータブログ記事、「Amazon ES、Amazon Athena、および Amazon QuickSight を使用して AWS WAF ログを分析する」を参照してください。

AWS WAF ログ用のテーブルの作成

AWS WAF テーブルを作成する

  1. 以下の DDL ステートメントをコピーして Athena コンソール内に貼り付けます。LOCATION をログを保存する S3 バケットに変更します。

    このクエリでは、OpenX JSON SerDe を使用します。テーブル形式と SerDe は、AWS Glue クローラが AWS WAF ログを分析するときにクローラによって提案されます。

    注記

    SerDe は、Amazon S3 内の WAF ログインにある各 JSON レコードが、レコード内のフィールドを区切る行終端文字がない 1 行のテキストに存在することを期待します。WAF ログ JSON テキストがプリティ印刷形式の場合、テーブルの作成後にクエリを実行しようとすると、エラーメッセージ HIVE_CURSOR_ERROR: 行は有効な JSON オブジェクトではありませんが表示されます。

    CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array< struct< conditiontype:string, location:string, matcheddata:array<string> > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array< struct< rulegroupid:string, terminatingrule:struct< ruleid:string, action:string, rulematchdetails:string >, nonterminatingmatchingrules:array< struct< ruleid:string, action:string, rulematchdetails:array< struct< conditiontype:string, location:string, matcheddata:array<string> > > > >, excludedrules:array< struct< ruleid:string, exclusiontype:string > > > >, `ratebasedrulelist` array< struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int > >, `nonterminatingmatchingrules` array< struct< ruleid:string, action:string > >, `requestheadersinserted` string, `responsecodesent` string, `httprequest` struct< clientip:string, country:string, headers:array< struct< name:string, value:string > >, uri:string, args:string, httpversion:string, httpmethod:string, requestid:string >, `labels` array< struct< name:string > > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='action,formatVersion,httpRequest,httpSourceId,httpSourceName,labels,nonTerminatingMatchingRules,rateBasedRuleList,requestHeadersInserted,responseCodeSent,ruleGroupList,terminatingRuleId,terminatingRuleMatchDetails,terminatingRuleType,timestamp,webaclId') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://athenawaflogs/WebACL/'
  2. Athena コンソールのクエリエディタで CREATE EXTERNAL TABLE ステートメントを実行します。これによって waf_logs テーブルが登録され、その中のデータを Athena からのクエリに使用できるようにします。

パーティション射影を使用した Athena での AWS WAF ログ用のテーブルの作成

AWS WAF ログには、パーティションスキームを事前に指定できる既知の構造があるため、Athena のパーティション射影機能を使用することで、クエリの実行時間を短縮し、パーティション管理を自動化することが可能です。パーティション投影は、新しいデータが追加されるたびに、新しいパーティションを自動で追加します。これにより、ALTER TABLE ADD PARTITION を使用してパーティションを手動で追加する必要がなくなります。

以下の CREATE TABLE ステートメント例は、単一の AWS リージョンについて、指定された日付から現在までの AWS WAF ログでパーティション投影を自動的に使用します。LOCATION および storage.location.template では、bucketfolder の各プレースホルダを AWS WAF ログの Amazon S3 バケットの場所を特定する値に置き換えます。projection.day.range については、[2021 年/01/01] を、使用する開始日に置き換えます。クエリが正常に実行されると、テーブルをクエリできます。実行する必要はありませんALTER TABLE ADD PARTITIONをクリックしてパーティションをロードします。

CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array< struct< conditiontype:string, location:string, matcheddata:array<string> > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array< struct< rulegroupid:string, terminatingrule:struct< ruleid:string, action:string, rulematchdetails:string >, nonterminatingmatchingrules:array< struct< ruleid:string, action:string, rulematchdetails:array< struct< conditiontype:string, location:string, matcheddata:array<string> > > > >, excludedrules:array< struct< ruleid:string, exclusiontype:string > > > >, `ratebasedrulelist` array< struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int > >, `nonterminatingmatchingrules` array< struct< ruleid:string, action:string > >, `requestheadersinserted` string, `responsecodesent` string, `httprequest` struct< clientip:string, country:string, headers:array< struct< name:string, value:string > >, uri:string, args:string, httpversion:string, httpmethod:string, requestid:string >, `labels` array< struct< name:string > > ) PARTITIONED BY ( day STRING ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket/folder/' TBLPROPERTIES ( "projection.enabled" = "true", "projection.day.type" = "date", "projection.day.range" = "2021/01/01,NOW", "projection.day.format" = "yyyy/MM/dd", "projection.day.interval" = "1", "projection.day.interval.unit" = "DAYS", "storage.location.template" = "s3://bucket/folder/${day}" )

パーティション射影の詳細については、「Amazon Athena でのパーティション射影」を参照してください。

AWS WAF フローログのクエリ例

以下のクエリ例では、要件に応じてテーブル名、カラム値、およびその他の変数を変更します。クエリのパフォーマンスを改善してコストを削減するには、フィルタ条件にパーティション列を追加します。

指定された用語を含むリファラーの数を計上する

以下のクエリは、指定された日付範囲内の「amazon」という用語を含むリファラーの数を計上します。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST(httprequest.headers) AS t(header) WHERE day >= '2021/03/01' AND day < '2021/03/31') SELECT COUNT(*) referer_count FROM DATASET WHERE LOWER(header.name)='referer' AND header.value LIKE '%amazon%'

一致する除外ルールがある、過去 10 日間のすべての一致するた IP アドレスを計上する

以下のクエリは、IP アドレスがルールグループ内の除外ルールに一致した過去 10 日間の回数を計上します。

WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC

日付と時刻の使用

人間が読める ISO 8601 形式のタイムスタンプフィールドを返す

以下のクエリは、from_unixtime および to_iso8601 関数を使用して、timestamp フィールドを人間が読める ISO 8601 形式 (例えば、1576280412771 ではなく 2019-12-13T23:40:12.000Z で) 返します。このクエリは、HTTP ソース名、ソース ID、およびリクエストも返します。

SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs LIMIT 10;

過去 24 時間のレコードを返す

以下のクエリは、WHERE 句でフィルタを使用して、過去 24 時間のレコードに関する HTTP ソース名、HTTP ソース ID、および HTTP リクエストフィールドを返します。

SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs WHERE from_unixtime(timestamp/1000) > now() - interval '1' day LIMIT 10;

指定された日付範囲と IP アドレスのレコードを返す

以下のクエリは、指定されたクライアント IP アドレスの指定された日付範囲内のレコードをリストします。

SELECT * FROM waf_logs WHERE httprequest.clientip='53.21.198.66' AND day >= '2021/03/01' AND day < '2021/03/31'

指定された日付範囲について、5 分間隔で IP アドレスの数を計上する

以下のクエリは、特定の日付範囲について、5 分間隔で IP アドレスの数を計上します。

WITH test_dataset AS (SELECT format_datetime(from_unixtime((timestamp/1000) - ((minute(from_unixtime(timestamp / 1000))%5) * 60)),'yyyy-MM-dd HH:mm') AS five_minutes_ts, "httprequest"."clientip" FROM waf_logs WHERE day >= '2021/03/01' AND day < '2021/03/31') SELECT five_minutes_ts,"clientip",count(*) ip_count FROM test_dataset GROUP BY five_minutes_ts,"clientip"

日付関数と時刻関数の詳細については、Presto ドキュメントの「Date and Time Functions and Operators」を参照してください。

ブロックされた要求とアドレスでの作業

指定されたルールタイプによってブロックされた上位 100 個の IP アドレスを抽出する

以下のクエリは、指定した日付範囲内に RATE_BASED 終了ルールによってブロックされた上位 100 個の IP アドレスを抽出し、計上します。

SELECT COUNT(httpRequest.clientIp) as count, httpRequest.clientIp FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' and day >= '2021/03/01' AND day < '2021/03/31' GROUP BY httpRequest.clientIp ORDER BY count DESC LIMIT 100

指定された国からのリクエストがブロックされた回数を計上する

以下のクエリは、アイルランド (IE) に属し、RATE_BASED 終了ルールによってブロックされた IP アドレスからリクエストが到達した回数を数えます。

SELECT COUNT(httpRequest.country) as count, httpRequest.country FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND httpRequest.country='IE' GROUP BY httpRequest.country ORDER BY count LIMIT 100;

リクエストがブロックされた回数を計上し、特定の属性でグループ化する

以下のクエリは、リクエストがブロックされた回数をカウントし、結果を WebACL、RuleId、ClientIP、および HTTP リクエスト URI によってグループ化します。

SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;

特定の終了ルール ID が一致した回数を計上する

以下のクエリは、特定の終了ルール ID が (WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e') に一致した回数を計上します。その結果を WebACL、Action、ClientIP、および HTTP Request URI によってグループ化します。

SELECT COUNT(*) AS count, webaclid, action, httprequest.clientip, httprequest.uri FROM waf_logs WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e' GROUP BY webaclid, action, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;

指定した日付範囲内にブロックされた上位 100 個の IP アドレスを取得する

以下のクエリは、指定された日付範囲内にブロックされた上位 100 個の IP アドレスを抽出します。このクエリは、IP アドレスがブロックされた回数もリストします。

SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country" FROM waf_logs WHERE "action" = 'BLOCK' and day >= '2021/03/01' AND day < '2021/03/31' GROUP BY "httprequest"."clientip", "httprequest"."country" ORDER BY "ipcount" DESC limit 100