コードの開発時には、ローカルなテストを実行して、ワークフローのレイアウトに誤りがないことを確認する必要があります。
ローカルテストでは AWS Glue ジョブ、クローラ、トリガーは生成されません。代わりに、レイアウトスクリプトをローカルで実行し、to_json()
および validate()
メソッドによりオブジェクトを画面表示し、エラーを発見します。これらのメソッドは、ライブラリで定義されている 3 つのクラスすべてで使用できます。
user_params
がレイアウト関数に渡す、引数 system_params
および AWS Glue を処理するには 2 つの方法があります。テストベンチコードでは、サンプル設計図パラメータ値のディクショナリを作成し、それを引数 user_params
としてレイアウト関数に渡すことができます。または、user_params
への参照を削除し、その部分をハードコーディングのための文字列に置き換えます。
コードの引数 system_params
の中で、region
および accountId
プロパティを利用している場合には、system_params
を独自の辞書に含めて渡すことができます。
設計図をテストするには
-
Python インタプリタをライブラリのあるディレクトリで起動するか、設計図ファイルならびに提供されたライブラリを、希望の統合開発環境 (IDE) にロードします。
-
コードによって提供されたライブラリがインポートされていることを確認します。
-
任意のエンティティまたは
Workflow
オブジェクトでvalidate()
またはto_json()
呼び出すためのコードを、レイアウト関数に追加します。例えば、コードがmycrawler
という名前のCrawler
オブジェクトを作成する場合には、以下のようにvalidate()
を呼び出せます。mycrawler.validate()
mycrawler
は以下のように表示できます。print(mycrawler.to_json())
オブジェクトで
to_json
を呼び出す場合、別途validate()
を呼び出す必要はありません。validate()
はto_json()
により呼び出されます。これにより、ワークフローオブジェクトでのこれらのメソッドの呼び出しが、最も効率的に行えます。スクリプトで、ワークフローオブジェクトに
my_workflow
と名前を付けているのであれば、次のようにワークフローオブジェクトを検証して画面表示します。print(my_workflow.to_json())
to_json()
とvalidate()
の詳細については、「クラスメソッド」を参照してください。また、このセクションの後半の例に示すように、
pprint
をインポートし、ワークフローオブジェクトを書式を設定しながら表示することも可能です。 -
コードを実行し、エラーを修正した上で、最後に
validate()
またはto_json()
の呼び出しを削除します。
次の例では、サンプルの設計図パラメータのディクショナリを構築し、それをレイアウト関数 generate_compaction_workflow
の引数 user_params
に渡す方法を示しています。また、生成されたワークフローオブジェクトを、書式を指定しながら印刷するための方法も示しています。
from pprint import pprint
from awsglue.blueprint.workflow import *
from awsglue.blueprint.job import *
from awsglue.blueprint.crawler import *
USER_PARAMS = {"WorkflowName": "compaction_workflow",
"ScriptLocation": "s3://awsexamplebucket1/scripts/threaded-compaction.py",
"PassRole": "arn:aws:iam::111122223333:role/GlueRole-ETL",
"DatabaseName": "cloudtrial",
"TableName": "ct_cloudtrail",
"CoalesceFactor": 4,
"MaxThreadWorkers": 200}
def generate_compaction_workflow(user_params: dict, system_params: dict) -> Workflow:
compaction_job = Job(Name=f"{user_params['WorkflowName']}_etl_job",
Command={"Name": "glueetl",
"ScriptLocation": user_params['ScriptLocation'],
"PythonVersion": "3"},
Role="arn:aws:iam::111122223333:role/AWSGlueServiceRoleDefault",
DefaultArguments={"DatabaseName": user_params['DatabaseName'],
"TableName": user_params['TableName'],
"CoalesceFactor": user_params['CoalesceFactor'],
"max_thread_workers": user_params['MaxThreadWorkers']})
catalog_target = {"CatalogTargets": [{"DatabaseName": user_params['DatabaseName'], "Tables": [user_params['TableName']]}]}
compacted_files_crawler = Crawler(Name=f"{user_params['WorkflowName']}_post_crawl",
Targets = catalog_target,
Role=user_params['PassRole'],
DependsOn={compaction_job: "SUCCEEDED"},
WaitForDependencies="AND",
SchemaChangePolicy={"DeleteBehavior": "LOG"})
compaction_workflow = Workflow(Name=user_params['WorkflowName'],
Entities=Entities(Jobs=[compaction_job],
Crawlers=[compacted_files_crawler]))
return compaction_workflow
generated = generate_compaction_workflow(user_params=USER_PARAMS, system_params={})
gen_dict = generated.to_json()
pprint(gen_dict)