AWS Doc SDK Examples
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
AddJobFlowSteps
で を使用する AWS SDK
次の例は、AddJobFlowSteps
を使用する方法を説明しています。
- Python
-
- SDK for Python (Boto3)
-
注記
詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリ
での設定と実行の方法を確認してください。 Spark ステップを追加します。このステップは、追加されるとすぐにクラスターによって実行されます。
def add_step(cluster_id, name, script_uri, script_args, emr_client): """ Adds a job step to the specified cluster. This example adds a Spark step, which is run by the cluster as soon as it is added. :param cluster_id: The ID of the cluster. :param name: The name of the step. :param script_uri: The URI where the Python script is stored. :param script_args: Arguments to pass to the Python script. :param emr_client: The Boto3 EMR client object. :return: The ID of the newly added step. """ try: response = emr_client.add_job_flow_steps( JobFlowId=cluster_id, Steps=[ { "Name": name, "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": [ "spark-submit", "--deploy-mode", "cluster", script_uri, *script_args, ], }, } ], ) step_id = response["StepIds"][0] logger.info("Started step with ID %s", step_id) except ClientError: logger.exception("Couldn't start step %s with URI %s.", name, script_uri) raise else: return step_id
クラスターのジョブステップとして Amazon EMR File System (EMRFS) コマンドを実行します。これは、 SSH接続を介してEMRFSコマンドを手動で実行する代わりに、クラスターでコマンドを自動化するために使用できます。
import boto3 from botocore.exceptions import ClientError def add_emrfs_step(command, bucket_url, cluster_id, emr_client): """ Add an EMRFS command as a job flow step to an existing cluster. :param command: The EMRFS command to run. :param bucket_url: The URL of a bucket that contains tracking metadata. :param cluster_id: The ID of the cluster to update. :param emr_client: The Boto3 Amazon EMR client object. :return: The ID of the added job flow step. Status can be tracked by calling the emr_client.describe_step() function. """ job_flow_step = { "Name": "Example EMRFS Command Step", "ActionOnFailure": "CONTINUE", "HadoopJarStep": { "Jar": "command-runner.jar", "Args": ["/usr/bin/emrfs", command, bucket_url], }, } try: response = emr_client.add_job_flow_steps( JobFlowId=cluster_id, Steps=[job_flow_step] ) step_id = response["StepIds"][0] print(f"Added step {step_id} to cluster {cluster_id}.") except ClientError: print(f"Couldn't add a step to cluster {cluster_id}.") raise else: return step_id def usage_demo(): emr_client = boto3.client("emr") # Assumes the first waiting cluster has EMRFS enabled and has created metadata # with the default name of 'EmrFSMetadata'. cluster = emr_client.list_clusters(ClusterStates=["WAITING"])["Clusters"][0] add_emrfs_step( "sync", "s3://elasticmapreduce/samples/cloudfront", cluster["Id"], emr_client ) if __name__ == "__main__": usage_demo()
-
API 詳細については、「 for AWS SDKPython (Boto3) APIリファレンスAddJobFlowSteps」の「」を参照してください。
-
アクション
DescribeCluster