ジョブのブックマークを使用した処理済みデータの追跡
AWS Glue ではジョブの実行による状態情報を保持することで、ETL ジョブの以前の実行中にすでに処理されたデータを追跡します。この継続状態の情報はジョブのブックマークと呼ばれています。ジョブのブックマークは、AWS Glue で状態情報を保持して、古いデータを再処理しないために役立ちます。ジョブのブックマークを使用すると、スケジュールされた間隔で再実行する際に新しいデータを処理できます。ジョブのブックマークは、ソース、変換、ターゲットなど、さまざまなジョブの要素で構成されています。例えば、ETL ジョブが Amazon S3 ファイルで新しいパーティションを読み込むとします。AWS Glue は、そのジョブにより正常に処理されたのはどのパーティションなのかを追跡し、処理の重複およびジョブのターゲットデータストアにデータが重複するのを防ぎます。
ジョブのブックマークは、JDBC データソース、Relationalize 変換、および一部の Amazon Simple Storage Service (Amazon S3) ソースに実装されています。次の表に、AWS Glue がジョブのブックマークに対してサポートする Amazon S3 ソース形式を示します。
AWS Glue バージョン | Amazon S3 ソース形式 |
---|---|
バージョン 0.9 | JSON、CSV、Apache Avro、XML |
バージョン 1.0 以降 | JSON、CSV、Apache Avro、XML、Parquet、ORC |
AWS Glue バージョンの詳細に関しては、「Spark ジョブのジョブプロパティの定義」を参照してください。
JDBC ソースでは、次のルールが適用されます。
-
AWS Glue では、テーブルごとに 1 つ以上の列をブックマークキーとして使用して、新しいデータおよび処理済みのデータを決定します。ブックマークキーは、結合して単一の複合キーを形成します。
-
ブックマークキーとして使用する列を指定できます。ブックマークキーを指定しない場合、AWS Glue はデフォルトでプライマリキーをブックマークキーとして使用します。ただし、プライマリキーが連続して (ギャップなく) 増減することが条件です。
-
ユーザー定義のブックマークキーを使用する場合、そのキーは厳密に一定間隔で増減する必要があります。ギャップは許容されます。
-
AWS Glue は、ジョブブックマークキーとして大文字と小文字を区別する列の使用をサポートしていません。
AWS Glue でジョブのブックマークを使用する
ジョブのブックマークオプションは、ジョブが開始したときにパラメータとして渡されます。AWS Glue コンソールでジョブのブックマークを設定するためのオプションを次の表に示します。
ジョブのブックマーク | 説明 |
---|---|
有効 | ジョブの実行後に状態を更新させて以前に処理されたデータを追跡します。ジョブのブックマークをサポートしているソースのあるジョブの場合、ジョブは処理されたデータを追跡し、ジョブが実行されると、最後のチェックポイント以降の新しいデータを処理します。 |
[無効] | ジョブのブックマークは使用されず、ジョブは常にデータセット全体を処理します。以前のジョブからの出力の管理は、ユーザーが行います。これがデフォルトです。 |
[Pause] (一時停止) |
最後のブックマークの状態は更新せずに、最後に正常に実行された後の増分データ、または次のサブオプションで識別される範囲内のデータを処理します。以前のジョブからの出力の管理は、ユーザーが行います。次の 2 つのサブオプションがあります。
このオプションが設定されている場合、ジョブのブックマークの状態は更新されません。 サブオプションはオプションですが、使用する場合、両方のサブオプションを提供する必要があります。 |
コマンドラインでジョブに渡されるパラメータ、および特にジョブブックマークの詳細については、「AWS Glue ジョブのパラメータ」を参照してください。
Amazon S3 入力ソースの場合、AWS Glue ジョブのブックマークではオブジェクトの最終更新日時を確認して、どのオブジェクトを再処理する必要があるのかを確認します。入力ソースデータが最後のジョブ実行以降に変更されている場合、ジョブを再度実行すると、ファイルが再度処理されます。
AWS Glue Spark ETL ジョブのジョブブックマークを以前のジョブ実行に巻き戻すことができます。ジョブのブックマークを以前のジョブ実行に巻き戻すことで、データのバックフィルシナリオをより適切にサポートできます。その結果、後続のジョブ実行ではブックマークされたジョブ実行からのデータだけが再処理されます。
同じジョブを使用してすべてのデータを再処理する場合は、ジョブのブックマークをリセットします。ジョブのブックマークの状態をリセットするには、AWS Glue コンソール、ResetJobBookmark アクション (Python: reset_job_bookmark) API オペレーション、または AWS CLI を使用します。例えば、AWS CLI を使用して以下のコマンドを入力します。
aws glue reset-job-bookmark --job-name
my-job-name
ブックマークを巻き戻したりリセットしたりしたとき、複数のターゲットが存在する可能性があり、ターゲットがジョブブックマークで追跡されないため、AWS Glue はターゲットファイルをクリーンアップしません。ジョブブックマークで追跡されるのは、ソースファイルだけです。ソースファイルの巻き戻しと再処理時に異なる出力ターゲットを作成して、出力内のデータが重複しないようにすることができます。
AWS Glue では、ジョブでジョブのブックマークを管理します。ジョブを削除すると、ジョブのブックマークは削除されます。
場合によっては、AWS Glue ジョブのブックマークを有効にしてあっても、以前の実行で処理したデータを ETL ジョブで再処理することがあります。このエラーの一般的な原因を解決する方法の詳細については、「AWS Glue のエラーのトラブルシューティング」を参照してください。
変換コンテキスト
AWS Glue PySpark の動的フレームの多くのメソッドには、transformation_ctx
というオプションのパラメータが含まれています。このパラメータは ETL 演算子インスタンスの一意の識別子です。transformation_ctx
パラメータは指定された演算子に対するジョブのブックマーク内の状態情報を識別するために使用されます。具体的には、AWS Glue では transformation_ctx
を使用してブックマーク状態に対するキーにインデックスを付けます。
ジョブのブックマークが正しく機能するように、ジョブのブックマークのパラメータを有効にし、transformation_ctx
パラメータを設定します。transformation_ctx
パラメータを渡さない場合、メソッドで使用されている動的フレームやテーブルに対してジョブのブックマークは有効になりません。例えば、ETL ジョブで 2 つの Amazon S3 ソースを読み取って結合する場合、ブックマークを有効にするメソッドに対してのみ transformation_ctx
パラメータを渡すことができます。1 つのジョブについてジョブのブックマークをリセットした場合、transformation_ctx
の使用には関係なく、このジョブに関連付けられているすべての変換がリセットされます。
DynamicFrameReader
クラスの詳細については、「DynamicFrameReader クラス」を参照してください。PySpark 拡張の詳細については、「AWS Glue PySpark 拡張機能リファレンス」を参照してください
AWS Glue の生成したスクリプトでジョブのブックマークを使用する
このセクションでは、ジョブのブックマークを使用する運用の詳細を説明します。また、ソースと宛先を選択してジョブを実行するときに AWS Glue から生成可能なスクリプトの例も提供します。
ジョブのブックマークはジョブの状態を保存します。状態の各インスタンスは、ジョブ名とバージョン番号がキーになっています。スクリプトで呼び出された job.init
では、その状態を取得し、常に最新バージョンを取得します。1 つの状態の中に複数の状態要素が存在します。要素は、スクリプト内のソース、変換、シンクインスタンスごとに固有です。これらの状態要素は、スクリプト内の対応する要素 (ソース、変換、またはシンク) にアタッチされている変換コンテキストによって識別されます。状態要素はユーザースクリプトから job.commit
が呼び出されたときにアトミックに保存されます。スクリプトは引数からジョブのブックマークのジョブ名およびコントロールオプションを取得します。
ジョブのブックマーク内の状態要素は、ソース、変換、またはシンク固有のデータです。例えば、アップストリームジョブまたはプロセスによって継続的に書き込まれている Amazon S3 の場所から増分データを読み取るとします。この場合、スクリプトではそれまでに処理されている部分を判別する必要があります。Amazon S3 ソースのジョブのブックマーク実装では情報を保存するため、再度ジョブを実行するときに、保存された情報を使用して新しいオブジェクトのみをフィルターでき、次にジョブを実行するための状態を再計算できます。新しいファイルをフィルタリングするためにタイムスタンプが使用されます。
ジョブのブックマークは、状態要素に加えて、実行番号、試行番号、バージョン番号 を持ちます。実行番号はジョブの実行を追跡し、試行番号はジョブ実行の試行回数を記録します。ジョブ実行番号は正常な実行ごとに増分される、一定間隔で増加する番号です。試行番号は各実行の試行回数を追跡し、失敗した試行後のに実行がある場合にのみ増分されます。バージョン番号は、一定間隔で増加し、ジョブのブックマークの更新を追跡します。
AWS Glue サービスデータベースでは、すべての変換のブックマーク状態がキーと値のペアとして一緒に格納されます。
{ "job_name" : ..., "run_id": ..., "run_number": .., "attempt_number": ... "states": { "transformation_ctx1" : { bookmark_state1 }, "transformation_ctx2" : { bookmark_state2 } } }
transformation_ctx
は、スクリプト内の特定のソースのブックマーク状態を検索するためのキーとして機能します。ブックマークを正しく機能させるためには、ソースと関連する transformation_ctx
の一貫性を常に保つ必要があります。ソースプロパティを変更したり、transformation_ctx
の名前を変更したりすると、前のブックマークが無効になり、タイムスタンプベースのフィルタリングで正しい結果が得られない場合があります。
ベストプラクティス
以下は、AWS Glue で生成されたスクリプトでジョブブックマークを使用するためのベストプラクティスです。
スクリプトの先頭に
job.init()
と、末尾にjob.commit()
を常に記述してください。これら 2 つの関数は、ブックマークサービスを初期化し、サービスの状態変化を更新するために使用されます。ブックマークは呼び出さないと機能しません。ブックマークを有効にした状態でデータソースプロパティを変更しないでください。例えば、Amazon S3 入力パス A を指している datasource0 があります。ジョブはブックマークを有効にして数ラウンド実行されているソースから読み出しています。
transformation_ctx
を変更せずに datasource0 の入力パスを Amazon S3 パス B に変更すると、AWS Glue ジョブは保存されている古いブックマークの状態を使用します。これによって、入力パス B のファイルは、見つからないか、スキップされます。AWS Glue は、これらのファイルが以前の実行で処理されたと仮定します。パーティション管理を改善するために、ブックマーク付きのカタログテーブルを使用してください。ブックマークは、データカタログのデータソースとオプションの両方に対して機能します。ただし、from オプションのアプローチでは、新しいパーティションを削除/追加するのは困難です。クローラーでカタログテーブルを使用すると、新しく追加された パーティションを追跡するための自動化が向上し、プッシュダウン述語を使用して特定のパーティションを柔軟に選択できるようになります。
大規模なデータセット向けのAWS GlueAmazon S3 ファイルリスター
の使用 ブックマークは、各入力パーティションの下のすべてのファイルをリストし、ファイリングを実行します。したがって、単一のパーティションの下にファイルが多すぎると、ブックマークがドライバ OOM で実行される可能性があります。AWS Glue Amazon S3 ファイルリスターを使用して、メモリ内のすべてのファイルを一度にリストしないようにします。
Amazon S3 データソース用に生成されたスクリプトの例を次に示します。ジョブのブックマークを使用するために必要なスクリプトの部分は、太字と斜体で表示されています。これらの要素の詳細については、GlueContext クラス API および DynamicFrameWriter クラス API を参照してください。
# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2") job.commit()
JDBC ソース用に生成されたスクリプトの例を以下に示します。ソーステーブルは、empno
列がプライマリキーである従業員テーブルです。デフォルトでは、ブックマークキーが指定されていない場合、ジョブはブックマークキーとしてシーケンシャルプライマリキーを使用しますが、empno
は必ずしもシーケンシャルではなく、値にギャップがある可能性があるため、デフォルトのブックマークキーとして適切ではありません。したがって、スクリプトは empno
を明示的にブックマークキーとして指定します。コードのその部分は、太字と斜体で表示されます。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "hr", table_name = "emp", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"}) ## @type: ApplyMapping ## @args: [mapping = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2") job.commit()
jobBookmarkKeys
および jobBookmarkKeysSortOrder
は以下の方法で指定できます。
-
create_dynamic_frame.from_catalog
—additional_options
を使用する。 -
create_dynamic_frame.from_options
—connection_options
を使用する。
ジョブブックマークに関連する接続オプションの詳細については、「JDBC connectionType の値」を参照してください。