AWS Glue
開発者ガイド

ジョブのブックマークを使用した処理済みデータの追跡

AWS Glue ではジョブの実行による状態情報を保持することで、ETL ジョブの以前の実行中にすでに処理されたデータを追跡します。この継続状態の情報はジョブのブックマークと呼ばれています。ジョブのブックマークは、AWS Glue で状態情報を保持して、古いデータを再処理しないために役立ちます。ジョブのブックマークを使用すると、スケジュールされた間隔で再実行する際に新しいデータを処理できます。 ジョブのブックマークは、ソース、変換、ターゲットなど、さまざまなジョブの要素で構成されています。たとえば、ETL ジョブが Amazon S3 ファイルで新しいパーティションを読み込むとします。AWS Glue は、そのジョブにより正常に処理されたのはどのパーティションなのかを追跡し、処理の重複およびジョブのターゲットデータストアにデータが重複するのを防ぎます。

ジョブのブックマークは一部の Amazon Simple Storage Service (Amazon S3) ソースおよび Relationalize 変換に実装されています。次の表に、AWS Glue がジョブのブックマークに対してサポートする Amazon S3 ソース形式を示します。

AWS Glue バージョン Amazon S3 ソース形式
バージョン 0.9 JSON、CSV、Apache Avro、XML
バージョン 1.0 以降 JSON、CSV、Apache Avro、XML、Parquet、ORC

バージョンの詳細に関しては、「ジョブプロパティの定義」を参照してください。

ジョブのブックマークは、リレーショナルデータベース (JDBC 接続) 入力ソース向けには限定されたユースケース用に実装されています。この入力ソースの場合、ジョブのブックマークは、テーブルのプライマリキーが順番に並んでいる場合にのみサポートされています。また、ジョブのブックマークでは新しい行を検索しますが更新された行は検索しません。これは、ブックマークで探すプライマリキーがすでに存在しているからです。

AWS Glue でジョブのブックマークを使用する

AWS Glue コンソールで、ジョブのブックマークオプションは、ジョブが開始したときにパラメータとして渡されます。AWS Glue でジョブのブックマークを設定するためのオプションを次の表に示します。

ジョブのブックマーク 説明
Enable (有効) ジョブの実行後に状態を更新させて以前に処理されたデータを追跡します。ジョブのブックマークをサポートしているソースのあるジョブの場合、ジョブは処理されたデータを追跡し、ジョブが実行されると、最後のチェックポイント以降の新しいデータを処理します。
Disable (無効) ジョブのブックマークは使用されず、ジョブは常にデータセット全体を処理します。以前のジョブからの出力の管理は、ユーザーが行います。これがデフォルト値です。
一時停止 最後の実行からの増分データを処理します。ジョブは前回実行から状態情報を読み取りますが、更新はしません。これを使用することで、後続のすべての実行が最後のブックマーク以降のデータを処理することを確実化できます。以前のジョブからの出力の管理は、ユーザーが行います。

ジョブに渡されるパラメータ、特にジョブのブックマークの詳細については、「AWS Glue で使用される特別なパラメータ」を参照してください。

Amazon S3 入力ソースの場合、AWS Glue ジョブのブックマークではオブジェクトの最終更新日時を確認して、どのオブジェクトを再処理する必要があるのかを確認します。入力ソースデータが最後のジョブ実行以降に変更されている場合、ジョブを再度実行すると、ファイルが再度処理されます。

同じジョブを使用してすべてのデータを再処理する場合は、ジョブのブックマークをリセットします。ジョブのブックマークの状態をリセットするには、AWS Glue コンソール、ResetJobBookmark アクション (Python: reset_job_bookmark) API オペレーション、または AWS CLI を使用します。たとえば、AWS CLI を使用して以下のコマンドを入力します。

aws glue reset-job-bookmark --job-name my-job-name

AWS Glue では、ジョブでジョブのブックマークを管理します。ジョブを削除すると、ジョブのブックマークは削除されます。

