AWS Glue での ETL 出力のパーティションの管理 - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS Glue での ETL 出力のパーティションの管理

パーティション分割は、データセットを整理して効率的にクエリを実行可能にする重要な手法です。1 つまたは複数の列の個別の値に基づいて、データを階層形式のディレクトリ構造に整理します。

たとえば、Amazon Simple Storage Service (Amazon S3) のアプリケーションログを、年、月、日で分類しながら、日付についてパーティション化できます。次に 1 日分のデータに対応するファイルを s3://my_bucket/logs/year=2018/month=01/day=23/ などのプレフィックス別に配置します。Amazon Athena、Amazon Redshift Spectrum、そして AWS Glue などのシステムでは、基になるデータ全体を Amazon S3 から読み取ることなく、これらのパーティションを使用してパーティション値でデータをフィルタリングできます。

クローラは、ファイルタイプとスキーマを推定するだけでなく、AWS Glue Data Catalog を構成する際に、データセットのパーティション構造も自動的に特定します。これにより出力されるパーティション列に対しては、AWS Glue ETL ジョブやクエリエンジン (Amazon Athena など) からクエリを実行できます。

テーブルのクロールが完了すると、クローラが作成したパーティションを表示できます。AWS Glue コンソールの左のナビゲーションペインで、[Tables] (テーブル) をクリックします。クローラで作成されたテーブルを選択した後、[View Partitions] (パーティション) の表示をクリックします。

Apache Hive 形式のパーティション分割されたパス (key=val 形式) の場合、クローラはキー名を使用して自動的に列名を事前設定します。それ以外の場合は、partition_0partition_1 などのデフォルト名が使用されます。コンソールでデフォルトの名前を変更できます。そのためには、テーブルに移動します。[インデックス] タブにインデックスが存在するかどうかを確認します。ある場合は、削除してから続行する必要があります (後で新しい列名を使用して再作成できます)。次に、[スキーマの編集] を選択し、そこでパーティション列の名前を変更します。

次に、ETL スクリプトでパーティション列をフィルタリングできます。パーティション情報は Data Catalog に格納されるため、パーティション列を DynamicFrame に含めるには from_catalog API 呼び出しを使用します。例えば、create_dynamic_frame.from_options ではなく create_dynamic_frame.from_catalog を使用します。

パーティション化は、データスキャンを減らす最適化手法です。この手法が適切であることを特定するプロセスの詳細については、「AWS 規範的ガイダンス」の「Best practices for performance tuning AWS Glue for Apache Spark jobs」ガイドにある「Reduce the amount of data scan」を参照してください。

プッシュダウン述語を使用した事前フィルタ処理

多くの場合、プッシュダウン述語を使用してパーティションをフィルタリングできます。データセットのすべてのファイルをリストアップして読み取る必要はありません。データセット全体を読み取って DynamicFrame でフィルタリングする代わりに、Data Catalog 内でパーティションのメタデータに直接フィルターを適用できます。次に、実際に必要なものだけをリストアップして DynamicFrame 内に読み取ることができます。

たとえば、Python では以下のように記述できます。

glue_context.create_dynamic_frame.from_catalog( database = "my_S3_data_set", table_name = "catalog_data_table", push_down_predicate = my_partition_predicate)

これによって作成される DynamicFrame では、Data Catalog のパーティションのうち、述語式を満たすものだけがロードされます。ロードするデータのサブセットを絞り込む度合いに応じて、処理時間を大幅に短縮できる場合があります。

述語式として、Spark SQL でサポートされている任意のブール式を使用できます。Spark SQL クエリで WHERE 句に指定できる条件は、すべて正常に動作します。例えば述語式 pushDownPredicate = "(year=='2017' and month=='04')" では、Data Catalog 内で year が「2017」に等しく、また month が「04」に等しいパーティションのみロードされます。詳細については、Apache Spark SQL のドキュメントを参照してください。特に Scala SQL 関数リファレンスが参考になります。

カタログのパーティション述語を使用したサーバー側のフィルタリング

push_down_predicate オプションは、カタログからすべてのパーティションを一覧表示した後、Amazon S3 にあるそれらのパーティションからファイルをリストする前に適用されます。テーブルに多数のパーティションがある場合、カタログパーティションのリストに、時間的なオーバーヘッドが余計に発生する可能性があります。このオーバーヘッドに対処するには、catalogPartitionPredicate オプションを指定して AWS Glue Data Catalog のパーティションインデックスを使用しながら、サーバー側のパーティションでプルーニングを行います。1 つのテーブルに数百万のパーティションがある場合、これにより、パーティションのフィルタリングを大幅に高速化できます。カタログのパーティションインデックスではまだサポートされていない述語構文が、catalogPartitionPredicate で必要となる場合には、additional_options の中で push_down_predicatecatalogPartitionPredicate の両方を使用することもできます

Python:

dynamic_frame = glueContext.create_dynamic_frame.from_catalog( database=dbname, table_name=tablename, transformation_ctx="datasource0", push_down_predicate="day>=10 and customer_id like '10%'", additional_options={"catalogPartitionPredicate":"year='2021' and month='06'"} )

Scala:

val dynamicFrame = glueContext.getCatalogSource( database = dbname, tableName = tablename, transformationContext = "datasource0", pushDownPredicate="day>=10 and customer_id like '10%'", additionalOptions = JsonOptions("""{ "catalogPartitionPredicate": "year='2021' and month='06'"}""") ).getDynamicFrame()
注記

push_down_predicatecatalogPartitionPredicate では、使用される構文が異なります。前者では Spark SQL の標準構文を使用し、後者では JSQL パーサーを使用します。

パーティションの書き込み

デフォルトでは、DynamicFrame は書き込むときにパーティション分割されません。すべての出力ファイルは、指定した出力パスの最上位レベルに書き込まれます。最近まで、DynamicFrame をパーティションに書き込む唯一の方法は、書き込む前に Spark SQL DataFrame に変換することでした。

ただし、DynamicFrames ではキーのシーケンスを使用したネイティブのパーティション分割がサポートされるようになりました。この場合、シンクの作成時に partitionKeys オプションを使用します。例えば、次の Python コードではデータセットを Parquet 形式で Amazon S3 のディレクトリに書き込みます。これらのディレクトリでは、型フィールドごとにパーティション分割されています。これらのパーティションに対しては、他のシステム (Amazon Athena など) を使用しての処理が行えます。

glue_context.write_dynamic_frame.from_options( frame = projectedEvents, connection_type = "s3", connection_options = {"path": "$outpath", "partitionKeys": ["type"]}, format = "parquet")