로그 쿼리 AWS WAF - Amazon Athena

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

로그 쿼리 AWS WAF

AWS WAF 보호된 웹 애플리케이션이 클라이언트로부터 받는 HTTP 및 HTTPS 요청을 모니터링하고 제어할 수 있는 웹 애플리케이션 방화벽입니다. 웹 액세스 제어 목록 (ACL) 내에서 규칙을 구성하여 AWS WAF 웹 요청을 처리하는 방법을 정의합니다. 그런 다음 웹 ACL을 연결하여 웹 애플리케이션을 보호합니다. 보호할 수 있는 웹 애플리케이션 리소스의 AWS WAF 예로는 Amazon CloudFront 배포, Amazon API Gateway REST API, 애플리케이션 로드 밸런서 등이 있습니다. 에 대한 자세한 내용은 개발자 AWS WAF안내서를 참조하십시오 AWS WAF.AWS WAF

AWS WAF 로그에는 웹 ACL에서 분석하는 트래픽에 대한 정보 (예: AWS 리소스로부터 요청을 AWS WAF 받은 시간, 요청에 대한 세부 정보, 각 요청이 일치하는 규칙에 대한 조치) 가 포함됩니다.

여러 대상 중 하나에 로그를 게시하도록 AWS WAF 웹 ACL을 구성하여 로그를 쿼리하고 볼 수 있습니다. 웹 ACL 로깅 구성 및 AWS WAF 로그 내용에 대한 자세한 내용은 개발자 안내서의 AWS WAF 웹 ACL 트래픽 로깅을AWS WAF 참조하십시오.

AWS WAF 로그를 중앙 데이터 레이크 리포지토리로 집계하고 Athena로 쿼리하는 방법에 대한 예는 빅 데이터 블로그 게시물 Service, Amazon Athena 및 Amazon을 OpenSearch 통한 로그 AWS WAF 분석을 참조하십시오 AWS . QuickSight

이 주제에서는 분할을 사용하는 것과 사용하지 않는 두 가지 CREATE TABLE 문을 예제로 제공합니다.

참고

이 주제의 CREATE TABLE 문은 v1 및 v2 AWS WAF 로그에 모두 사용할 수 있습니다. v1에서는 webaclid 필드에 ID가 포함되어 있습니다. v2에서는 webaclid 필드에 전체 ARN이 포함되어 있습니다. 이곳의 CREATE TABLE 문은 string 데이터 유형을 사용하여 애그노스틱 방식으로 이 콘텐츠를 취급합니다.

파티션 프로젝션을 사용하여 Athena에서 AWS WAF S3 로그에 대한 테이블 생성

AWS WAF 로그에는 파티션 구성표를 미리 지정할 수 있는 알려진 구조가 있으므로 Athena 파티션 프로젝션 기능을 사용하여 쿼리 런타임을 줄이고 파티션 관리를 자동화할 수 있습니다. 새 데이터가 추가되면 파티션 프로젝션은 자동으로 새 파티션을 추가합니다. 따라서 ALTER TABLE ADD PARTITION을 사용해 파티션을 수동으로 추가할 필요가 없습니다.

다음 예시 CREATE TABLE 명령문에서는 지정된 날짜부터 현재까지의 4개 지역에 대한 AWS WAF 로그에 파티션 프로젝션을 자동으로 사용합니다. AWS 이 예제의 PARTITION BY 절은 리전 및 날짜별로 분할되지만 요구 사항에 따라 수정할 수 있습니다. 필요에 따라 필드를 로그 출력과 일치하도록 수정합니다. LOCATIONstorage.location.template 절에서 버킷accountID 플레이스홀더를 로그의 Amazon S3 버킷 위치를 식별하는 값으로 바꾸십시오. AWS WAF projection.day.range에 대해 2021/01/01을 사용하려는 시작 날짜로 바꿉니다. 쿼리가 성공적으로 실행되면 테이블을 쿼리할 수 있습니다. 파티션을 로드하기 위해 ALTER TABLE ADD PARTITION을 실행하지 않아도 됩니다.

CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) PARTITIONED BY ( `region` string, `date` string) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/region/DOC-EXAMPLE-WEBACL/' TBLPROPERTIES( 'projection.enabled' = 'true', 'projection.region.type' = 'enum', 'projection.region.values' = 'us-east-1,us-west-2,eu-central-1,eu-west-1', 'projection.date.type' = 'date', 'projection.date.range' = '2021/01/01,NOW', 'projection.date.format' = 'yyyy/MM/dd', 'projection.date.interval' = '1', 'projection.date.interval.unit' = 'DAYS', 'storage.location.template' = 's3://DOC-EXAMPLE-BUCKET/AWSLogs/accountID/WAFLogs/${region}/DOC-EXAMPLE-WEBACL/${date}/')
참고