場合によっては、AWS Glue ジョブのブックマークを有効にしてあっても、以前の実行で処理したデータを ETL ジョブで再処理することがあります。このエラーの一般的な原因を解決する方法の詳細については、「AWS Glue のエラーのトラブルシューティング」を参照してください。

変換コンテキスト

AWS Glue PySpark の動的フレームの多くのメソッドには、transformation_ctx というオプションのパラメータが含まれています。このパラメータは ETL 演算子インスタンスの一意の識別子です。transformation_ctx パラメータは指定された演算子に対するジョブのブックマーク内の状態情報を識別するために使用されます。具体的には、AWS Glue では transformation_ctx を使用してブックマーク状態に対するキーにインデックスを付けます。

ジョブのブックマークが正しく機能するように、ジョブのブックマークのパラメータを有効にし、transformation_ctx パラメータを設定します。transformation_ctx パラメータを渡さない場合、メソッドで使用されている動的フレームやテーブルに対してジョブのブックマークは有効になりません。たとえば、ETL ジョブで 2 つの Amazon S3 ソースを読み取って結合する場合、ブックマークを有効にするメソッドに対してのみ transformation_ctx パラメータを渡すことができます。1 つのジョブについてジョブのブックマークをリセットした場合、transformation_ctx の使用には関係なく、このジョブに関連付けられているすべての変換がリセットされます。

DynamicFrameReader クラスの詳細については、「DynamicFrameReader クラス」を参照してください。PySpark 拡張の詳細については、「AWS Glue PySpark 拡張機能リファレンス」を参照してください

AWS Glue の生成したスクリプトでジョブのブックマークを使用する

このセクションでは、ジョブのブックマークを使用する運用の詳細を説明します。また、ソースと宛先を選択してジョブを実行するときに AWS Glue から生成可能なスクリプトの例も提供します。

ジョブのブックマークはジョブの状態を保存します。状態の各インスタンスは、ジョブ名とバージョン番号がキーになっています。スクリプトで呼び出された job.init では、その状態を取得し、常に最新バージョンを取得します。1 つの状態の中に複数の状態要素が存在します。要素は、スクリプト内のソース、変換、シンクインスタンスごとに固有です。これらの状態要素は、スクリプト内の対応する要素 (ソース、変換、またはシンク) にアタッチされている変換コンテキストによって識別されます。状態要素はユーザースクリプトから job.commit が呼び出されたときにアトミックに保存されます。スクリプトは引数からジョブのブックマークのジョブ名およびコントロールオプションを取得します。

ジョブのブックマーク内の状態要素は、ソース、変換、またはシンク固有のデータです。たとえば、アップストリームジョブまたはプロセスによって継続的に書き込まれている Amazon S3 の場所から増分データを読み込むとします。この場合、スクリプトではそれまでに処理されている部分を判別する必要があります。Amazon S3 ソースのジョブのブックマーク実装では情報を保存するため、再度ジョブを実行するときに、保存された情報を使用して新しいオブジェクトのみをフィルターでき、次にジョブを実行するための状態を再計算できます。新しいファイルをフィルタリングするためにタイムスタンプが使用されます。

ジョブのブックマークは、状態要素に加えて、実行番号試行番号バージョン番号 を持ちます。実行番号はジョブの実行を追跡し、試行番号はジョブ実行の試行回数を記録します。ジョブ実行番号は正常な実行ごとに増分される、一定間隔で増加する番号です。試行番号は各実行の試行回数を追跡し、失敗した試行後のに実行がある場合にのみ増分されます。バージョン番号は、一定間隔で増加し、ジョブのブックマークの更新を追跡します。

以下は生成されたスクリプトの例です。スクリプトとそれに関連する引数を使用して、ジョブのブックマークに必要とされるさまざまな要素を示しています。これらの要素の詳細については、GlueContext クラス API および DynamicFrameWriter クラス API を参照してください。

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session *job = Job(glueContext)* *job.init(args['JOB_NAME'], args)* ## @type: DataSource ## @args: [database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "relatedqueries_csv", *transformation_ctx = "datasource0")* ## @type: ApplyMapping ## @args: [mapping = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], *transformation_ctx = "applymapping1"*) ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", *transformation_ctx = "datasink2"*) *job.commit()* Job Arguments : *--job-bookmark-option, job-bookmark-enable* *--JOB_NAME, name-1-s3-2-s3-encrypted*

