AWS WAF ログのクエリ - Amazon Athena

AWS WAF ログのクエリ

AWS WAF は、保護されたウェブアプリケーションがクライアントから受信する HTTP および HTTPS リクエストを監視して制御できるようにするウェブアプリケーションファイアウォールです。AWS WAF ウェブアクセスコントロールリスト (ACL) 内のルールを設定することにより、ウェブリクエストの処理方法を定義します。その後、ウェブアプリケーションにウェブ ACL を関連付けて保護します。AWS WAF で保護できるウェブアプリケーションリソースの例には、Amazon CloudFront ディストリビューション、Amazon API Gateway REST API、Application Load Balancers などがあります。AWS WAF の詳細については、「AWS WAF デベロッパーガイド」の「AWS WAF」を参照してください。

AWS WAF ログには、AWS WAF が AWS リソースからリクエストを受信した時間、このリクエストの詳細、各リクエストが適合したルールに対するアクションなど、ウェブ ACL によって分析されたトラフィックに関する情報が含まれます。

AWS WAF ウェブ ACL を設定して、複数の宛先のいずれかにログを発行し、そこでクエリを実行して表示できます。ウェブ ACL ログの設定および AWS WAF ログの内容の詳細については、「AWS WAF デベロッパーガイド」の「AWS WAF ウェブ ACL トラフィックのログ」を参照してください。

AWS WAF ログを中央データレイクリポジトリに集約して、それらを Athena でクエリする方法の例については、「AWS ビッグデータブログ」のブログ記事「OpenSearch Service、Amazon Athena、および Amazon QuickSight を使用して AWS WAF ログを分析する」を参照してください。

このトピックでは、パーティショニングを使用するものと使用しないもの、2 つの CREATE TABLE ステートメントの例を示します。

注記

このトピックの CREATE TABLE ステートメントは、v1 および v2 AWS WAF ログの両方に使用できます。v1 では、webaclid フィールドに ID が含まれます。v2 では、webaclid フィールドに完全な ARN が含まれます。ここでの CREATE TABLE ステートメントは、string データ型を使用して、アグノスティックにこのコンテンツを取り扱います。

パーティション射影を使用して Athena で AWS WAF S3 ログ用テーブルの作成

AWS WAF ログには、パーティションスキームを事前に指定できる既知の構造があるため、Athena のパーティション射影機能を使用することで、クエリランタイムを短縮し、パーティション管理を自動化することが可能です。新しいデータが追加されると、パーティション射影は新しいパーティションを自動で追加します。このため、ALTER TABLE ADD PARTITION を使用してパーティションを手動で追加する必要がなくなります。

次の CREATE TABLE ステートメント例では、指定された日付から現在までの 4 つの異なる AWS WAF リージョンの AWS ログで、パーティション射影を自動的に使用しています。この例の PARTITION BY 句は地域と日付でパーティショニングしますが、要件に応じてこれを変更できます。ログ出力と一致するように、必要に応じてフィールドを変更します。LOCATION および storage.location.template 句では、bucketaccountID の各プレースホルダーを AWS WAF ログの Amazon S3 バケットの場所を特定する値に置き換えます。projection.day.range では、2021/01/01 を使用を開始する日に置き換えます。クエリが正常に実行されると、テーブルをクエリできます。パーティションをロードするのに、ALTER TABLE ADD PARTITION を実行する必要はありません。

CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string, `oversizefields` string, `requestbodysize` int, `requestbodysizeinspectedbywaf` int ) PARTITIONED BY ( `region` string, `date` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/region/DOC-EXAMPLE-WEBACL/' TBLPROPERTIES( 'projection.enabled' = 'true', 'projection.region.type' = 'enum', 'projection.region.values' = 'us-east-1,us-west-2,eu-central-1,eu-west-1', 'projection.date.type' = 'date', 'projection.date.range' = '2021/01/01,NOW', 'projection.date.format' = 'yyyy/MM/dd', 'projection.date.interval' = '1', 'projection.date.interval.unit' = 'DAYS', 'storage.location.template' = 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/${region}/DOC-EXAMPLE-WEBACL/${date}/')
注記

この例にある LOCATION 句のパスの形式は標準ですが、実装した AWS WAF 設定によって異なる場合があります。例えば、次の例の AWS WAF ログパスは CloudFront 分散用です。

s3://DOC-EXAMPLE-BUCKET/AWSLogs/12345678910/WAFLogs/cloudfront/cloudfronyt/2022/08/08/17/55/