예시 LOCATION 절의 경로 형식은 표준이지만 구현한 AWS WAF 구성에 따라 달라질 수 있습니다. 예를 들어, 다음 예제 AWS WAF 로그 경로는 CloudFront 배포를 위한 것입니다.

s3://DOC-EXAMPLE-BUCKET/AWSLogs/12345678910/WAFLogs/cloudfront/cloudfronyt/2022/08/08/17/55/

AWS WAF 로그 테이블을 만들거나 쿼리하는 동안 문제가 발생하는 경우 로그 데이터 또는 연락처의 위치를 확인하세요. AWS Support

파티션 프로젝션에 대한 자세한 내용은 Amazon Athena를 사용한 파티션 프로젝션 단원을 참조하세요.

파티셔닝을 사용하지 않고 AWS WAF 로그용 테이블 만들기

이 섹션에서는 파티셔닝이나 파티션 프로젝션을 사용하지 않고 AWS WAF 로그용 테이블을 만드는 방법을 설명합니다.

참고

성능 및 비용상의 이유로 쿼리에는 파티셔닝되지 않은 스키마를 사용하지 않는 것이 좋습니다. 자세한 내용은 AWS 빅 데이터 블로그의 Amazon Athena에 대한 10가지 성능 조정 팁을 참조하십시오.

