Amazon VPC フローログのクエリ - Amazon Athena

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon VPC フローログのクエリ

Amazon Virtual Private Cloud フローログは、VPC 内のネットワークインターフェイス間で送受信される IP トラフィックに関する情報をキャプチャします。このログを使用してネットワークトラフィックのパターンを調査し、VPC ネットワーク全体の脅威やリスクを特定します。

Amazon VPC フローログのクエリを実行するには、2 つの方法があります。

  • Amazon VPC コンソール – Amazon VPC コンソールの Athena 統合機能を使用して、パーティショニング付きの Athena データベース、ワークグループ、およびフローログテーブルを作成する AWS CloudFormation テンプレートを生成します。テンプレートでは、事前定義されたフローログのクエリのセットも作成します。これは、VPC を通過するトラフィックに関するインサイトの取得に使用できます。

    このアプローチの詳細については、「Amazon VPC ユーザーガイド」の「Amazon Athena を使用したフローログのクエリ」を参照してください。

  • Amazon Athena コンソール — Athena コンソールでテーブルやクエリを直接作成します。詳細については、引き続きこのページを参照してください。

カスタム VPC フローログのテーブルの作成とクエリ

Athena でログのクエリを開始する前に、VPC フローログを有効化し、それらが Amazon S3 バケットに保存されるように設定します。ログを作成したら、それを数分間実行していくらかのデータを収集します。ログは、Athena で直接クエリできる GZIP 圧縮形式で作成されます。

VPC フローログの作成時に、フローログで返すフィールドおよびフィールドが表示される順序を指定する際は、カスタム形式を使用できます。フローログレコード詳細については、「Amazon VPC ユーザーガイド」の「フローログレコード」を参照してください。

一般的な考慮事項

Athena で Amazon VPC フローログのテーブルを作成する際は、次の点に注意してください。

  • デフォルトでは、Parquet は名前で列にアクセスします。詳細については、「スキーマ更新の処理」を参照してください。

  • Athena の列名には、フローログレコードの名前を使用します。Athena スキーマの列名は、Amazon VPC フローログのフィールド名と完全に一致する必要があります。ただし、これらには次の違いがあります。

    • Amazon VPC のログフィールド名のハイフンは、Athena の列名ではアンダースコアに置き換えられます。Athena では、データベース名、テーブル名、および列名に、小文字、数字、アンダースコア文字のみを使用できます。詳細については、「データベース、テーブル、および列の名前」を参照してください。

    • バックティックで囲むことで、Athena で予約されたキーワードであるフローログのレコード名をエスケープします。

  • VPC フローログは AWS アカウント 固有です。ログファイルを Amazon S3 に発行すると、Amazon VPC が Amazon S3 で作成するパスには、フローログの作成に使用された AWS アカウント の ID が含まれます。詳細については、「Amazon VPC ユーザーガイド」の「フローログを Amazon S3 に発行する」を参照してください。

Amazon VPC フローログの CREATE TABLE ステートメント

次の手順では、Amazon VPC フローログ用の Amazon VPC テーブルを作成します。カスタム形式でフローログを作成する場合は、フローログの作成時に指定したフィールドと一致するフィールドを、それらに指定したものと同じ順序で使用してテーブルを作成します。

Amazon VPC フローログ用の Athena テーブルを作成するには
  1. 一般的な考慮事項 セクションのガイドラインに従って、次のような DDL ステートメントを Athena コンソールクエリエディタに入力します。サンプルステートメントは、「フローログレコード」に記載されているように、Amazon VPC フローログのバージョン 2 から 5 の列を持つテーブルを作成します。異なる列のセットまたは列の順序を使用する場合は、必要に応じてステートメントを変更します。

    CREATE EXTERNAL TABLE IF NOT EXISTS `vpc_flow_logs` ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, region string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY (`date` date) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/' TBLPROPERTIES ("skip.header.line.count"="1");

    以下の点に注意してください。

    • クエリは を指定ROW FORMAT DELIMITEDし、 の指定を省略します SerDe。これは、クエリが CSV、TSV、およびカスタム区切りファイルの LazySimpleSerDe を使用しているということです。このクエリでは、フィールドはスペースで終了します。

    • PARTITIONED BY 句は、date 型を使用します。これにより、クエリで数学演算子を使用して、特定の日付より古いものまたは新しいものを選択できます。

      注記

      date は DDL ステートメントの予約済みキーワードであるため、バックティック文字でエスケープされます。詳細については、「予約済みキーワード」を参照してください。

    • 異なるカスタム形式の VPC フローログの場合、フローログの作成時に指定したフィールドと一致するようにフィールドを変更します。

  2. ログデータが含まれる Amazon S3 バケットをポイントするように LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/' を変更します。

  3. Athena コンソールでクエリを実行します。クエリが完了すると、Athena が vpc_flow_logs テーブルを登録し、その中のデータに対してクエリを発行できるように準備します。

  4. 次のサンプルクエリのように、パーティションを作成してデータを読み取れるようにします。このクエリは、指定日の 1 つのパーティションを作成します。必要に応じて、日付と場所のプレースホルダーを置き換えます。

    注記

    このクエリは、指定日に対して単一のパーティションのみを作成します。プロセスを自動化するには、このクエリを実行し、この方法で year/month/day にパーティションを作成するスクリプトを使用するか、パーティション射影を指定する CREATE TABLE ステートメントを使用します。

    ALTER TABLE vpc_flow_logs ADD PARTITION (`date`='YYYY-MM-dd') LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/{region_code}/YYYY/MM/dd';