AWS WAF ログテーブルの作成またはクエリ中に問題が発生した場合は、ログデータの場所を確定するか、AWS Support までお問い合わせください

パーティション射影の詳細については、「Amazon Athena でのパーティション射影」を参照してください。

パーティショニングなしでの AWS WAF ログのテーブルの作成

このセクションでは、パーティショニングまたはパーティション射影を使用せずに AWS WAF ログのテーブルを作成する方法について説明します。

注記

パフォーマンスおよびコスト上の理由により、クエリにパーティション化されていないスキーマを使用することは推奨されません。詳細については、AWS Big Data ブログの「Amazon Athena のパフォーマンスチューニング Tips トップ 10」を参照してください。

AWS WAF テーブルを作成するには

  1. 以下の DDL ステートメントをコピーして Athena コンソール内に貼り付けます。ログ出力と一致するように、必要に応じてフィールドを変更します。ログを保存するバケットに対応するように Amazon S3 バケットの LOCATION を変更します。

    このクエリでは、OpenX JSON SerDe を使用します。

    注記

    SerDe では、各 JSON ドキュメントが、レコード内のフィールドを区切る行終端文字なしの、1 行のテキストに収まっていることを想定しています。JSON テキストがプリティプリント形式の場合、テーブルを作成した後にクエリを実行しようとすると、以下のようなエラーメッセージが表示される場合があります。「HIVE_CURSOR_ERROR: Row is not a valid JSON Object」、または「HIVE_CURSOR_ERROR: JsonParseException: Unexpected end-of-input: expected close marker for OBJECT」。詳細については、GitHub の OpenX SerDe のドキュメントで「JSON Data Files」(JSON データファイル) を参照してください。

    CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string, `oversizefields` string, `requestbodysize` int, `requestbodysizeinspectedbywaf` int ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/'
  2. Athena コンソールのクエリエディタで CREATE EXTERNAL TABLE ステートメントを実行します。これで waf_logs テーブルが登録され、その中のデータを Athena からのクエリに使用できるようになります。

AWS WAF ログのクエリ例

次の多くのクエリ例では、このドキュメントで前に作成したパーティション射影テーブルを使用します。また、要件に応じて例のテーブル名、列の値、その他の変数を変更します。クエリのパフォーマンスを改善してコストを削減するには、フィルター条件にパーティション列を追加します。

 

 

 

– 指定された用語を含むリファラーの数を計上する

以下のクエリは、指定された日付範囲内での「amazon」という用語を含むリファラーの数を計上します。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST(httprequest.headers) AS t(header) WHERE "date" >= '2021/03/01' AND "date" < '2021/03/31') SELECT COUNT(*) referer_count FROM test_dataset WHERE LOWER(header.name)='referer' AND header.value LIKE '%amazon%'
– 一致する除外ルールがある、過去 10 日間のすべての一致した IP アドレスを計上する

以下のクエリは、IP アドレスがルールグループ内の除外ルールに一致した過去 10 日間の回数を計上します。

WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC
– カウントされたすべてのマネージドルールをマッチした回数でグループ化する

2022 年 10 月 27 日より前にウェブ ACL 設定でルールグループのルールアクションをカウントに設定した場合、AWS WAF はウェブ ACL JSON 内のオーバーライドを excludedRules として保存しました。これで、ルールをカウントにオーバーライドする JSON 設定が ruleActionOverrides 設定に追加されました。詳しくは、「AWS WAF デベロッパーガイド」の「ルールグループのアクションオーバーライド」をご覧ください。新しいログ構造からカウントモードのマネージドルールを抽出するには、次の例のように、excludedRules フィールドの代わりに ruleGroupList セクションの nonTerminatingMatchingRules をクエリします。

SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.rulegroupid, t.nonTerminatingMatchingRules FROM "waf_logs" CROSS JOIN UNNEST(rulegrouplist) AS t(t) WHERE action <> 'BLOCK' AND cardinality(t.nonTerminatingMatchingRules) > 0 GROUP BY t.nonTerminatingMatchingRules, action, httpsourceid, httprequest.clientip, t.rulegroupid ORDER BY "count" DESC Limit 50
– カウントされたすべてのカスタムルールをマッチした回数でグループ化する

次のクエリは、カウントされたすべてのカスタムルールをマッチした回数でグループ化します。

SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.ruleid, t.action FROM "waf_logs" CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip ORDER BY "count" DESC Limit 50

カスタムルールとマネージドルールグループのログの場所については、「AWS WAF デベロッパーガイド」の「モニタリングとチューニング」を参照してください。

日付と時刻の使用

