AWS WAF ログのクエリ - Amazon Athena

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS WAF ログのクエリ

AWS WAF ログには、が AWS リソースからリクエストを受信した時間、このリクエストの詳細、各リクエストが一致したルールのアクションなど、ウェブ ACL によって分析されたトラフィックに関する情報が含まれます。

AWS WAF ログのアクセスログを有効にして Amazon S3 に保存することができます。これらのログを保存する Amazon S3 バケットを書き留めておくと、それらの Athena テーブルを作成し、Athena でクエリを実行できます。

AWS WAF ログの有効化とログレコード構造の詳細については、ウェブ ACL トラフィック情報のログ記録()AWS WAF 開発者ガイド

AWS WAF ログを中央データレイクリポジトリに集約して Athena でクエリする方法の例については、AWS ビッグデータブログの投稿を参照してください。Amazon ES、Amazon アテナ、Amazon QuickSight を使用した AWS WAF ログの分析

AWS WAF ログのテーブルの作成

AWS WAF テーブルを作成するには

  1. 次の DDL ステートメントをコピーして Athena コンソール内に貼り付けます。の変更LOCATIONをログの保存先の Amazon S3 バケットに変更します。

    このクエリでは、OpenX JSON SerDe を使用します。テーブル形式と SerDe は、AWS WAF ログを分析する際に AWS Glue クローラによって提案されます。

    注記

    SerDe では、Amazon S3 の WAF ログの JSON レコードが、レコード内のフィールドを区切る行終端文字なしの 1 行のテキストにあると想定しています。WAF ログ JSON テキストがプリティ印刷形式の場合、エラーメッセージ HIVE_CURSOR_ERROR: Row is not a valid JSON Object は、テーブルを作成した後にクエリを実行しようとしたときに呼び出されます。

    CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array< struct< conditiontype:string, location:string, matcheddata:array<string> > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array< struct< rulegroupid:string, terminatingrule:struct< ruleid:string, action:string, rulematchdetails:string >, nonterminatingmatchingrules:array< struct< ruleid:string, action:string, rulematchdetails:array< struct< conditiontype:string, location:string, matcheddata:array<string> > > > >, excludedrules:string > >, `ratebasedrulelist` array< struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int > >, `nonterminatingmatchingrules` array< struct< ruleid:string, action:string > >, `requestheadersinserted` string, `responsecodesent` string, `httprequest` struct< clientip:string, country:string, headers:array< struct< name:string, value:string > >, uri:string, args:string, httpversion:string, httpmethod:string, requestid:string >, `labels` array< struct< name:string > > ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' WITH SERDEPROPERTIES ( 'paths'='action,formatVersion,httpRequest,httpSourceId,httpSourceName,labels,nonTerminatingMatchingRules,rateBasedRuleList,requestHeadersInserted,responseCodeSent,ruleGroupList,terminatingRuleId,terminatingRuleMatchDetails,terminatingRuleType,timestamp,webaclId') STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://athenawaflogs/WebACL/'
  2. を実行CREATE EXTERNAL TABLEステートメントを Athena コンソールのクエリエディターで実行します。これはwaf_logsテーブルを作成し、そのデータを Athena からのクエリで使用できるように準備します。

パーティションプロジェクションを使用した Athena での AWS WAF ログのテーブルの作成

AWS WAF ログには、事前にパーティションスキームを指定できるパーティションスキームを持つ既知の構造になっています。Athena パーティション射影機能を使用して、クエリの実行時間を短縮し、パーティション管理を自動化できます。パーティション投影は、新しいデータが追加されると、自動的に新しいパーティションを追加します。これにより、手動でパーティションを追加する必要がなくなります。ALTER TABLE ADD PARTITION

以下の例CREATE TABLEステートメントは、指定された日付から、単一の AWS リージョンの現在まで AWS WAF ログでパーティション投影を自動的に使用します。左LOCATIONおよびstorage.location.template句で置き換えるには、bucket および folder プレースホルダには、AWS WAF ログの Amazon S3 バケットの場所を識別する値が含まれています。を使用する場合projection.day.rangeを置き換えます。2021/01/01 を、使用する開始日に変更します。クエリを正常に実行した後、テーブルに対してクエリを実行できます。実行する必要はありませんALTER TABLE ADD PARTITIONをクリックしてパーティションをロードします。

CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array< struct< conditiontype:string, location:string, matcheddata:array<string> > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array< struct< rulegroupid:string, terminatingrule:struct< ruleid:string, action:string, rulematchdetails:string >, nonterminatingmatchingrules:array< struct< ruleid:string, action:string, rulematchdetails:array< struct< conditiontype:string, location:string, matcheddata:array<string> > > > >, excludedrules:string > >, `ratebasedrulelist` array< struct< ratebasedruleid:string, limitkey:string, maxrateallowed:int > >, `nonterminatingmatchingrules` array< struct< ruleid:string, action:string > >, `requestheadersinserted` string, `responsecodesent` string, `httprequest` struct< clientip:string, country:string, headers:array< struct< name:string, value:string > >, uri:string, args:string, httpversion:string, httpmethod:string, requestid:string >, `labels` array< struct< name:string > > ) PARTITIONED BY ( day STRING ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket/folder/' TBLPROPERTIES ( "projection.enabled" = "true", "projection.day.type" = "date", "projection.day.range" = "2021/01/01,NOW", "projection.day.format" = "yyyy/MM/dd", "projection.day.interval" = "1", "projection.day.interval.unit" = "DAYS", "storage.location.template" = "s3://bucket/folder/${day}" )

パーティション射影の詳細については、「」を参照してください。Amazon Athena を使用したパーティション射影

AWS WAF ログのクエリ例

次のクエリ例では、要件に応じてテーブル名、カラム値、およびその他の変数を変更します。クエリのパフォーマンスを改善し、コストを削減するには、フィルタ条件にパーティション列を追加します。

指定された用語を含むリファラーの数を数える

次のクエリは、指定された日付範囲の「amazon」という用語を含むリファラーの数をカウントします。

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST(httprequest.headers) AS t(header) WHERE day >= '2021/03/01' AND day < '2021/03/31') SELECT COUNT(*) referer_count FROM DATASET WHERE LOWER(header.name)='referer' AND header.value LIKE '%amazon%'

過去 10 日間に除外ルールに一致したすべての一致した IP アドレスをカウントする

次のクエリでは、IP アドレスがルールグループ内の除外ルールと一致した過去 10 日間の回数を数えます。

WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC

日付および時刻の操作

人間が読めるISO 8601形式のタイムスタンプフィールドを返します

以下のクエリは、from_unixtimeおよびto_iso8601関数を使用してtimestampフィールドが人間が読める ISO 8601 形式 (たとえば、2019-12-13T23:40:12.000Zの代わりに1576280412771). クエリは、HTTP ソース名、ソース ID、およびリクエストも返します。

SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs LIMIT 10;

過去 24 時間のレコードを返します

次のクエリでは、WHERE句を使用して、過去 24 時間のレコードの HTTP ソース名、HTTP ソース ID、および HTTP 要求フィールドを返します。

SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs WHERE from_unixtime(timestamp/1000) > now() - interval '1' day LIMIT 10;

指定した日付範囲と IP アドレスのレコードを返す

次のクエリは、指定されたクライアント IP アドレスの指定された日付範囲内のレコードを一覧表示します。

SELECT * FROM waf_logs WHERE httprequest.clientip='53.21.198.66' AND day >= '2021/03/01' AND day < '2021/03/31'

指定した日付範囲について、5 分間隔で IP アドレスの数をカウントします。

次のクエリは、特定の日付範囲について、5 分間隔の IP アドレスの数をカウントします。

WITH test_dataset AS (SELECT format_datetime(from_unixtime((timestamp/1000) - ((minute(from_unixtime(timestamp / 1000))%5) * 60)),'yyyy-MM-dd HH:mm') AS five_minutes_ts, "httprequest"."clientip" FROM waf_logs WHERE day >= '2021/03/01' AND day < '2021/03/31') SELECT five_minutes_ts,"clientip",count(*) ip_count FROM test_dataset GROUP BY five_minutes_ts,"clientip"

日付と時刻関数の詳細については、「」を参照してください。日付/時刻関数および演算子プレストのドキュメントを参照してください。

ブロックされた要求とアドレスの操作

指定した規則の種類によってブロックされた上位 100 個の IP アドレスを抽出する

次のクエリは、抽出し、によってブロックされている上位 100 の IP アドレスをカウントします。RATE_BASED指定した日付範囲内にルールを終了します。

SELECT COUNT(httpRequest.clientIp) as count, httpRequest.clientIp FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' and day >= '2021/03/01' AND day < '2021/03/31' GROUP BY httpRequest.clientIp ORDER BY count DESC LIMIT 100

指定した国からのリクエストがブロックされた回数をカウントする

次のクエリは、アイルランド (IE) に属し、RATE_BASED 終了ルールによってブロックされた IP アドレスからリクエストが到着した回数を数えます。

SELECT COUNT(httpRequest.country) as count, httpRequest.country FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND httpRequest.country='IE' GROUP BY httpRequest.country ORDER BY count LIMIT 100;

リクエストがブロックされた回数をカウントし、特定の属性でグループ化する

次のクエリは、リクエストがブロックされた回数をカウントし、結果を WebACL、RuleId、ClientIP、および HTTP リクエスト URI によってグループ化します。

SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;

特定の終了ルール ID が一致した回数を数え

次のクエリは、特定の終了ルール ID (WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e') が一致した回数を数え、その結果を WebACL、Action、ClientIP、および HTTP Request URI によってグループ化します。

SELECT COUNT(*) AS count, webaclid, action, httprequest.clientip, httprequest.uri FROM waf_logs WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e' GROUP BY webaclid, action, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;

指定した日付範囲でブロックされた上位 100 の IP アドレスを取得する

次のクエリは、指定された日付範囲でブロックされた上位 100 の IP アドレスを抽出します。また、IP アドレスがブロックされた回数も一覧表示されます。

SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country" FROM waf_logs WHERE "action" = 'BLOCK' and day >= '2021/03/01' AND day < '2021/03/31' GROUP BY "httprequest"."clientip", "httprequest"."country" ORDER BY "ipcount" DESC limit 100