vpc_flow_logs テーブルのクエリ例

Athena コンソールのクエリエディタを使用して、作成したテーブルで SQL ステートメントを実行します。クエリを保存したり、以前のクエリを表示したり、クエリ結果を CSV 形式でダウンロードできます。次の例では、vpc_flow_logs をテーブルの名前に置き換えます。また、要件に応じて列の値やその他の変数を変更します。

次のクエリ例では、指定された日付に対して最大 100 のフローログを一覧表示します。

SELECT * FROM vpc_flow_logs WHERE date = DATE('2020-05-04') LIMIT 100;

次のクエリは、すべての拒否された TCP 接続を一覧表示します。新しく作成した日付パーティション列 date を使用して、該当イベントが発生した週の曜日を抽出します。

SELECT day_of_week(date) AS day, date, interface_id, srcaddr, action, protocol FROM vpc_flow_logs WHERE action = 'REJECT' AND protocol = 6 LIMIT 100;

最大数の HTTPS リクエストを受信しているサーバーを確認するには、次のクエリを使用します。HTTPS ポート 443 で受信したパケット数をカウントし、送信先 IP アドレス別にグループ分けして、過去 1 週間の上位 10 のサーバーを返します。

SELECT SUM(packets) AS packetcount, dstaddr FROM vpc_flow_logs WHERE dstport = 443 AND date > current_date - interval '7' day GROUP BY dstaddr ORDER BY packetcount DESC LIMIT 10;

Apache Parquet 形式でフローログのテーブルを作成する

以下の手順では、Apache Parquet 形式で Amazon VPC フローログ用の Amazon VPC テーブルを作成します。

Parquet 形式で Amazon VPC フローログ用の Athena テーブルを作成するには
  1. 一般的な考慮事項 セクションのガイドラインに従って、次のような DDL ステートメントを Athena コンソールクエリエディタに入力します。サンプルステートメントは、「フローログレコード」に記載されているように、1 時間ごとに Hive パーティションされる Parquet 形式で Amazon VPC フローログのバージョン 2 から 5 の列を持つテーブルを作成します。1 時間ごとのパーティションがない場合は、PARTITIONED BY 句から hour を削除します。

    CREATE EXTERNAL TABLE IF NOT EXISTS vpc_flow_logs_parquet ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, region string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY ( `aws-account-id` string, `aws-service` string, `aws-region` string, `year` string, `month` string, `day` string, `hour` string ) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/' TBLPROPERTIES ( 'EXTERNAL'='true', 'skip.header.line.count'='1' )
  2. ログデータが含まれる Amazon S3 パスをポイントするようにサンプル LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/' を変更します。

  3. Athena コンソールでクエリを実行します。

  4. Hive 互換形式のデータの場合、Athena コンソールで次のコマンドを実行して、メタストアの Hive パーティションを更新およびロードします。クエリが完了すると、vpc_flow_logs_parquet テーブルでデータをクエリできます。

    MSCK REPAIR TABLE vpc_flow_logs_parquet

    Hive 互換データを使用していない場合は、ALTER TABLE ADD PARTITION を実行してパーティションをロードします。

Athena を使用して、Parquet 形式の Amazon VPC フローログをクエリする方法については、AWS ビッグデータ ブログの記事「Apache Parquet 形式の VPC フローログを使用して、ネットワーク分析のパフォーマンスを最適化し、そのコストを削減する」を参照してください。

パーティション射影を使用した Amazon VPC フローログのテーブルの作成とクエリ

次のような CREATE TABLE ステートメントを使用してテーブルを作成およびパーティションし、パーティション射影を使用してパーティションを自動的に入力します。サンプルのテーブル名 test_table_vpclogs をそのテーブルの名前に置き換えます。Amazon VPC ログデータが含まれている Amazon S3 バケットを指定するように LOCATION 句を編集します。

次は、非 Hive スタイルのパーティション形式で配信される VPC フローログ用の CREATE TABLE ステートメントです。この例では、マルチアカウント集計が可能です。複数のアカウントからの VPC フローログを 1 つの Amazon S3 バケットに集中管理する場合は、アカウント ID を Amazon S3 パスに入力する必要があります。

