Amazon VPC 흐름 로그 쿼리 - Amazon Athena

Amazon VPC 흐름 로그 쿼리

Amazon Virtual Private Cloud 흐름 로그는 VPC의 네트워크 인터페이스에서 송수신되는 IP 트래픽에 대한 정보를 수집합니다. 로그를 사용하여 네트워크 트래픽 패턴을 조사하고 VPC 네트워크에서 위협 및 위험을 식별하세요.

Amazon VPC 흐름 로그를 쿼리하려는 경우 다음 두 가지 옵션이 있습니다.

  • Amazon VPC 콘솔 - Amazon VPC 콘솔의 Athena 통합 기능을 사용하여 분할로 Athena 데이터베이스, 작업 그룹 및 흐름 로그 테이블을 생성하는 AWS CloudFormation 템플릿을 생성합니다. 또한 템플릿에서는 VPC 통해 흐르는 트래픽에 대한 인사이트를 얻기 위해 사용할 수 있는 미리 정의된 흐름 로그 쿼리 집합을 생성할 수 있습니다.

    자세한 내용은 Amazon VPC 사용 설명서Amazon Athena를 사용하여 흐름 로그 쿼리를 참조하세요.

  • Amazon Athena 콘솔 - Athena 콘솔에서 직접 테이블과 쿼리를 생성합니다. 자세히 알아보려면 이 페이지를 계속 읽어 보세요.

사용자 지정 VPC 흐름 로그에 대한 테이블 생성 및 쿼리

Athena에서 로그 쿼리를 시작하기 전에 VPC 흐름 로그를 활성화하고 Amazon S3 버킷에 저장하도록 구성합니다. 로그를 생성한 후 몇 분 동안 실행하여 일부 데이터를 수집합니다. 로그는 GZIP 압축 형식으로 생성되어 Athena를 이용해 직접 쿼리할 수 있습니다.

VPC 흐름 로그를 생성하는 경우 흐름 로그에서 반환할 필드와 해당 필드가 나타나는 순서를 지정할 때 사용자 지정 형식을 사용할 수 있습니다. 흐롬 로그 레코드에 대한 자세한 내용은 Amazon VPC 사용 설명서흐름 로그 레코드를 참조하세요.

일반적인 고려 사항

Athena에서 Amazon VPC 흐름 로그용 테이블을 생성할 때 다음 사항을 기억하세요.

  • 기본적으로 Athena에서 Parquet은 이름으로 열에 액세스합니다. 자세한 내용은 스키마 업데이트 처리 단원을 참조하십시오.

  • Athena의 열 이름에 대한 흐름 로그 레코드의 이름을 사용합니다. Athena 스키마의 열 이름은 Amazon VPC 흐름 로그의 필드 이름과 정확히 일치해야 하며 다음과 같은 차이점이 있습니다.

    • Amazon VPC 로그 필드 이름의 하이픈을 Athena 열 이름의 밑줄로 바꿉니다. Athena에서 데이터베이스 이름, 테이블 이름 및 열 이름에 사용할 수 있는 유일한 문자는 소문자, 숫자 및 밑줄 문자입니다. 자세한 내용은 데이터베이스, 테이블 및 열 이름 단원을 참조하십시오.

    • Athena의 예약 키워드인 흐름 로그 레코드 이름을 백틱(backtick)으로 둘러싸서 이스케이프합니다.

  • VPC 흐름 로그는 AWS 계정에 특정적입니다. 로그 파일을 Amazon S3에 게시할 때 Amazon VPC가 Amazon S3에 생성하는 경로에는 흐름 로그를 생성하는 데 사용된 AWS 계정 ID가 포함됩니다. 자세한 내용은 Amazon VPC 사용 설명서Amazon S3에 흐름 로그 게시를 참조하세요.

Amazon VPC 흐름 로그에 대한 CREATE TABLE 문

다음은 VPC 흐름 로그에 대한 Amazon VPC 테이블을 생성하는 절차입니다. 사용자 지정 형식을 사용하여 흐름 로그를 생성하는 경우 흐름 로그를 생성할 때 지정한 필드와 일치하는 필드가 해당 필드를 지정할 때와 동일한 순서로 포함되는 테이블을 생성합니다.

