ジョブのブックマークを使用した処理済みデータの追跡 - AWS Glue

ジョブのブックマークを使用した処理済みデータの追跡

AWS Glue ではジョブの実行による状態情報を保持することで、ETL ジョブの以前の実行中にすでに処理されたデータを追跡します。この継続状態の情報はジョブのブックマークと呼ばれています。ジョブのブックマークは、AWS Glue で状態情報を保持して、古いデータを再処理しないために役立ちます。ジョブのブックマークを使用すると、スケジュールされた間隔で再実行する際に新しいデータを処理できます。ジョブのブックマークは、ソース、変換、ターゲットなど、さまざまなジョブの要素で構成されています。たとえば、ETL ジョブが Amazon S3 ファイルで新しいパーティションを読み込むとします。AWS Glue は、そのジョブにより正常に処理されたのはどのパーティションなのかを追跡し、処理の重複およびジョブのターゲットデータストアにデータが重複するのを防ぎます。

ジョブのブックマークは、JDBC データソース、Relationalize 変換、および一部の Amazon Simple Storage Service (Amazon S3) ソースに実装されています。次の表に、AWS Glue がジョブのブックマークに対してサポートする Amazon S3 ソース形式を示します。

AWS Glue バージョン Amazon S3 ソース形式
バージョン 0.9 JSON、CSV、Apache Avro、XML
バージョン 1.0 以降 JSON、CSV、Apache Avro、XML、Parquet、ORC

バージョンの詳細に関しては、「ジョブプロパティの定義」を参照してください。

JDBC ソースでは、次のルールが適用されます。

  • AWS Glue では、テーブルごとに 1 つ以上の列をブックマークキーとして使用して、新しいデータおよび処理済みのデータを決定します。ブックマークキーは、結合して 1 つの複合キーを形成します。

  • ブックマークキーとして使用する列を指定できます。ブックマークキーを指定しない場合、AWS Glue はデフォルトでプライマリキーをブックマークキーとして使用します。ただし、プライマリキーが連続して (ギャップなく) 増減することが条件です。

  • ユーザー定義のブックマークキーを使用する場合、そのキーは厳密に一定間隔で増減する必要があります。ギャップは許容されます。

AWS Glue でジョブのブックマークを使用する

ジョブのブックマークオプションは、ジョブが開始したときにパラメータとして渡されます。AWS Glue コンソールでジョブのブックマークを設定するためのオプションを次の表に示します。

ジョブのブックマーク 説明
Enable (有効) ジョブの実行後に状態を更新させて以前に処理されたデータを追跡します。ジョブのブックマークをサポートしているソースのあるジョブの場合、ジョブは処理されたデータを追跡し、ジョブが実行されると、最後のチェックポイント以降の新しいデータを処理します。
Disable (無効) ジョブのブックマークは使用されず、ジョブは常にデータセット全体を処理します。以前のジョブからの出力の管理は、ユーザーが行います。これがデフォルト値です。
一時停止

最後のブックマークの状態は更新せずに、最後に正常に実行された後の増分データ、または次のサブオプションで識別される範囲内のデータを処理します。以前のジョブからの出力の管理は、ユーザーが行います。次の 2 つのサブオプションがあります。

  • job-bookmark-from <from-value> は、指定された実行 ID を含む最後に成功した実行までに処理されたすべての入力を表す実行 ID です。対応する入力は無視されます。

  • job-bookmark-to <to-value> は、指定された実行 ID を含む最後に成功した実行までに処理されたすべての入力を表す実行 ID です。<from-value> によって識別される入力を除く対応する入力は、ジョブによって処理されます。この入力より後の入力も処理対象から除外されます。

このオプションが設定されている場合、ジョブのブックマークの状態は更新されません。

サブオプションはオプションですが、使用する場合、両方のサブオプションを提供する必要があります。

コマンドラインでジョブに渡されるパラメータ、および特にジョブブックマークの詳細については、「AWS Glue で使用される特別なパラメータ」を参照してください。

Amazon S3 入力ソースの場合、AWS Glue ジョブのブックマークではオブジェクトの最終更新日時を確認して、どのオブジェクトを再処理する必要があるのかを確認します。入力ソースデータが最後のジョブ実行以降に変更されている場合、ジョブを再度実行すると、ファイルが再度処理されます。

Glue Spark ETL ジョブのジョブブックマークを以前のジョブ実行に巻き戻すことができます。ジョブのブックマークを以前のジョブ実行に巻き戻すことで、データのバックフィルシナリオをより適切にサポートできます。その結果、後続のジョブ実行ではブックマークされたジョブ実行からのデータだけが再処理されます。

