AWS Glue ETL のプッシュダウンによる読み取りの最適化 - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS Glue ETL のプッシュダウンによる読み取りの最適化

プッシュダウンは、データのソースにより近いデータの取得に関するロジックをプッシュする最適化手法です。ソースは、データベースでも、Amazon S3 などのファイルシステムでもかまいません。特定の操作をソース上で直接実行する場合、ネットワーク上のすべてのデータを AWS Glue が管理する Spark エンジンに送らないようにすることで、時間と処理能力を節約できます。

つまり、プッシュダウンによってデータスキャンが減ります。この手法が適切であることを特定するプロセスの詳細については、「AWS 規範的ガイダンス」の「Best practices for performance tuning AWS Glue for Apache Spark jobs」ガイドにある「Reduce the amount of data scan」を参照してください。

Amazon S3 に保存されているファイルの述語プッシュダウン

プレフィックス別に整理された Amazon S3 上のファイルを操作する場合、プッシュダウン述語を定義することでターゲット Amazon S3 パスをフィルタリングできます。データセット全体を読み込み、DynamicFrame 内のフィルタを適用する代わりに、AWS Glue データカタログに保存されているパーティションメタデータにフィルタを直接適用できます。この方法では、必要なデータだけを選択的に一覧表示して読み込むことができます。パーティションごとのバケットへの書き込みなど、このプロセスの詳細については、「AWS Glue での ETL 出力のパーティションの管理」を参照してください。

Amazon S3 で述語プッシュダウンを実現するには、push_down_predicate パラメータを使用します。Amazon S3 のバケットを年、月、日ごとに分割したとします。2022 年 6 月の顧客データを取得したい場合は、関連する Amazon S3 パスのみを読み込むように AWS Glue に指示できます。この場合の push_down_predicateyear='2022' and month='06' です。まとめると、読み込み操作は以下のようになります。

Python
customer_records = glueContext.create_dynamic_frame.from_catalog( database = "customer_db", table_name = "customer_tbl", push_down_predicate = "year='2022' and month='06'" )
Scala
val customer_records = glueContext.getCatalogSource( database="customer_db", tableName="customer_tbl", pushDownPredicate="year='2022' and month='06'" ).getDynamicFrame()

前のシナリオでは、push_down_predicate は AWS Glue データカタログからすべてのパーティションのリストを取得してフィルタリングしてから、基になる Amazon S3 ファイルを読み込みます。これはほとんどの場合に役立ちますが、数百万のパーティションがあるデータセットを扱う場合、パーティションを一覧表示するプロセスには時間がかかる場合があります。この問題に対処するには、サーバー側でパーティションをプルーニングすることでパフォーマンスを向上させることができます。そのためには、AWS Glue データカタログでデータのパーティションインデックスを作成します。パーティションインデックスについての詳細は、「パーティションインデックスの作成 」を参照してください。その後、catalogPartitionPredicate オプションを使用してインデックスを参照できます。catalogPartitionPredicate を使用してパーティションを取得する例については、「カタログのパーティション述語を使用したサーバー側のフィルタリング」を参照してください。

JDBC ソースを操作するときのプッシュダウン

GlueContext で使用されている AWS Glue JDBC リーダーは、ソース上で直接実行できるカスタム SQL クエリを提供することで、サポートされているデータベースへのプッシュダウンをサポートします。これは sampleQuery パラメータを設定することで実現できます。サンプルクエリでは、選択する列を指定したり、Spark エンジンに転送されるデータを制限するプッシュダウン述語を指定したりできます。

デフォルトでは、サンプルクエリは 1 つのノードで動作するため、大量のデータを処理するとジョブが失敗する可能性があります。この機能を使用してデータを大規模にクエリするには、enablePartitioningForSampleQuery を true に設定してクエリパーティショニングを設定する必要があります。これにより、選択したキーにまたがる複数のノードにクエリが分散されます。クエリパーティショニングには、他にもいくつかの必要な設定パラメータがあります。クエリパーティショニングについての詳細は、「JDBC テーブルからの並列読み取り」を参照してください。