Amazon VPC 흐름 로그에 대한 Athena 테이블 생성
  1. 일반적인 고려 사항 섹션의 다음 지침에 따라 Athena 콘솔 쿼리 편집기에 다음과 같은 DDL 문을 입력합니다. 다음 문 샘플은 흐름 로그 레코드에 설명된 대로 Amazon VPC 흐름 로그 버전 2 ~ 5의 열을 갖는 테이블을 생성합니다. 다른 열 집합이나 열 순서를 사용하는 경우 그에 맞게 문을 수정합니다.

    CREATE EXTERNAL TABLE IF NOT EXISTS `vpc_flow_logs` ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, region string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY (`date` date) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/' TBLPROPERTIES ("skip.header.line.count"="1");

    다음 사항에 주의하세요.

    • 이 쿼리는 ROW FORMAT DELIMITED를 지정하며 SerDe 지정은 생략합니다. 즉, 쿼리는 CSV, TSV, 사용자 지정 구분 기호로 구분된 파일에 대한 LazySimpleSerDe을 사용합니다. 이 쿼리에서 필드는 공백으로 끝납니다.

    • PARTITIONED BY 절은 date 유형을 사용합니다. 따라서 쿼리에서 수학 연산자를 사용하여 특정 날짜보다 오래되었거나 최신인 항목을 선택할 수 있습니다.

      참고

      date는 DDL 문에 예약된 키워드이므로 백틱 문자로 이스케이프됩니다. 자세한 내용은 예약어 단원을 참조하십시오.

    • 다른 사용자 지정 형식을 사용하는 VPC 흐름 로그의 경우 흐름 로그를 생성할 때 지정한 필드와 일치하도록 필드를 수정합니다.

  2. 로그 데이터가 포함된 Amazon S3 버킷을 가리키도록 LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/'을 수정합니다.

  3. Athena 콘솔에서 쿼리를 실행합니다. 쿼리가 완료된 후 Athena는 vpc_flow_logs 테이블을 등록하여 쿼리를 실행할 수 있도록 데이터를 준비합니다.

  4. 다음 샘플 쿼리와 같이 데이터를 읽을 수 있도록 파티션을 생성합니다. 이 쿼리는 지정한 날짜에 대한 단일 파티션을 생성합니다. 필요하다면 날짜와 위치의 자리 표시자를 교체하세요.

    참고

    이 쿼리는 사용자가 지정한 날짜에 대한 단일 파티션만 생성합니다. 프로세스를 자동화하려면 이 쿼리를 실행하고 year/month/day에 대해 이 방법으로 파티션을 생성하는 스크립트를 사용하거나 파티션 프로젝션을 지정하는 CREATE TABLE 문을 사용합니다.

    ALTER TABLE vpc_flow_logs ADD PARTITION (`date`='YYYY-MM-dd') LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/YYYY/MM/dd';

vpc_flow_logs 테이블에 대한 쿼리 예

Athena 콘솔의 쿼리 편집기를 사용하여 생성한 테이블에서 SQL 문을 실행합니다. 쿼리를 저장하거나 이전 쿼리를 보거나 쿼리 결과를 CSV 형식으로 다운로드할 수 있습니다. 다음 예에서는 vpc_flow_logs를 테이블의 이름으로 바꿉니다. 사용자의 요구 사항에 따라 열 값과 기타 변수를 수정합니다.

다음 예제 쿼리는 지정된 날짜에 대해 최대 100개의 흐름 로그를 나열합니다.

SELECT * FROM vpc_flow_logs WHERE date = DATE('2020-05-04') LIMIT 100;

다음 쿼리는 거절된 모든 TCP 연결을 나열하며 새로 생성한 날짜 파티션 열인 date를 이용해 이러한 이벤트가 발생한 요일을 추출합니다.

SELECT day_of_week(date) AS day, date, interface_id, srcaddr, action, protocol FROM vpc_flow_logs WHERE action = 'REJECT' AND protocol = 6 LIMIT 100;

어떤 서버가 가장 많은 수의 HTTPS 요청을 수신하는지 알아보려면 다음 쿼리를 사용합니다. 이 쿼리는 지난 주부터 HTTPS 포트 443에서 수신한 패킷 수를 계산하고, 대상 IP 주소별로 그룹화한 다음, 상위 10개를 반환합니다.