変更のタイムスタンプを使用したファイルの追跡

Amazon S3 入力ソースの場合、AWS Glue ジョブのブックマークではファイルの最終更新日時を確認して、どのオブジェクトを再処理する必要があるのかを確認します。

次の の例を考えます。この図で、X 軸は左から右方向の時間軸で、左端のポイントが T0 です。Y 軸は時間 T で確認されたファイルのリストです。リストを表す要素は変更時間に基づいてグラフに配置されます。


        変更時間に基づく、時間 T で確認されたファイルのリストです。

この例では、変更のタイムスタンプ 1 (T1) でジョブが開始された場合、ジョブは、変更時間が T0 より大きく、T1 以下のファイルを探します。当てはまるファイルは、F2、F3、F4、および F5 です。ジョブのブックマークでは、タイムスタンプ T0 と T1 をそれぞれ低および高のタイムスタンプとして保存します。

ジョブを T2 に再実行すると、ジョブは変更時間が T1 より大きく T2 以下であるファイルをフィルターします。当てはまるファイルは、F7、F8、F9、および F10 です。したがってファイル F3'、F4'、および F5' を見落とすことになります。変更時間が T1 以下のファイル F3'、F4'、および F5' が T1 よりも後に出現する理由は、Amazon S3 リストの整合性のためです。

Amazon S3 の結果整合性に対処するために、AWS Glue では、ジョブのブックマークにファイルのリスト (パスハッシュ) を含んでいます。AWS Glue では、現在の時刻に先行する有限周期 (dt) までに限り Amazon S3 のファイルリストに整合性がないと想定します。つまり、T1 で実行されたリストに整合性がないときは、変更時間が T1 - dt と T1 の間にあるファイルのファイルリストです。一方、変更時間が T1 - d1 以下のファイルのリストは T1 以降の時点で整合性を持ちます。

AWS Glue でファイルを保存する期間 (かつファイルの整合性があると思われる期間) は、AWS Glue 接続オプションの MaxBand オプションを使用して指定します。デフォルト値は 900 秒 (15 分) です。このプロパティの詳細については、「AWS Glue での ETL の接続タイプとオプション」を参照してください。

タイムスタンプ 2 (T2) でジョブを再実行すると、以下の範囲のファイルがリスト表示されます。

  • T1 - dt (この値を含みません) ~ T1 (この値を含みます)。このリストには、F4、F5、F4'、および F5' が含まれます。このリストは、整合性のある範囲です。ただし、この範囲は T1 でのリスト表示では整合性がなく、F3、F4、および F5 のファイルのリストになります。T2 で処理されるファイルを取得するためには、ファイル F3、F4、および F5 が削除されることになります。

  • T1 (この値を含みません) ~ T2 - dt (この値を含みます)。このリストには、F7 および F8 が含まれます。このリストは、整合性のある範囲です。

  • T2 - dt (この値を含みません) ~ T2 (この値を含みます)。このリストには、F9 および F10 が含まれます。このリストは、整合性のない範囲です。

結果として得られるファイルのリストは F3'、F4'、F5'、F7、F8、F9、F10 です。

整合性のないリストにある新しいファイルは F9 および F10 で、次回実行用にフィルター内に保存されます。

Amazon S3 の結果整合性の詳細については、Amazon Simple Storage Service 開発者ガイドの「Amazon S3 の概要」を参照してください。

ジョブ実行が失敗する

ジョブが失敗すると、ジョブ実行バージョンが増分されます。たとえば、タイムスタンプ 1 (T1) で実行したジョブが失敗し、T2 で再実行されると、高位のタイムスタンプが T2 に進められます。次に、後の時点 T3 でジョブを実行すると、高位のタイムスタンプは Amazon S3 に進められます。

job.commit() (T1 時点) の前にジョブの実行が失敗した場合、ファイルは後続の実行で処理され、AWS Glue ではこの実行で T0 ~ T2 までのファイルを処理します。