ジョブのブックマークを使用した処理済みデータの追跡 - AWS Glue

ジョブのブックマークを使用した処理済みデータの追跡

AWS Glue ではジョブの実行による状態情報を保持することで、ETL ジョブの以前の実行中にすでに処理されたデータを追跡します。この継続状態の情報はジョブのブックマークと呼ばれています。ジョブのブックマークは、AWS Glue で状態情報を保持して、古いデータを再処理しないために役立ちます。ジョブのブックマークを使用すると、スケジュールされた間隔で再実行する際に新しいデータを処理できます。ジョブのブックマークは、ソース、変換、ターゲットなど、さまざまなジョブの要素で構成されています。例えば、ETL ジョブが Amazon S3 ファイルで新しいパーティションを読み込むとします。AWS Glue は、そのジョブにより正常に処理されたのはどのパーティションなのかを追跡し、処理の重複およびジョブのターゲットデータストアにデータが重複するのを防ぎます。

ジョブのブックマークは、JDBC データソース、Relationalize 変換、および一部の Amazon Simple Storage Service (Amazon S3) ソースに実装されています。次の表に、AWS Glue がジョブのブックマークに対してサポートする Amazon S3 ソース形式を示します。

AWS Glue バージョン Amazon S3 ソース形式
バージョン 0.9 JSON、CSV、Apache Avro、XML
バージョン 1.0 以降 JSON、CSV、Apache Avro、XML、Parquet、ORC

AWS Glue バージョンの詳細に関しては、「Spark ジョブのジョブプロパティの定義」を参照してください。

JDBC ソースでは、次のルールが適用されます。

  • AWS Glue では、テーブルごとに 1 つ以上の列をブックマークキーとして使用して、新しいデータおよび処理済みのデータを決定します。ブックマークキーは、結合して 1 つの複合キーを形成します。

  • ブックマークキーとして使用する列を指定できます。ブックマークキーを指定しない場合、AWS Glue はデフォルトでプライマリキーをブックマークキーとして使用します。ただし、プライマリキーが連続して (ギャップなく) 増減することが条件です。

  • ユーザー定義のブックマークキーを使用する場合、そのキーは厳密に一定間隔で増減する必要があります。ギャップは許容されます。

  • AWS Glue は、ジョブブックマークキーとして大文字と小文字を区別する列の使用をサポートしていません。

AWS Glue でジョブのブックマークを使用する

ジョブのブックマークオプションは、ジョブが開始したときにパラメータとして渡されます。AWS Glue コンソールでジョブのブックマークを設定するためのオプションを次の表に示します。

ジョブのブックマーク 説明
Enable (有効化) ジョブの実行後に状態を更新させて以前に処理されたデータを追跡します。ジョブのブックマークをサポートしているソースのあるジョブの場合、ジョブは処理されたデータを追跡し、ジョブが実行されると、最後のチェックポイント以降の新しいデータを処理します。
Disable (無効) ジョブのブックマークは使用されず、ジョブは常にデータセット全体を処理します。以前のジョブからの出力の管理は、ユーザーが行います。これがデフォルト値です。
一時停止

最後のブックマークの状態は更新せずに、最後に正常に実行された後の増分データ、または次のサブオプションで識別される範囲内のデータを処理します。以前のジョブからの出力の管理は、ユーザーが行います。次の 2 つのサブオプションがあります。

  • job-bookmark-from <from-value> は、指定された実行 ID を含む最後に成功した実行までに処理されたすべての入力を表す実行 ID です。対応する入力は無視されます。

  • job-bookmark-to <to-value> は、指定された実行 ID を含む最後に成功した実行までに処理されたすべての入力を表す実行 ID です。<from-value> によって識別される入力を除く対応する入力は、ジョブによって処理されます。この入力より後の入力も処理対象から除外されます。

このオプションが設定されている場合、ジョブのブックマークの状態は更新されません。

サブオプションはオプションですが、使用する場合、両方のサブオプションを提供する必要があります。

コマンドラインでジョブに渡されるパラメータ、および特にジョブブックマークの詳細については、「AWS Glue で使用される特殊なパラメータ」を参照してください。

Amazon S3 入力ソースの場合、AWS Glue ジョブのブックマークではオブジェクトの最終更新日時を確認して、どのオブジェクトを再処理する必要があるのかを確認します。入力ソースデータが最後のジョブ実行以降に変更されている場合、ジョブを再度実行すると、ファイルが再度処理されます。

AWS Glue Spark ETL ジョブのジョブブックマークを以前のジョブ実行に巻き戻すことができます。ジョブのブックマークを以前のジョブ実行に巻き戻すことで、データのバックフィルシナリオをより適切にサポートできます。その結果、後続のジョブ実行ではブックマークされたジョブ実行からのデータだけが再処理されます。

同じジョブを使用してすべてのデータを再処理する場合は、ジョブのブックマークをリセットします。ジョブのブックマークの状態をリセットするには、AWS Glue コンソール、ResetJobBookmark アクション (Python: reset_job_bookmark) API オペレーション、または AWS CLI を使用します。たとえば、AWS CLI を使用して以下のコマンドを入力します。

aws glue reset-job-bookmark --job-name my-job-name