同じジョブを使用してすべてのデータを再処理する場合は、ジョブのブックマークをリセットします。ジョブのブックマークの状態をリセットするには、AWS Glue コンソール、ResetJobBookmark アクション (Python: reset_job_bookmark) API オペレーション、または AWS CLI を使用します。たとえば、AWS CLI を使用して以下のコマンドを入力します。

aws glue reset-job-bookmark --job-name my-job-name

AWS Glue では、ジョブでジョブのブックマークを管理します。ジョブを削除すると、ジョブのブックマークは削除されます。

場合によっては、AWS Glue ジョブのブックマークを有効にしてあっても、以前の実行で処理したデータを ETL ジョブで再処理することがあります。このエラーの一般的な原因を解決する方法の詳細については、「AWS Glue のエラーのトラブルシューティング」を参照してください。

変換コンテキスト

AWS Glue PySpark の動的フレームの多くのメソッドには、transformation_ctx というオプションのパラメータが含まれています。このパラメータは ETL 演算子インスタンスの一意の識別子です。transformation_ctx パラメータは指定された演算子に対するジョブのブックマーク内の状態情報を識別するために使用されます。具体的には、AWS Glue では transformation_ctx を使用してブックマーク状態に対するキーにインデックスを付けます。

ジョブのブックマークが正しく機能するように、ジョブのブックマークのパラメータを有効にし、transformation_ctx パラメータを設定します。transformation_ctx パラメータを渡さない場合、メソッドで使用されている動的フレームやテーブルに対してジョブのブックマークは有効になりません。たとえば、ETL ジョブで 2 つの Amazon S3 ソースを読み取って結合する場合、ブックマークを有効にするメソッドに対してのみ transformation_ctx パラメータを渡すことができます。1 つのジョブについてジョブのブックマークをリセットした場合、transformation_ctx の使用には関係なく、このジョブに関連付けられているすべての変換がリセットされます。

DynamicFrameReader クラスの詳細については、「DynamicFrameReader クラス」を参照してください。PySpark 拡張の詳細については、「AWS Glue PySpark 拡張機能リファレンス」を参照してください

AWS Glue の生成したスクリプトでジョブのブックマークを使用する

このセクションでは、ジョブのブックマークを使用する運用の詳細を説明します。また、ソースと宛先を選択してジョブを実行するときに AWS Glue から生成可能なスクリプトの例も提供します。

ジョブのブックマークはジョブの状態を保存します。状態の各インスタンスは、ジョブ名とバージョン番号がキーになっています。スクリプトで呼び出された job.init では、その状態を取得し、常に最新バージョンを取得します。1 つの状態の中に複数の状態要素が存在します。要素は、スクリプト内のソース、変換、シンクインスタンスごとに固有です。これらの状態要素は、スクリプト内の対応する要素 (ソース、変換、またはシンク) にアタッチされている変換コンテキストによって識別されます。状態要素はユーザースクリプトから job.commit が呼び出されたときにアトミックに保存されます。スクリプトは引数からジョブのブックマークのジョブ名およびコントロールオプションを取得します。

ジョブのブックマーク内の状態要素は、ソース、変換、またはシンク固有のデータです。たとえば、アップストリームジョブまたはプロセスによって継続的に書き込まれている Amazon S3 の場所から増分データを読み込むとします。この場合、スクリプトではそれまでに処理されている部分を判別する必要があります。Amazon S3 ソースのジョブのブックマーク実装では情報を保存するため、再度ジョブを実行するときに、保存された情報を使用して新しいオブジェクトのみをフィルターでき、次にジョブを実行するための状態を再計算できます。新しいファイルをフィルタリングするためにタイムスタンプが使用されます。

ジョブのブックマークは、状態要素に加えて、実行番号試行番号バージョン番号 を持ちます。実行番号はジョブの実行を追跡し、試行番号はジョブ実行の試行回数を記録します。ジョブ実行番号は正常な実行ごとに増分される、一定間隔で増加する番号です。試行番号は各実行の試行回数を追跡し、失敗した試行後のに実行がある場合にのみ増分されます。バージョン番号は、一定間隔で増加し、ジョブのブックマークの更新を追跡します。

Amazon S3 データソース用に生成されたスクリプトの例を以下に示します。ジョブのブックマークを使用するために必要なスクリプトの部分は、太字と斜体で表示されています。これらの要素の詳細については、GlueContext クラス API および DynamicFrameWriter クラス API を参照してください。

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2") job.commit()

