AWS WAF ログのクエリ - Amazon Athena

AWS WAF ログのクエリ

AWS WAF ログには、AWS WAF が AWS リソースからリクエストを受信した時間、このリクエストの詳細、各リクエストが適合したルールに対するアクションなど、ウェブ ACL によって分析されたトラフィックに関する情報が含まれます。

AWS WAF ログのアクセスのログ記録を有効にして、それらを Simple Storage Service (Amazon S3) に保存できます。これらのログを保存する Amazon S3 バケットをメモしておくと、それらの Athena テーブルを作成して、Athena でクエリすることができます。

AWS WAF ログの有効化とログレコードの構造についての詳細は、「AWS WAF デベロッパーガイド」の「ウェブ ACL トラフィック情報のログ記録」を参照してください。

個々の AWS WAF ログフィールドの詳細については、「AWS WAF デベロッパーガイド」の「Log Fields」(ログフィールド) を参照してください。

AWS WAF ログを中央データレイクリポジトリに集約して、それらを Athena でクエリする方法の例については、「AWS ビッグデータブログ」のブログ記事「OpenSearch Service、Amazon Athena、および Amazon QuickSight を使用して AWS WAF ログを分析する」を参照してください。

このトピックでは、パーティショニングを使用するものと使用しないもの、2 つの CREATE TABLE ステートメントの例を示します。

注記

このトピックの CREATE TABLE ステートメントは、v1 および v2 AWS WAF ログの両方に使用できます。v1 では、webaclid フィールドに ID が含まれます。v2 では、webaclid フィールドに完全な ARN が含まれます。ここでの CREATE TABLE ステートメントは、string データ型を使用して、アグノスティックにこのコンテンツを取り扱います。

パーティション射影を使用した Athena での AWS WAF ログ用のテーブルの作成

AWS WAF ログには、パーティションスキームを事前に指定できる既知の構造があるため、Athena のパーティション射影機能を使用することで、クエリの実行時間を短縮し、パーティション管理を自動化することが可能です。新しいデータが追加されると、パーティション投影は新しいパーティションを自動で追加します。このため、ALTER TABLE ADD PARTITION を使用してパーティションを手動で追加する必要がなくなります。

次の CREATE TABLE ステートメント例では、指定された日付から現在までの 4 つの異なる AWS WAF リージョンの AWS ログで、パーティション射影を自動的に使用しています。この例の PARTITION BY 句は地域と日付でパーティショニングしますが、要件に応じてこれを変更できます。LOCATION および storage.location.template 句では、bucketaccountID の各プレースホルダーを AWS WAF ログの Amazon S3 バケットの場所を特定する値に置き換えます。projection.day.range では、2021/01/01 を使用を開始する日に置き換えます。クエリが正常に実行されると、テーブルをクエリできます。パーティションをロードするのに、ALTER TABLE ADD PARTITION を実行する必要はありません。

CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array< struct< conditiontype:string, location:string, matcheddata:array<string> > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array< struct< rulegroupid:string, terminatingrule:struct< ruleid:string, action:string, rulematchdetails:string >, nonterminatingmatchingrules:array<string>, excludedrules:string > >, `ratebasedrulelist` array< struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int > >, `nonterminatingmatchingrules` array< struct< ruleid:string, action:string > >, `requestheadersinserted` string, `responsecodesent` string, `httprequest` struct< clientip:string, country:string, headers:array< struct< name:string, value:string > >, uri:string, args:string, httpversion:string, httpmethod:string, requestid:string >, `labels` array< struct< name:string > >, `captcharesponse` struct< responsecode:string, solvetimestamp:string, failureReason:string > ) PARTITIONED BY ( `region` string, `date` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket/AWSLogs/accountID/WAFLogs/region/webACL/' TBLPROPERTIES( 'projection.enabled' = 'true', 'projection.region.type' = 'enum', 'projection.region.values' = 'us-east-1,us-west-2,eu-central-1,eu-west-1', 'projection.date.type' = 'date', 'projection.date.range' = '2021/01/01,NOW', 'projection.date.format' = 'yyyy/MM/dd', 'projection.date.interval' = '1', 'projection.date.interval.unit' = 'DAYS', 'storage.location.template' = 's3://bucket/AWSLogs/accountID/WAFLogs/${region}/webACL/${date}/')

パーティション射影の詳細については、「Amazon Athena でのパーティション射影」を参照してください。

パーティショニングなしでの AWS WAF ログのテーブルの作成

このセクションでは、パーティショニングまたはパーティション射影を使用せずに AWS WAF ログのテーブルを作成する方法について説明します。

AWS WAF テーブルを作成するには

  1. 以下の DDL ステートメントをコピーして Athena コンソール内に貼り付けます。ログを保存するバケットに対応するように Amazon S3 バケットの LOCATION を変更します。

    このクエリでは、OpenX JSON SerDe を使用します。

    注記

    SerDe では、各 JSON ドキュメントが、レコード内のフィールドを区切る行終端文字なしの、1 行のテキストに収まっていることを想定しています。JSON テキストがプリティプリント形式の場合、テーブルを作成した後にクエリを実行しようとすると、以下のようなエラーメッセージが表示される場合があります。「HIVE_CURSOR_ERROR: Row is not a valid JSON Object」、または「HIVE_CURSOR_ERROR: JsonParseException: Unexpected end-of-input: expected close marker for OBJECT」。詳細については、GitHub の OpenX SerDe のドキュメントで「JSON Data Files」(JSON データファイル) を参照してください。

    CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array< struct< conditiontype:string, location:string, matcheddata:array<string> > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array< struct< rulegroupid:string, terminatingrule:struct< ruleid:string, action:string, rulematchdetails:string >, nonterminatingmatchingrules:array<string>, excludedrules:string > >, `ratebasedrulelist` array< struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int > >, `nonterminatingmatchingrules` array< struct< ruleid:string, action:string > >, `requestheadersinserted` string, `responsecodesent` string, `httprequest` struct< clientip:string, country:string, headers:array< struct< name:string, value:string > >, uri:string, args:string, httpversion:string, httpmethod:string, requestid:string >, `labels` array< struct< name:string > >, `captcharesponse` struct< responsecode:string, solvetimestamp:string, failureReason:string > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/'
  2. Athena コンソールのクエリエディタで CREATE EXTERNAL TABLE ステートメントを実行します。これで waf_logs テーブルが登録され、その中のデータを Athena からのクエリに使用できるようになります。

AWS WAF ログのクエリ例

次の例では、このドキュメントで前に作成したパーティション射影テーブルに対してクエリを実行します。また、要件に応じて例のテーブル名、列の値、その他の変数を変更します。クエリのパフォーマンスを改善してコストを削減するには、フィルター条件にパーティション列を追加します。

指定された用語を含むリファラーの数を計上する

以下のクエリは、指定された日付範囲内での「amazon」という用語を含むリファラーの数を計上します。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST(httprequest.headers) AS t(header) WHERE day >= '2021/03/01' AND day < '2021/03/31') SELECT COUNT(*) referer_count FROM test_dataset WHERE LOWER(header.name)='referer' AND header.value LIKE '%amazon%'

一致する除外ルールがある、過去 10 日間のすべての一致するた IP アドレスを計上する

以下のクエリは、IP アドレスがルールグループ内の除外ルールに一致した過去 10 日間の回数を計上します。

WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC

日付と時刻の使用

人が読み取り可能な ISO 8601 形式のタイムスタンプフィールドを返す

以下のクエリは、from_unixtime および to_iso8601 関数を使用して、timestamp フィールドを人が読み取り可能な ISO 8601 形式 (例えば、1576280412771 ではなく 2019-12-13T23:40:12.000Z ) で返します。このクエリは、HTTP ソース名、ソース ID、およびリクエストも返します。

SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs LIMIT 10;

過去 24 時間のレコードを返す