ブックマークを巻き戻したりリセットしたりしたとき、複数のターゲットが存在する可能性があり、ターゲットがジョブブックマークで追跡されないため、AWS Glue はターゲットファイルをクリーンアップしません。ジョブブックマークで追跡されるのは、ソースファイルだけです。ソースファイルの巻き戻しと再処理時に異なる出力ターゲットを作成して、出力内のデータが重複しないようにすることができます。

AWS Glue では、ジョブでジョブのブックマークを管理します。ジョブを削除すると、ジョブのブックマークは削除されます。

場合によっては、AWS Glue ジョブのブックマークを有効にしてあっても、以前の実行で処理したデータを ETL ジョブで再処理することがあります。このエラーの一般的な原因を解決する方法の詳細については、「AWS Glue のエラーのトラブルシューティング」を参照してください。

変換コンテキスト

AWS Glue PySpark の動的フレームの多くのメソッドには、transformation_ctx というオプションのパラメータが含まれています。このパラメータは ETL 演算子インスタンスの一意の識別子です。transformation_ctx パラメータは指定された演算子に対するジョブのブックマーク内の状態情報を識別するために使用されます。具体的には、AWS Glue では transformation_ctx を使用してブックマーク状態に対するキーにインデックスを付けます。

ジョブのブックマークが正しく機能するように、ジョブのブックマークのパラメータを有効にし、transformation_ctx パラメータを設定します。transformation_ctx パラメータを渡さない場合、メソッドで使用されている動的フレームやテーブルに対してジョブのブックマークは有効になりません。例えば、ETL ジョブで 2 つの Amazon S3 ソースを読み取って結合する場合、ブックマークを有効にするメソッドに対してのみ transformation_ctx パラメータを渡すことができます。1 つのジョブについてジョブのブックマークをリセットした場合、transformation_ctx の使用には関係なく、このジョブに関連付けられているすべての変換がリセットされます。

DynamicFrameReader クラスの詳細については、「DynamicFrameReader クラス」を参照してください。PySpark 拡張の詳細については、「AWS Glue PySpark 拡張機能リファレンス」を参照してください

AWS Glue の生成したスクリプトでジョブのブックマークを使用する

このセクションでは、ジョブのブックマークを使用する運用の詳細を説明します。また、ソースと宛先を選択してジョブを実行するときに AWS Glue から生成可能なスクリプトの例も提供します。

ジョブのブックマークはジョブの状態を保存します。状態の各インスタンスは、ジョブ名とバージョン番号がキーになっています。スクリプトで呼び出された job.init では、その状態を取得し、常に最新バージョンを取得します。1 つの状態の中に複数の状態要素が存在します。要素は、スクリプト内のソース、変換、シンクインスタンスごとに固有です。これらの状態要素は、スクリプト内の対応する要素 (ソース、変換、またはシンク) にアタッチされている変換コンテキストによって識別されます。状態要素はユーザースクリプトから job.commit が呼び出されたときにアトミックに保存されます。スクリプトは引数からジョブのブックマークのジョブ名およびコントロールオプションを取得します。

ジョブのブックマーク内の状態要素は、ソース、変換、またはシンク固有のデータです。例えば、アップストリームジョブまたはプロセスによって継続的に書き込まれている Amazon S3 の場所から増分データを読み取るとします。この場合、スクリプトではそれまでに処理されている部分を判別する必要があります。Amazon S3 ソースのジョブのブックマーク実装では情報を保存するため、再度ジョブを実行するときに、保存された情報を使用して新しいオブジェクトのみをフィルターでき、次にジョブを実行するための状態を再計算できます。新しいファイルをフィルタリングするためにタイムスタンプが使用されます。

ジョブのブックマークは、状態要素に加えて、実行番号試行番号バージョン番号 を持ちます。実行番号はジョブの実行を追跡し、試行番号はジョブ実行の試行回数を記録します。ジョブ実行番号は正常な実行ごとに増分される、一定間隔で増加する番号です。試行番号は各実行の試行回数を追跡し、失敗した試行後のに実行がある場合にのみ増分されます。バージョン番号は、一定間隔で増加し、ジョブのブックマークの更新を追跡します。

Amazon S3 データソース用に生成されたスクリプトの例を次に示します。ジョブのブックマークを使用するために必要なスクリプトの部分は、太字と斜体で表示されています。これらの要素の詳細については、GlueContext クラス API および DynamicFrameWriter クラス API を参照してください。

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2") job.commit()

JDBC ソース用に生成されたスクリプトの例を以下に示します。ソーステーブルは、empno 列がプライマリキーである従業員テーブルです。デフォルトでは、ブックマークキーが指定されていない場合、ジョブはブックマークキーとしてシーケンシャルプライマリキーを使用しますが、empno は必ずしもシーケンシャルではなく、値にギャップがある可能性があるため、デフォルトのブックマークキーとして適切ではありません。したがって、スクリプトは empno を明示的にブックマークキーとして指定します。コードのその部分は、太字と斜体で表示されます。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "hr", table_name = "emp", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"}) ## @type: ApplyMapping ## @args: [mapping = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2") job.commit()

ジョブブックマークに関連する接続オプションの詳細については、「JDBC connectionType の値」を参照してください。