AWS Glue における ETL 出力のパーティションの管理 - AWS Glue

AWS Glue における ETL 出力のパーティションの管理

パーティション分割は、データセットを整理して効率的にクエリを実行可能にする重要な手法です。1 つまたは複数の列の個別の値に基づいて、データを階層形式のディレクトリ構造に整理します。

たとえば、Amazon Simple Storage Service (Amazon S3) のアプリケーションログを、年、月、日で分類しながら、日付についてパーティション化できます。次に 1 日分のデータに対応するファイルを s3://my_bucket/logs/year=2018/month=01/day=23/ などのプレフィックス別に配置します。Amazon Athena、Amazon Redshift Spectrum、そして AWS Glue などのシステムでは、基になるデータ全体を Amazon S3 から読み取ることなく、これらのパーティションを使用してパーティション値でデータをフィルタリングできます。

クローラは、ファイルタイプとスキーマを推定するだけでなく、AWS Glue Data Catalog を構成する際に、データセットのパーティション構造も自動的に特定します。これにより出力されるパーティション列に対しては、AWS Glue の ETL ジョブやクエリエンジン (Amazon Athena など) からクエリを実行できます。

テーブルのクロールが完了すると、クローラが作成したパーティションを表示できます。AWS Glue コンソールの左のナビゲーションペインで、[Tables] (テーブル) をクリックします。クローラで作成されたテーブルを選択した後、[View Partitions] (パーティション) の表示をクリックします。

Apache Hive 形式のパーティション分割されたパス (key=val 形式) の場合、クローラはキー名を使用して自動的に列名を事前設定します。それ以外の場合は、partition_0partition_1 などのデフォルト名が使用されます。コンソールでデフォルト名を変更するには、テーブルに移動して [Edit Schema (スキーマの編集)] を選択し、パーティション列の名前を変更します。

次に、ETL スクリプトでパーティション列をフィルタリングできます。パーティション情報は Data Catalog に格納されるため、パーティション列を DynamicFrame に含めるには from_catalog API 呼び出しを使用します。たとえば、create_dynamic_frame.from_catalog ではなく create_dynamic_frame.from_options を使用します。

プッシュダウン述語を使用した事前フィルタ処理

多くの場合、プッシュダウン述語を使用してパーティションをフィルタリングできます。データセットのすべてのファイルをリストアップして読み取る必要はありません。データセット全体を読み取って DynamicFrame でフィルタリングする代わりに、Data Catalog 内でパーティションのメタデータに直接フィルターを適用できます。次に、実際に必要なものだけをリストアップして DynamicFrame 内に読み取ることができます。

たとえば、Python では以下のように記述できます。

glue_context.create_dynamic_frame.from_catalog( database = "my_S3_data_set", table_name = "catalog_data_table", push_down_predicate = my_partition_predicate)

これによって作成される DynamicFrame では、Data Catalog のパーティションのうち、述語式を満たすものだけがロードされます。ロードするデータのサブセットを絞り込む度合いに応じて、処理時間を大幅に短縮できる場合があります。

述語式として、Spark SQL でサポートされている任意のブール式を使用できます。Spark SQL クエリで WHERE 句に指定できる条件は、すべて正常に動作します。例えば述語式 pushDownPredicate = "(year=='2017' and month=='04')" では、Data Catalog 内で year が「2017」に等しく、また month が「04」に等しいパーティションのみロードされます。詳細については、Apache Spark SQL のドキュメントを参照してください。特に Scala SQL 関数リファレンスが参考になります。

Hive 形式の Amazon S3 パスのパーティション分割に加えて、Apache Parquet および Apache ORC ファイル形式では、各ファイルが列値を表すデータのブロック単位にパーティション分割されます。各ブロックにも、ブロック内のレコードに関する統計情報 (列の最小/最大値など) が保存されます。AWSGlue は、Hive 形式のパーティションに加え、これらの形式のブロックパーティションの両方でプッシュダウン述語をサポートしています。これにより、Parquet 形式と ORC 形式の不要な Amazon S3 パーティションを取り除き、列統計を使用して不要と判断したブロックをスキップできます。

カタログのパーティション述語を使用したサーバー側のフィルタリング

push_down_predicate オプションは、カタログからすべてのパーティションを一覧表示した後、Amazon S3 にあるそれらのパーティションからファイルをリストする前に適用されます。テーブルに多数のパーティションがある場合、カタログパーティションのリストに、時間的なオーバーヘッドが余計に発生する可能性があります。このオーバーヘッドに対処するには、catalogPartitionPredicate オプションを指定して AWS Glue Data Catalog のパーティションインデックスを使用しながら、サーバー側のパーティションでプルーニングを行います。1 つのテーブルに数百万のパーティションがある場合、これにより、パーティションのフィルタリングを大幅に高速化できます。カタログのパーティションインデックスではまだサポートされていない述語構文が、catalogPartitionPredicate で必要となる場合には、additional_options の中で push_down_predicatecatalogPartitionPredicate の両方を使用することもできます

Python:

dynamic_frame = glueContext.create_dynamic_frame.from_catalog( database=dbname, table_name=tablename, transformation_ctx="datasource0", push_down_predicate="day>=10 and customer_id like '10%'", additional_options={"catalogPartitionPredicate":"year='2021' and month='06'"} )

Scala:

val dynamicFrame = glueContext.getCatalogSource( database = dbname, tableName = tablename, transformationContext = "datasource0", pushDownPredicate="day>=10 and customer_id like '10%'", additionalOptions = JsonOptions("""{ "catalogPartitionPredicate": "year='2021' and month='06'"}""") ).getDynamicFrame()
注記

push_down_predicatecatalogPartitionPredicate では、使用される構文が異なります。前者では Spark SQL の標準構文を使用し、後者では JSQL パーサーを使用します。

パーティションの書き込み

デフォルトでは、DynamicFrame は書き込むときにパーティション分割されません。すべての出力ファイルは、指定した出力パスの最上位レベルに書き込まれます。最近まで、DynamicFrame をパーティションに書き込む唯一の方法は、書き込む前に Spark SQL DataFrame に変換することでした。

ただし、DynamicFrames ではキーのシーケンスを使用したネイティブのパーティション分割がサポートされるようになりました。この場合、シンクの作成時に partitionKeys オプションを使用します。例えば、次の Python コードではデータセットを Parquet 形式で Amazon S3 のディレクトリに書き込みます。これらのディレクトリでは、型フィールドごとにパーティション分割されています。これらのパーティションに対しては、他のシステム (Amazon Athena など) を使用しての処理が行えます。

glue_context.write_dynamic_frame.from_options( frame = projectedEvents, connection_type = "s3", connection_options = {"path": "$outpath", "partitionKeys": ["type"]}, format = "parquet")