JDBC ソース用に生成されたスクリプトの例を以下に示します。ソーステーブルは、empno 列がプライマリキーである従業員テーブルです。デフォルトでは、ブックマークキーが指定されていない場合、ジョブはシーケンシャルプライマリキーをブックマークキーとして使用しますが、empno は必ずしもシーケンシャルではないため (値にギャップがある可能性があります)、デフォルトのブックマークキーの対象とはなりません。したがって、スクリプトは empno を明示的にブックマークキーとして指定します。コードのその部分は、太字と斜体で表示されます。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "hr", table_name = "emp", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarksKeysSortOrder":"asc"}) ## @type: ApplyMapping ## @args: [mapping = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2") job.commit()

ジョブブックマークに関連する接続オプションの詳細については、「JDBC connectionType の値」を参照してください。

変更のタイムスタンプを使用したファイルの追跡

Amazon S3 入力ソースの場合、AWS Glue ジョブのブックマークではファイルの最終更新日時を確認して、どのオブジェクトを再処理する必要があるのかを確認します。

次の の例を考えます。この図で、X 軸は左から右方向の時間軸で、左端のポイントが T0 です。Y 軸は時間 T で確認されたファイルのリストです。リストを表す要素は変更時間に基づいてグラフに配置されます。


        変更時間に基づく、時間 T で確認されたファイルのリストです。

この例では、変更のタイムスタンプ 1 (T1) でジョブが開始された場合、ジョブは、変更時間が T0 より大きく、T1 以下のファイルを探します。当てはまるファイルは、F2、F3、F4、および F5 です。ジョブのブックマークでは、タイムスタンプ T0 と T1 をそれぞれ低および高のタイムスタンプとして保存します。

ジョブを T2 に再実行すると、ジョブは変更時間が T1 より大きく T2 以下であるファイルをフィルターします。当てはまるファイルは、F7、F8、F9、および F10 です。したがってファイル F3'、F4'、および F5' を見落とすことになります。変更時間が T1 以下のファイル F3'、F4'、および F5' が T1 よりも後に出現する理由は、Amazon S3 リストの整合性のためです。

Amazon S3 の結果整合性に対処するために、AWS Glue では、ジョブのブックマークにファイルのリスト (パスハッシュ) を含んでいます。AWS Glue では、現在の時刻に先行する有限周期 (dt) までに限り Amazon S3 のファイルリストに整合性がないと想定します。つまり、T1 で実行されたリストに整合性がないときは、変更時間が T1 - dt と T1 の間にあるファイルのファイルリストです。一方、変更時間が T1 - d1 以下のファイルのリストは T1 以降の時点で整合性を持ちます。

AWS Glue でファイルを保存する期間 (かつファイルの整合性があると思われる期間) は、AWS Glue 接続オプションの MaxBand オプションを使用して指定します。デフォルト値は 900 秒 (15 分) です。このプロパティの詳細については、「AWS Glue での ETL の接続タイプとオプション」を参照してください。

タイムスタンプ 2 (T2) でジョブを再実行すると、以下の範囲のファイルがリスト表示されます。

  • T1 - dt (この値を含みません) ~ T1 (この値を含みます)。このリストには、F4、F5、F4'、および F5' が含まれます。このリストは、整合性のある範囲です。ただし、この範囲は T1 でのリスト表示では整合性がなく、F3、F4、および F5 のファイルのリストになります。T2 で処理されるファイルを取得するためには、ファイル F3、F4、および F5 が削除されることになります。

  • T1 (この値を含みません) ~ T2 - dt (この値を含みます)。このリストには、F7 および F8 が含まれます。このリストは、整合性のある範囲です。

  • T2 - dt (この値を含みません) ~ T2 (この値を含みます)。このリストには、F9 および F10 が含まれます。このリストは、整合性のない範囲です。

結果として得られるファイルのリストは F3'、F4'、F5'、F7、F8、F9、F10 です。

整合性のないリストにある新しいファイルは F9 および F10 で、次回実行用にフィルター内に保存されます。

Amazon S3 の結果整合性の詳細については、Amazon Simple Storage Service 開発者ガイドの「Amazon S3 の概要」を参照してください。

ジョブ実行が失敗する

ジョブが失敗すると、ジョブ実行バージョンが増分されます。たとえば、タイムスタンプ 1 (T1) で実行したジョブが失敗し、T2 で再実行されると、高位のタイムスタンプが T2 に進められます。次に、後の時点 T3 でジョブを実行すると、高位のタイムスタンプは T3 に進められます。

job.commit() (T1 時点) の前にジョブの実行が失敗した場合、ファイルは後続の実行で処理され、AWS Glue ではこの実行で T0 ~ T2 までのファイルを処理します。