SELECT SUM(packets) AS packetcount, dstaddr FROM vpc_flow_logs WHERE dstport = 443 AND date > current_date - interval '7' day GROUP BY dstaddr ORDER BY packetcount DESC LIMIT 10;

Apache Parquet 형식으로 흐름 로그에 대한 테이블 생성

다음은 Apache Parquet 형식으로 Amazon VPC 흐름 로그에 대한 Amazon VPC 테이블을 생성하는 절차입니다.

Parquet 형식으로 Amazon VPC 흐름 로그에 대한 Athena 테이블 생성
  1. 일반적인 고려 사항 섹션의 다음 지침에 따라 Athena 콘솔 쿼리 편집기에 다음과 같은 DDL 문을 입력합니다. 다음 샘플 문은 흐름 로그 레코드에 설명된 대로 Parquet 형식으로 Amazon VPC 흐름 로그 버전 2 ~ 5의 열을 포함하는 테이블을 만들며 Hive는 매시간 분할됩니다. 시간당 파티션이 없는 경우 hour 절에서 PARTITIONED BY를 제거합니다.

    CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs_parquet ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, region string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY ( `aws-account-id` string, `aws-service` string, `aws-region` string, `year` string, `month` string, `day` string, `hour` string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/' TBLPROPERTIES ( 'EXTERNAL'='true', 'skip.header.line.count'='1' )
  2. 로그 데이터가 포함된 Amazon S3 경로를 가리키도록 샘플 LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/'을 수정합니다.

  3. Athena 콘솔에서 쿼리를 실행합니다.

  4. 데이터가 Hive 호환 형식인 경우 Athena 콘솔에서 다음 명령을 실행하여 메타스토어의 Hive 파티션을 업데이트 및 로드합니다. 쿼리가 완료되면 vpc_flow_logs_parquet 테이블에서 데이터를 쿼리할 수 있습니다.

    MSCK REPAIR TABLE vpc_flow_logs_parquet

    Hive 호환 데이터를 사용하지 않는 경우 ALTER TABLE ADD PARTITION을 실행하여 파티션을 로드합니다.

Athena를 사용하여 Parquet 형식으로 Amazon VPC 흐름 로그를 쿼리하는 방법에 대한 자세한 내용은 AWS Big Data BlogOptimize performance and reduce costs for network analytics with VPC Flow Logs in Apache Parquet format을 참조하세요.

파티션 프로젝션을 사용하여 Amazon VPC 흐름 로그에 대한 테이블 생성 및 쿼리

다음과 같이 CREATE TABLE을 사용하여 테이블을 생성하고, 테이블을 분할하고, 파티션 프로젝션을 사용하여 자동으로 파티션을 채웁니다. 예에서 테이블 이름 test_table_vpclogs를 테이블 이름으로 바꿉니다. LOCATION 절을 편집하여 Amazon VPC 로그 데이터가 포함된 Amazon S3 버킷을 지정합니다.

다음 CREATE TABLE 문은 Hive 스타일이 아닌 분할 형식으로 전달되는 VPC 흐름 로그입니다. 이 예제에서는 다중 계정 집계를 허용합니다. 여러 계정의 VPC 흐름 로그를 하나의 Amazon S3 버킷으로 중앙 집중화하려는 경우 Amazon S3 경로에 계정 ID를 입력해야 합니다.

CREATE EXTERNAL TABLE IF NOT EXISTS test_table_vpclogs ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY (accid string, region string, day string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LOCATION '$LOCATION_OF_LOGS' TBLPROPERTIES ( "skip.header.line.count"="1", "projection.enabled" = "true", "projection.accid.type" = "enum", "projection.accid.values" = "$ACCID_1,$ACCID_2", "projection.region.type" = "enum", "projection.region.values" = "$REGION_1,$REGION_2,$REGION_3", "projection.day.type" = "date", "projection.day.range" = "$START_RANGE,NOW", "projection.day.format" = "yyyy/MM/dd", "storage.location.template" = "s3://DOC-EXAMPLE-BUCKET/AWSLogs/${accid}/vpcflowlogs/${region}/${day}" )

test_table_vpclogs에 대한 쿼리 예

다음 예제 쿼리는 선행하는 CREATE TABLE 문에 의해 생성된 test_table_vpclogs를 쿼리합니다. 쿼리에서 test_table_vpclogs를 자체 테이블 이름으로 바꿉니다. 사용자의 요구 사항에 따라 열 값과 기타 변수를 수정합니다.

지정된 기간 동안 처음 100개의 액세스 로그 항목을 시간순으로 반환하려면 다음과 같은 쿼리를 실행합니다.

SELECT * FROM test_table_vpclogs WHERE day >= '2021/02/01' AND day < '2021/02/28' ORDER BY day ASC LIMIT 100

지정된 기간 동안 상위 10개 HTTP 패킷을 수신하는 서버를 보려면 다음과 같은 쿼리를 실행합니다. 이 쿼리는 지난 주부터 HTTPS 포트 443에서 수신한 패킷 수를 계산하고, 대상 IP 주소별로 그룹화한 다음, 상위 10개를 반환합니다.

SELECT SUM(packets) AS packetcount, dstaddr FROM test_table_vpclogs WHERE dstport = 443 AND day >= '2021/03/01' AND day < '2021/03/31' GROUP BY dstaddr ORDER BY packetcount DESC LIMIT 10

지정된 기간 동안 생성된 로그를 반환하려면 다음과 같은 쿼리를 실행합니다.

SELECT interface_id, srcaddr, action, protocol, to_iso8601(from_unixtime(start)) AS start_time, to_iso8601(from_unixtime("end")) AS end_time FROM test_table_vpclogs WHERE DAY >= '2021/04/01' AND DAY < '2021/04/30'

지정된 기간 사이에 소스 IP 주소에 대한 액세스 로그를 반환하려면 다음과 같은 쿼리를 실행합니다.

SELECT * FROM test_table_vpclogs WHERE srcaddr = '10.117.1.22' AND day >= '2021/02/01' AND day < '2021/02/28'

거부된 TCP 연결을 나열하려면 다음과 같은 쿼리를 실행합니다.

SELECT day, interface_id, srcaddr, action, protocol FROM test_table_vpclogs WHERE action = 'REJECT' AND protocol = 6 AND day >= '2021/02/01' AND day < '2021/02/28' LIMIT 10

10.117로 시작하는 IP 주소 범위에 대한 액세스 로그를 반환하려면 다음과 같은 쿼리를 실행합니다.

SELECT * FROM test_table_vpclogs WHERE split_part(srcaddr,'.', 1)='10' AND split_part(srcaddr,'.', 2) ='117'

지정된 기간 사이에 대상 IP 주소에 대한 액세스 로그를 반환하려면 다음과 같은 쿼리를 실행합니다.

SELECT * FROM test_table_vpclogs WHERE dstaddr = '10.0.1.14' AND day >= '2021/01/01' AND day < '2021/01/31'

파티션 프로젝션을 사용하여 Apache Parquet 형식으로 흐름 로그에 대한 테이블 생성

VPC 흐름 로그에 대한 다음 파티션 프로젝션 CREATE TABLE 문은 Apache Parquet 형식이고 Hive와 호환되지 않으며 일 대신 시간 및 날짜를 기준으로 파티셔닝됩니다. 예에서 테이블 이름 test_table_vpclogs_parquet를 테이블 이름으로 바꿉니다. LOCATION 절을 편집하여 Amazon VPC 로그 데이터가 포함된 Amazon S3 버킷을 지정합니다.

CREATE EXTERNAL TABLE IF NOT EXISTS test_table_vpclogs_parquet ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY (region string, date string, hour string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/' TBLPROPERTIES ( "EXTERNAL"="true", "skip.header.line.count" = "1", "projection.enabled" = "true", "projection.region.type" = "enum", "projection.region.values" = "us-east-1,us-west-2,ap-south-1,eu-west-1", "projection.date.type" = "date", "projection.date.range" = "2021/01/01,NOW", "projection.date.format" = "yyyy/MM/dd", "projection.hour.type" = "integer", "projection.hour.range" = "00,23", "projection.hour.digits" = "2", "storage.location.template" = "s3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/${account_id}/vpcflowlogs/${region}/${date}/${hour}" )

추가적인 리소스

Athena를 사용하여 VPC 흐름 로그를 분석하는 방법에 대한 자세한 내용은 다음 AWS 빅 데이터 블로그 게시물을 참조하세요.