테이블을 생성하려면 AWS WAF

  1. 다음 DDL 문을 복사하여 Athena 콘솔에 붙여 넣습니다. 필요에 따라 필드를 로그 출력과 일치하도록 수정합니다. 로그를 저장하는 것과 일치하도록 Amazon S3 버킷의 LOCATION을 수정합니다.

    이 쿼리는 OpenX JSON SerDe를 사용합니다.

    참고

    SerDe 에서는 각 JSON 문서가 레코드의 필드를 구분하는 줄 종료 문자가 없는 한 줄의 텍스트에 있을 것으로 예상합니다. JSON 텍스트가 매우 인쇄된 형식인 경우 테이블을 만든 후 쿼리를 시도하면 HIVE_CURSOR_ERROR: 행이 유효한 JSON 객체가 아닙니다. 또는 HIVE_CURSOR_ERROR:: 예상치 못한 end-of-input 결과 JsonParseException: OBJECT에 대한 예상 닫기 표시자와 같은 오류 메시지가 표시될 수 있습니다. 자세한 내용은 SerDe OpenX 설명서의 JSON 데이터 파일을 참조하십시오. GitHub

    CREATE EXTERNAL TABLE `waf_logs`( `timestamp` bigint, `formatversion` int, `webaclid` string, `terminatingruleid` string, `terminatingruletype` string, `action` string, `terminatingrulematchdetails` array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, `httpsourcename` string, `httpsourceid` string, `rulegrouplist` array < struct < rulegroupid: string, terminatingrule: struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > > >, nonterminatingmatchingrules: array < struct < ruleid: string, action: string, overriddenaction: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, excludedrules: string > >, `ratebasedrulelist` array < struct < ratebasedruleid: string, limitkey: string, maxrateallowed: int > >, `nonterminatingmatchingrules` array < struct < ruleid: string, action: string, rulematchdetails: array < struct < conditiontype: string, sensitivitylevel: string, location: string, matcheddata: array < string > > >, challengeresponse: struct < responsecode: string, solvetimestamp: string >, captcharesponse: struct < responsecode: string, solvetimestamp: string > > >, `requestheadersinserted` array < struct < name: string, value: string > >, `responsecodesent` string, `httprequest` struct < clientip: string, country: string, headers: array < struct < name: string, value: string > >, uri: string, args: string, httpversion: string, httpmethod: string, requestid: string >, `labels` array < struct < name: string > >, `captcharesponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `challengeresponse` struct < responsecode: string, solvetimestamp: string, failureReason: string >, `ja3Fingerprint` string ) ROW FORMAT SERDE 'org.openx.data.jsonserde.JsonSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.mapred.TextInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/'
  2. Athena 콘솔 쿼리 편집기에서 CREATE EXTERNAL TABLE 문을 실행합니다. 이렇게 하면 waf_logs 테이블이 등록되고 Athena에서 그 안의 데이터를 쿼리할 수 있습니다.

로그 쿼리 예제 AWS WAF

다음과 같은 다수의 쿼리 예제가 이 문서에서 이전에 생성한 파티션 프로젝션 테이블을 사용합니다. 필요에 따라 예제의 테이블 이름, 열 값 및 기타 변수를 수정합니다. 쿼리 성능을 개선하고 비용을 줄이려면 필터 조건에 파티션 열을 추가합니다.

 

 

 

- 지정된 용어를 포함하는 참조자 수 계산

다음 쿼리는 지정된 날짜 범위에서 “amazon”이라는 용어를 포함한 참조자의 수를 셉니다.

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST(httprequest.headers) AS t(header) WHERE "date" >= '2021/03/01' AND "date" < '2021/03/31') SELECT COUNT(*) referer_count FROM test_dataset WHERE LOWER(header.name)='referer' AND header.value LIKE '%amazon%'
- 지난 10일 동안 제외된 규칙과 일치하는 모든 IP 주소 계산

다음 쿼리는 지난 10일 동안 IP 주소가 규칙 그룹에서 제외된 규칙과 일치하는 횟수를 셉니다.

WITH test_dataset AS (SELECT * FROM waf_logs CROSS JOIN UNNEST(rulegrouplist) AS t(allrulegroups)) SELECT COUNT(*) AS count, "httprequest"."clientip", "allrulegroups"."excludedrules", "allrulegroups"."ruleGroupId" FROM test_dataset WHERE allrulegroups.excludedrules IS NOT NULL AND from_unixtime(timestamp/1000) > now() - interval '10' day GROUP BY "httprequest"."clientip", "allrulegroups"."ruleGroupId", "allrulegroups"."excludedrules" ORDER BY count DESC
- 일치하는 횟수를 기준으로 계산된 모든 관리형 규칙 그룹화

2022년 10월 27일 이전에 웹 ACL 구성에서 규칙 그룹 규칙 작업을 카운트로 설정한 경우, 재정의를 웹 ACL JSON에 다음과 같이 AWS WAF 저장했습니다. excludedRules 이제 규칙을 개수로 재정의하기 위한 JSON 설정은 ruleActionOverrides 설정에 있습니다. 자세한 내용은 AWS WAF 개발자 안내서Action overrides in rule groups를 참조하세요. 새 로그 구조에서 개수 모드의 관리형 규칙을 추출하려면 다음 예제와 같이 excludedRules 필드 대신 ruleGroupList 섹션에서 nonTerminatingMatchingRules를 쿼리합니다.

SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.rulegroupid, t.nonTerminatingMatchingRules FROM "waf_logs" CROSS JOIN UNNEST(rulegrouplist) AS t(t) WHERE action <> 'BLOCK' AND cardinality(t.nonTerminatingMatchingRules) > 0 GROUP BY t.nonTerminatingMatchingRules, action, httpsourceid, httprequest.clientip, t.rulegroupid ORDER BY "count" DESC Limit 50
- 일치하는 횟수를 기준으로 계산된 모든 사용자 지정 규칙 그룹화

다음 쿼리는 일치하는 횟수를 기준으로 계산된 모든 사용자 지정 규칙을 그룹화합니다.

SELECT count(*) AS count, httpsourceid, httprequest.clientip, t.ruleid, t.action FROM "waf_logs" CROSS JOIN UNNEST(nonterminatingmatchingrules) AS t(t) WHERE action <> 'BLOCK' AND cardinality(nonTerminatingMatchingRules) > 0 GROUP BY t.ruleid, t.action, httpsourceid, httprequest.clientip ORDER BY "count" DESC Limit 50

사용자 지정 규칙 및 관리형 규칙 그룹의 로그 위치에 대한 자세한 내용은 AWS WAF 개발자 안내서Monitoring and tuning을 참조하세요.

날짜 및 시간 작업

- 사람이 읽을 수 있는 ISO 8601 형식으로 타임스탬프 필드 반환

다음 쿼리는 from_unixtimeto_iso8601 함수를 사용하여 timestamp 필드를 사람이 읽을 수 있는 ISO 8601 형식으로 반환합니다(예: 1576280412771이 아닌 2019-12-13T23:40:12.000Z). 쿼리는 HTTP 소스 이름, 소스 ID, 요청도 반환합니다.

SELECT to_iso8601(from_unixtime(timestamp / 1000)) as time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs LIMIT 10;
- 최근 24시간의 레코드 반환

다음 쿼리는 WHERE 절에 필터를 사용해 최근 24시간의 레코드의 HTTP 소스 이름, HTTP 소스 ID, HTTP 요청 필드를 반환합니다.

SELECT to_iso8601(from_unixtime(timestamp/1000)) AS time_ISO_8601, httpsourcename, httpsourceid, httprequest FROM waf_logs WHERE from_unixtime(timestamp/1000) > now() - interval '1' day LIMIT 10;
- 지정된 날짜 범위 및 IP 주소에 대한 레코드 반환

다음 쿼리는 지정된 클라이언트 IP 주소에 대해 지정된 날짜 범위의 레코드를 나열합니다.

SELECT * FROM waf_logs WHERE httprequest.clientip='53.21.198.66' AND "date" >= '2021/03/01' AND "date" < '2021/03/31'
- 지정된 날짜 범위에 대해 5분 간격으로 IP 주소 개수 계산

다음 쿼리는 특정 날짜 범위에 대해 5분 간격으로 IP 주소 개수를 셉니다.

WITH test_dataset AS (SELECT format_datetime(from_unixtime((timestamp/1000) - ((minute(from_unixtime(timestamp / 1000))%5) * 60)),'yyyy-MM-dd HH:mm') AS five_minutes_ts, "httprequest"."clientip" FROM waf_logs WHERE "date" >= '2021/03/01' AND "date" < '2021/03/31') SELECT five_minutes_ts,"clientip",count(*) ip_count FROM test_dataset GROUP BY five_minutes_ts,"clientip"
- 지난 10일 동안 X-Forwarded-For IP 수 계산

다음 쿼리는 요청 헤더를 필터링하고 지난 10일 동안의 X-Forwarded-For IP 수를 계산합니다.

WITH test_dataset AS (SELECT header FROM waf_logs CROSS JOIN UNNEST (httprequest.headers) AS t(header) WHERE from_unixtime("timestamp"/1000) > now() - interval '10' DAY) SELECT header.value AS ip, count(*) AS COUNT FROM test_dataset WHERE header.name='X-Forwarded-For' GROUP BY header.value ORDER BY COUNT DESC

날짜 및 시간 함수에 대한 자세한 내용은 Trino 설명서의 날짜 및 시간 함수와 연산자를 참조하세요.

차단된 요청 및 주소 작업

- 지정된 규칙 유형에 의해 차단된 상위 100개 IP 주소 추출

다음 쿼리는 지정된 날짜 범위 동안 RATE_BASED 종료 규칙에 의해 차단된 상위 100개 IP 주소를 추출하고 그 횟수를 셉니다.

SELECT COUNT(httpRequest.clientIp) as count, httpRequest.clientIp FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND action='BLOCK' and "date" >= '2021/03/01' AND "date" < '2021/03/31' GROUP BY httpRequest.clientIp ORDER BY count DESC LIMIT 100
- 지정된 국가의 요청이 차단된 횟수 계산

다음 쿼리는 아일랜드(IE)에 속하며 RATE_BASED 종료 규칙에 의해 차단된 IP 주소에서 요청이 도착한 횟수를 계산합니다.

SELECT COUNT(httpRequest.country) as count, httpRequest.country FROM waf_logs WHERE terminatingruletype='RATE_BASED' AND httpRequest.country='IE' GROUP BY httpRequest.country ORDER BY count LIMIT 100;
- 요청이 차단된 횟수 계산(특정 속성별로 그룹화)

다음 쿼리는 요청이 차단된 횟수를 계산하며, 결과는 WebACL RuleId, 클라이언트 IP 및 HTTP 요청 URI별로 그룹화됩니다.

SELECT COUNT(*) AS count, webaclid, terminatingruleid, httprequest.clientip, httprequest.uri FROM waf_logs WHERE action='BLOCK' GROUP BY webaclid, terminatingruleid, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
- 특정 종료 규칙 ID가 일치하는 횟수 계산

다음 쿼리는 특정 종료 규칙 ID(WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e')가 일치하는 횟수를 계산합니다. 이 쿼리는 WebACL, Action, ClientIP 및 HTTP 요청 URI별로 결과를 그룹화합니다.

SELECT COUNT(*) AS count, webaclid, action, httprequest.clientip, httprequest.uri FROM waf_logs WHERE terminatingruleid='e9dd190d-7a43-4c06-bcea-409613d9506e' GROUP BY webaclid, action, httprequest.clientip, httprequest.uri ORDER BY count DESC LIMIT 100;
- 지정된 날짜 범위 동안 차단된 상위 100개 IP 주소 검색

다음 쿼리는 지정된 날짜 범위 동안 차단된 상위 100개 IP 주소를 추출합니다. 쿼리에는 IP 주소가 차단된 횟수도 나열됩니다.

SELECT "httprequest"."clientip", "count"(*) "ipcount", "httprequest"."country" FROM waf_logs WHERE "action" = 'BLOCK' and "date" >= '2021/03/01' AND "date" < '2021/03/31' GROUP BY "httprequest"."clientip", "httprequest"."country" ORDER BY "ipcount" DESC limit 100