AWS Glue でスクリプトを編集する - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS Glue でスクリプトを編集する

スクリプトには、ソースからデータを抽出して、変換し、ターゲットにロードするコードが含まれています。AWS Glue​ はジョブを開始するときにスクリプトを実行します。

Python または Scala で AWS Glue ETL スクリプトを記述できます。Python スクリプトは、抽出、変換、およびロード (ETL) ジョブのための PySpark Python ダイアレクトの拡張機能である言語を使用します。スクリプトには ETL 変換を処理する拡張構造が含まれます。自動でジョブのソースコードロジックを生成するときに、スクリプトが作成されます。このスクリプトを編集するか、または、独自のスクリプトを指定して ETL 作業を処理することができます。

AWS Glue コンソールを使用したスクリプトの定義と編集の詳細については、「AWS Glue コンソールでのスクリプトの操作」を参照してください。

スクリプトの定義

ソースとターゲットがあると、AWS Glue はデータを変換するスクリプトを生成できます。この提案されたスクリプトは、ソースとターゲットを埋める最初のバージョンで、PySpark での変換を提案しています。スクリプトを確認して、ビジネスニーズに合わせて変更できます。AWS Glue のスクリプトエディタを使用して、ソースとターゲットを指定する引数、および実行に必要なその他の引数を追加します。スクリプトはジョブによって実行され、ジョブはスケジュールやイベントに基づくトリガーによって開始されます。トリガーについての詳細は、「トリガーを使用したジョブとクローラの開始」を参照してください。

AWS Glue コンソールでは、スクリプトはコードとして表されます。スクリプトを、スクリプトに埋め込まれた注釈 (##) を使用する図として表示することもできます。これらの注釈は、AWS Glue コンソールで図を生成するのに使用されるパラメータ、変換タイプ、引数、入力、その他のスクリプトの特性を説明します。

スクリプトの図は、以下を示します。

  • スクリプトへのソース入力

  • 変換

  • スクリプトにより書き込まれたターゲット出力

スクリプトには以下の注釈を含めることができます。

注釈 使用
@params スクリプトが必要とする ETL ジョブからのパラメータ。
@type 変換タイプ、データソース、またはデータシンクなどの、図のノードのタイプ。
@args 入力データへの参照を除く、ノードに渡される引数。
@return スクリプトから返される変数。
@inputs ノードへの入力データ。

スクリプト内のコード構文については、「プログラムAWSPython で ETL スクリプトをGlue する」を参照してください。

AWS Glue によって生成されるスクリプトの例を以下に示します。このスクリプトは、Amazon Simple Storage Service (Amazon S3) の場所から別の場所にシンプルなデータセットをコピーし、形式を CSV から JSON に変更するジョブを対象としています。初期化コード後、スクリプトにはデータソース、マッピング、およびターゲット (データシンク) を指定するコマンドが含まれます。注釈にも注意してください。

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "sample-data", table_name = "taxi_trips", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "sample-data", table_name = "taxi_trips", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "tpep_pickup_datetime", "string"), ("tpep_dropoff_datetime", "string", "tpep_dropoff_datetime", "string"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("vendorid", "long", "vendorid", "long"), ("tpep_pickup_datetime", "string", "tpep_pickup_datetime", "string"), ("tpep_dropoff_datetime", "string", "tpep_dropoff_datetime", "string"), ("passenger_count", "long", "passenger_count", "long"), ("trip_distance", "double", "trip_distance", "double"), ("ratecodeid", "long", "ratecodeid", "long"), ("store_and_fwd_flag", "string", "store_and_fwd_flag", "string"), ("pulocationid", "long", "pulocationid", "long"), ("dolocationid", "long", "dolocationid", "long"), ("payment_type", "long", "payment_type", "long"), ("fare_amount", "double", "fare_amount", "double"), ("extra", "double", "extra", "double"), ("mta_tax", "double", "mta_tax", "double"), ("tip_amount", "double", "tip_amount", "double"), ("tolls_amount", "double", "tolls_amount", "double"), ("improvement_surcharge", "double", "improvement_surcharge", "double"), ("total_amount", "double", "total_amount", "double")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://example-data-destination/taxi-data"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://example-data-destination/taxi-data"}, format = "json", transformation_ctx = "datasink2") job.commit()