enablePartitioningForSampleQuery を設定すると、AWS Glue はデータベースをクエリするときに、プッシュダウン述語とパーティショニング述語を組み合わせます。パーティショニング条件を追加するには、AWS Glue の sampleQueryAND で終わる必要があります (プッシュダウン述語を指定しない場合は、sampleQueryWHERE で終わる必要があります)。以下の例を参照してください。ここでは、述語をプッシュダウンして id が 1000 を超える行のみを取得しています。この sampleQuery により、id が指定した値より大きい行の名前と場所の列だけが返されます。

Python
sample_query = "select name, location from customer_tbl WHERE id>=1000 AND" customer_records = glueContext.create_dynamic_frame.from_catalog( database="customer_db", table_name="customer_tbl", sample_query = "select name, location from customer_tbl WHERE id>=1000 AND", additional_options = { "hashpartitions": 36 , "hashfield":"id", "enablePartitioningForSampleQuery":True, "sampleQuery":sample_query } )
Scala
val additionalOptions = Map( "hashpartitions" -> "36", "hashfield" -> "id", "enablePartitioningForSampleQuery" -> "true", "sampleQuery" -> "select name, location from customer_tbl WHERE id >= 1000 AND" ) val customer_records = glueContext.getCatalogSource( database="customer_db", tableName="customer_tbl").getDynamicFrame()
注記

customer_tbl が Data Catalog と基になるデータストアで異なる名前がある場合、クエリは基になるデータストアに渡されるため、sample_query に基になるテーブル名を指定する必要があります。

AWS Glue データカタログと統合せずに JDBC テーブルに対してクエリを実行することもできます。ユーザー名とパスワードをパラメータとしてメソッドに提供する代わりに、useConnectionPropertiesconnectionName を指定することで既存の接続の認証情報を再利用できます。この例では、my_postgre_connection という接続から認証情報を取得します。

Python
connection_options_dict = { "useConnectionProperties": True, "connectionName": "my_postgre_connection", "dbtable":"customer_tbl", "sampleQuery":"select name, location from customer_tbl WHERE id>=1000 AND", "enablePartitioningForSampleQuery":True, "hashfield":"id", "hashpartitions":36 } customer_records = glueContext.create_dynamic_frame.from_options( connection_type="postgresql", connection_options=connection_options_dict )
Scala
val connectionOptionsJson = """ { "useConnectionProperties": true, "connectionName": "my_postgre_connection", "dbtable": "customer_tbl", "sampleQuery": "select name, location from customer_tbl WHERE id>=1000 AND", "enablePartitioningForSampleQuery" : true, "hashfield" : "id", "hashpartitions" : 36 } """ val connectionOptions = new JsonOptions(connectionOptionsJson) val dyf = glueContext.getSource("postgresql", connectionOptions).getDynamicFrame()

AWS Glue のプッシュダウンに関する注意事項と制限事項

プッシュダウンは概念上、ストリーミング以外のソースから読み込む場合にも適用できます。AWSGlue はさまざまなソースに対応しています。プッシュダウン機能はソースとコネクタによって異なります。

  • Snowflake に接続するときは、query オプションを使用できます。AWS Glue 4.0 以降のバージョンの Redshift コネクタにも同様の機能があります。Snowflake から query を使用して読み込む方法については、「Snowflake テーブルからの読み込み」を参照してください。

  • DynamoDB ETL リーダーでは、フィルタまたはプッシュダウン述語は使用できません。MongoDB と DocumentDB もこの種の機能をサポートしていません。

  • Amazon S3 に保存されているデータをオープンテーブル形式で読み込む場合、Amazon S3 内のファイルのパーティショニング方法ではもはや十分ではありません。オープンテーブル形式を使用してパーティションから読み込みと書き込みを行うには、その形式のドキュメントを参照してください。

  • DynamicFrame メソッドは Amazon S3 プロジェクションプッシュダウンを実行しません。述語フィルタを通過したファイルからすべての列が読み込まれます。

  • AWS Glue で custom.jdbc コネクタを操作する場合、プッシュダウンできるかどうかはソースとコネクタによって異なります。該当するコネクタのマニュアルを参照して、AWS Glue のプッシュダウンをサポートしているかどうか、またどのようにサポートしているかを確認してください。