CREATE EXTERNAL TABLE IF NOT EXISTS test_table_vpclogs ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY (accid string, region string, day string) ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' LOCATION '$LOCATION_OF_LOGS' TBLPROPERTIES ( "skip.header.line.count"="1", "projection.enabled" = "true", "projection.accid.type" = "enum", "projection.accid.values" = "$ACCID_1,$ACCID_2", "projection.region.type" = "enum", "projection.region.values" = "$REGION_1,$REGION_2,$REGION_3", "projection.day.type" = "date", "projection.day.range" = "$START_RANGE,NOW", "projection.day.format" = "yyyy/MM/dd", "storage.location.template" = "s3://$LOCATION_OF_LOGS/AWSLogs/${accid}/vpcflowlogs/${region}/${day}" )

test_table_vpclogs のクエリ例

次のクエリ例では、前述の CREATE TABLE ステートメントで作成された test_table_vpclogs のクエリを実行します。クエリの test_table_vpclogs を独自のテーブルの名前で置き換えます。また、要件に応じて列の値やその他の変数を変更します。

指定された期間の最初の 100 個のアクセスログエントリを時系列順に返すには、次のようなクエリを実行します。

SELECT * FROM test_table_vpclogs WHERE day >= '2021/02/01' AND day < '2021/02/28' ORDER BY day ASC LIMIT 100

指定された期間に上位 10 個の HTTP パケットを受信したサーバーを表示するには、次のようなクエリを実行します。このクエリは、HTTPS ポート 443 で受信したパケット数をカウントし、送信先 IP アドレス別にグループ分けして、過去 1 週間の上位 10 エントリを返します。

SELECT SUM(packets) AS packetcount, dstaddr FROM test_table_vpclogs WHERE dstport = 443 AND day >= '2021/03/01' AND day < '2021/03/31' GROUP BY dstaddr ORDER BY packetcount DESC LIMIT 10

指定された期間内に作成されたログを返すには、次のようなクエリを実行します。

SELECT interface_id, srcaddr, action, protocol, to_iso8601(from_unixtime(start)) AS start_time, to_iso8601(from_unixtime("end")) AS end_time FROM test_table_vpclogs WHERE DAY >= '2021/04/01' AND DAY < '2021/04/30'

指定された期間の送信元 IP アドレスのアクセスログを返すには、次のようなクエリを実行します。

SELECT * FROM test_table_vpclogs WHERE srcaddr = '10.117.1.22' AND day >= '2021/02/01' AND day < '2021/02/28'

すべての拒否された TCP 接続を一覧表示するには、次のようなクエリを実行します。

SELECT day, interface_id, srcaddr, action, protocol FROM test_table_vpclogs WHERE action = 'REJECT' AND protocol = 6 AND day >= '2021/02/01' AND day < '2021/02/28' LIMIT 10

10.117 で始まる IP アドレス範囲のアクセスログを返すには、次のようなクエリを実行します。

SELECT * FROM test_table_vpclogs WHERE split_part(srcaddr,'.', 1)='10' AND split_part(srcaddr,'.', 2) ='117'

特定の時間範囲内の送信先 IP アドレスのアクセスログを返すには、次のようなクエリを実行します。

SELECT * FROM test_table_vpclogs WHERE dstaddr = '10.0.1.14' AND day >= '2021/01/01' AND day < '2021/01/31'

パーティションプロジェクションを使用した Apache Parquet 形式でのフローログのテーブルの作成

次の VPC フローログのパーティションプロジェクション CREATE TABLE ステートメントは Apache Parquet 形式であり、Hive と互換性がなく、曜日ではなく時間と日付でパーティショニングされています。サンプルのテーブル名 test_table_vpclogs_parquet をそのテーブルの名前に置き換えます。Amazon VPC ログデータが含まれている Amazon S3 バケットを指定するように LOCATION 句を編集します。

CREATE EXTERNAL TABLE IF NOT EXISTS test_table_vpclogs_parquet ( version int, account_id string, interface_id string, srcaddr string, dstaddr string, srcport int, dstport int, protocol bigint, packets bigint, bytes bigint, start bigint, `end` bigint, action string, log_status string, vpc_id string, subnet_id string, instance_id string, tcp_flags int, type string, pkt_srcaddr string, pkt_dstaddr string, az_id string, sublocation_type string, sublocation_id string, pkt_src_aws_service string, pkt_dst_aws_service string, flow_direction string, traffic_path int ) PARTITIONED BY (region string, date string, hour string) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe' STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat' LOCATION 's3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/{account_id}/vpcflowlogs/' TBLPROPERTIES ( "EXTERNAL"="true", "skip.header.line.count" = "1", "projection.enabled" = "true", "projection.region.type" = "enum", "projection.region.values" = "us-east-1,us-west-2,ap-south-1,eu-west-1", "projection.date.type" = "date", "projection.date.range" = "2021/01/01,NOW", "projection.date.format" = "yyyy/MM/dd", "projection.hour.type" = "integer", "projection.hour.range" = "00,23", "projection.hour.digits" = "2", "storage.location.template" = "s3://DOC-EXAMPLE-BUCKET/prefix/AWSLogs/${account_id}/vpcflowlogs/${region}/${date}/${hour}" )

追加リソース

Athena を使用して VPC フローログを分析する方法の詳細については、次の「 AWS ビッグデータ ブログ の投稿」を参照してください。