– 人が読み込み可能な ISO 8601 形式のタイムスタンプフィールドを返す

以下のクエリは、from_unixtime および to_iso8601 関数を使用して、timestamp フィールドを人が読み込み可能な ISO 8601 形式 (例えば、1576280412771 ではなく 2019-12-13T23:40:12.000Z ) で返します。このクエリは、HTTP ソース名、ソース ID、およびリクエストも返します。

SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs LIMIT 10;
– 過去 24 時間のレコードを返す

以下のクエリは、WHERE 句でフィルターを使用して、過去 24 時間のレコードに関する HTTP ソース名、HTTP ソース ID、および HTTP リクエストフィールドを返します。

SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs WHERE from_unixtime(timestamp/1000) > now() - interval '1' day LIMIT 10;
– 指定された日付範囲と IP アドレスのレコードを返す

以下のクエリは、指定されたクライアント IP アドレスの指定された日付範囲内のレコードをリストします。

SELECT * FROM waf_logs WHERE httprequest.clientip='53.21.198.66' AND "date" >= '2021/03/01' AND "date" < '2021/03/31'
– 指定された日付範囲について、5 分間隔で IP アドレスの数を計上する

以下のクエリは、特定の日付範囲について、5 分間隔で IP アドレスの数を計上します。

WITH test_dataset AS (SELECT format_datetime(from_unixtime((timestamp/1000) - ((minute(from_unixtime(timestamp / 1000))%5) * 60)),'yyyy-MM-dd HH:mm') AS five_minutes_ts, "httprequest"."clientip" FROM waf_logs WHERE "date" >= '2021/03/01' AND "date" < '2021/03/31') SELECT five_minutes_ts,"clientip",count(*) ip_count FROM test_dataset GROUP BY five_minutes_ts,"clientip"
– 過去 10 日間の X-Forwarded-For IP の数をカウントする

次のクエリは、リクエストヘッダーをフィルタリングし、過去 10 日間の X-Forwarded-For IP の数をカウントします。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST (httprequest.headers) AS t(header) WHERE from_unixtime("timestamp"/1000) > now() - interval '10' DAY) SELECT header.value AS ip, count(*) AS COUNT FROM test_dataset WHERE header.name='X-Forwarded-For' GROUP BY header.value ORDER BY COUNT DESC

日付関数と時刻関数の詳細については、Trino ドキュメントの「Date and Time Functions and Operators」(日付と時刻の関数と演算子) を参照してください。

ブロックされたリクエストとアドレスでの作業

– 指定されたルールタイプによってブロックされた上位 100 個の IP アドレスを抽出する

以下のクエリは、指定した日付範囲内で RATE_BASED 終了ルールによってブロックされた上位 100 個の IP アドレスを抽出し、計上します。

SELECT COUNT(httpRequest.clientIp) as count, httpRequest.clientIp FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' and "date" >= '2021/03/01' AND "date" < '2021/03/31' GROUP BY httpRequest.clientIp ORDER BY count DESC LIMIT 100
– 指定された国からのリクエストがブロックされた回数を計上する

次のクエリは、アイルランド (IE) に属し、RATE_BASED 終了ルールによってブロックされた IP アドレスからリクエストが到着した回数を数えます。

SELECT COUNT(httpRequest.country) as count, httpRequest.country FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND httpRequest.country='IE' GROUP BY httpRequest.country ORDER BY count LIMIT 100;
– リクエストがブロックされた回数を計上し、特定の属性でグループ化する

以下のクエリは、リクエストがブロックされた回数をカウントし、結果を WebACL、RuleId、ClientIP、および HTTP リクエスト URI によってグループ化します。

SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
– 特定の終了ルール ID が一致した回数を計上する

以下のクエリは、特定の終了ルール ID が (WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e') に一致した回数を計上します。その結果を WebACL、Action、ClientIP、および HTTP Request URI によってグループ化します。

SELECT COUNT(*) AS count, webaclid, action, httprequest.clientip, httprequest.uri FROM waf_logs WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e' GROUP BY webaclid, action, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
– 指定した日付範囲内でブロックされた上位 100 個の IP アドレスを取得する

以下のクエリは、指定された日付範囲内でブロックされた上位 100 個の IP アドレスを抽出します。このクエリは、IP アドレスがブロックされた回数もリストします。

SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country" FROM waf_logs WHERE "action" = 'BLOCK' and "date" >= '2021/03/01' AND "date" < '2021/03/31' GROUP BY "httprequest"."clientip", "httprequest"."country" ORDER BY "ipcount" DESC limit 100