以下のクエリは、WHERE 句でフィルターを使用して、過去 24 時間のレコードに関する HTTP ソース名、HTTP ソース ID、および HTTP リクエストフィールドを返します。

SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs WHERE from_unixtime(timestamp/1000) > now() - interval '1' day LIMIT 10;

指定された日付範囲と IP アドレスのレコードを返す

以下のクエリは、指定されたクライアント IP アドレスの指定された日付範囲内のレコードをリストします。

SELECT * FROM waf_logs WHERE httprequest.clientip='53.21.198.66' AND day >= '2021/03/01' AND day < '2021/03/31'

指定された日付範囲について、5 分間隔で IP アドレスの数を計上する

以下のクエリは、特定の日付範囲について、5 分間隔で IP アドレスの数を計上します。

WITH test_dataset AS (SELECT format_datetime(from_unixtime((timestamp/1000) - ((minute(from_unixtime(timestamp / 1000))%5) * 60)),'yyyy-MM-dd HH:mm') AS five_minutes_ts, "httprequest"."clientip" FROM waf_logs WHERE day >= '2021/03/01' AND day < '2021/03/31') SELECT five_minutes_ts,"clientip",count(*) ip_count FROM test_dataset GROUP BY five_minutes_ts,"clientip"

過去 10 日間の X-Forwarded-For IP の数をカウントする

次のクエリは、リクエストヘッダーをフィルタリングし、過去 10 日間の X-Forwarded-For IP の数をカウントします。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST (httprequest.headers) AS t(header) WHERE from_unixtime("timestamp"/1000) > now() - interval '10' DAY) SELECT header.value AS ip, count(*) AS COUNT FROM test_dataset WHERE header.name='X-Forwarded-For' GROUP BY header.value ORDER BY COUNT DESC

日付関数と時刻関数の詳細については、Presto ドキュメントの「Date and Time Functions and Operators」を参照してください。

ブロックされたリクエストとアドレスでの作業

指定されたルールタイプによってブロックされた上位 100 個の IP アドレスを抽出する

以下のクエリは、指定した日付範囲内で RATE_BASED 終了ルールによってブロックされた上位 100 個の IP アドレスを抽出し、計上します。

SELECT COUNT(httpRequest.clientIp) as count, httpRequest.clientIp FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' and day >= '2021/03/01' AND day < '2021/03/31' GROUP BY httpRequest.clientIp ORDER BY count DESC LIMIT 100

指定された国からのリクエストがブロックされた回数を計上する

次のクエリは、アイルランド (IE) に属し、RATE_BASED 終了ルールによってブロックされた IP アドレスからリクエストが到着した回数を数えます。

SELECT COUNT(httpRequest.country) as count, httpRequest.country FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND httpRequest.country='IE' GROUP BY httpRequest.country ORDER BY count LIMIT 100;

リクエストがブロックされた回数を計上し、特定の属性でグループ化する

以下のクエリは、リクエストがブロックされた回数をカウントし、結果を WebACL、RuleId、ClientIP、および HTTP リクエスト URI によってグループ化します。

SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;

特定の終了ルール ID が一致した回数を計上する

以下のクエリは、特定の終了ルール ID が (WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e') に一致した回数を計上します。その結果を WebACL、Action、ClientIP、および HTTP Request URI によってグループ化します。

SELECT COUNT(*) AS count, webaclid, action, httprequest.clientip, httprequest.uri FROM waf_logs WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e' GROUP BY webaclid, action, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;

指定した日付範囲内でブロックされた上位 100 個の IP アドレスを取得する

以下のクエリは、指定された日付範囲内でブロックされた上位 100 個の IP アドレスを抽出します。このクエリは、IP アドレスがブロックされた回数もリストします。

SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country" FROM waf_logs WHERE "action" = 'BLOCK' and day >= '2021/03/01' AND day < '2021/03/31' GROUP BY "httprequest"."clientip", "httprequest"."country" ORDER BY "ipcount" DESC limit 100