使用 Lambda 函數在暫時性 EMR 叢集中啟動 Spark 工作 - AWS Prescriptive Guidance

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Lambda 函數在暫時性 EMR 叢集中啟動 Spark 工作

由阿爾維(AWS)創建

Envronment 生產

技術:分析

AWS 服務:亞馬遜 EMR; AWS Identity and Access Management; AWS Lambda; 亞馬遜 VPC

工作負載:開放原始碼

Summary

此模式使用 Amazon EMR RunJobFlow API 動作啟動暫時叢集,以從 Lambda 函數執行 Spark 作業。 暫時性 EMR 叢集的設計是在工作完成或發生任何錯誤時立即終止。 暫時性叢集可節省成本,因為它只在計算時間內執行,而且它可在雲端環境中提供延展性和彈性。

暫態 EMR 叢集是使用 Boto3 API 和 Python 程式設計語言在 Lambda 函式中啟動。Lambda 函數是用 Python 編寫的,提供了在需要時啟動叢集的額外靈活性。

為了示範範範例批次計算和輸出,此模式將從 Lambda 函數啟動 EMR 叢集中的 Spark 工作,並針對虛構公司的範例銷售資料執行批次計算。 Spark 任務的輸出將是 Amazon Simple Storage Service (Amazon S3) 中的逗號分隔值 (CSV) 檔案。用於執行計算的虛擬私有雲 (VPC) 和 AWS Identity and Access Management (IAM) 角色的輸入資料檔案、Spark .jar 檔案和 AWS CloudFormation 範本會以附件形式提供。

先決條件和限制

先決條件

  • 一個 AWS 帳戶

限制

  • 一次只能從代碼啟動一個 Spark 作業。 

產品版本

  • 在執行 EMR 5.0.0+

Architecture

目標技術堆疊

  • Amazon EMR 

  • AWS Lambda

  • Amazon S3

  • Apache Spark

目標架構

自動化和擴展

若要自動化 Spark EMR 批次計算,您可以使用下列任一選項。

Tools

工具

  • Amazon EMR— 亞馬遜 EMR 是業界領先的雲大數據平台,用於處理大量數據使用開源工具,如阿帕奇星火,阿帕奇 HBase,阿帕奇 Flink,阿帕奇胡迪和普雷斯托。 

  • AWS Lambda— Lambda 是一項運算服務,可讓您執行程式碼,無需佈建或管理伺服器。Lambda 只有在需要時才會執行程式碼,可自動從每天數項請求擴展成每秒數千項請求。您只要按實際使用的運算時間付費即可,未執行程式碼時不必支付任何費用。使用 Lambda,您幾乎可以為任何類型的應用程式或後端服務執行程式碼,而且無須管理。AWS Lambda 在高可用性的運算基礎設施上執行您的程式碼,並管理所有運算資源,包括伺服器和作業系統維護,容量佈建與記錄。

  • Amazon S3-Amazon S3 提供網際網路儲存體。您可以使用 Amazon S3 隨時從 Web 任何地方存放和擷取任意資料量。

Code

下面的程式碼片段可以用作啟動 EMR 集群 Spark 作業的起點。

""" Copy paste the following code in your Lambda function. Make sure to change the following key parameters for the API as per your account
-Name (Name of Spark cluster) -LogUri (S3 bucket to store EMR logs) -Ec2SubnetId (The subnet to launch the cluster into) -JobFlowRole (Service role for Amazon EMR) -ServiceRole (Service role for EC2)
The following parameters are additional parameters for the Spark job itself. Change the bucket name and prefix for the Spark job (located at the bottom).
-s3://your-bucket-name/prefix/lamba-emr/SparkProfitCalc.jar (Spark jar file) -s3://your-bucket-name/prefix/fake_sales_data.csv (Input data file in S3) -s3://your-bucket-name/prefix/outputs/report_1/ (Output location in S3) """ import json import boto3 client = boto3.client('emr') def lambda_handler(event, context):          response = client.run_job_flow(         Name= 'spark_job_cluster',         LogUri= 's3://your-bucket-name/prefix/logs',         ReleaseLabel= 'emr-6.0.0',         Instances={             'MasterInstanceType': 'm5.xlarge',             'SlaveInstanceType': 'm5.large',             'InstanceCount': 1,             'KeepJobFlowAliveWhenNoSteps': False,             'TerminationProtected': False,             'Ec2SubnetId': 'subnet-XXXXXXXXXXXXXX'         },         Applications = [ {'Name': 'Spark'} ],         Configurations = [              { 'Classification': 'spark-hive-site',               'Properties': {                    'hive.metastore.client.factory.class': 'com.amazonaws.glue.catalog.metastore.AWSGlueDataCatalogHiveClientFactory'}             }         ],         VisibleToAllUsers=True,         JobFlowRole = 'EMRLambda-EMREC2InstanceProfile-XXXXXXXXX',         ServiceRole = 'EMRLambda-EMRRole-XXXXXXXXX',         Steps=[             {                 'Name': 'flow-log-analysis',                 'ActionOnFailure': 'TERMINATE_CLUSTER',                 'HadoopJarStep': {                         'Jar': 'command-runner.jar',                         'Args': [                             'spark-submit',                             '--deploy-mode', 'cluster',                             '--executor-memory', '6G',                             '--num-executors', '1',                             '--executor-cores', '2',                             '--class', 'com.aws.emr.ProfitCalc',                             's3://your-bucket-name/prefix/lamba-emr/SparkProfitCalc.jar',                             's3://your-bucket-name/prefix/fake_sales_data.csv',                             's3://your-bucket-name/prefix/outputs/report_1/'                         ]                 }             }         ]     )

Epics

任務描述所需技能
建立 IAM 角色和 VPC。

如果您已有 Lambda 和 Amazon EMR IAM 角色和 VPC,則可以略過此步驟。若要執行程式碼,EMR 叢集和 Lambda 函數都需要 IAM 角色。EMR 叢集也需要具有公有子網路的 VPC,或具有 NAT 閘道的私有子網路。若要自動建立所有 IAM 角色和 VPC,請依原樣部署附加的 AWS CloudFormation 範本,或者您可以按照「其他資訊」部分中的指定手動建立角色和 VPC。

雲端架構師
請注意 AWS CloudFormation 範本輸出金鑰。

成功部署 CloudFormation 範本之後,請瀏覽至 AWS CloudFormation 主控台中的「輸出」標籤。請注意四個輸出鍵:「執行角色」、「ServiceRole」、「JobFlowRole」和「EC2 子標識」。當您建立 Lambda 函數時,您將會使用這些機碼的值。

雲端架構師
任務描述所需技能
在 Amazon S3 主控台上建立 S3 儲存貯體。

建立 S3 儲存貯體或使用現有的儲存貯體。

AWS 一般
上傳 Spark .jar 檔案。

將 Spark .jar 檔案上傳至 S3 儲存貯體。

AWS 一般
任務描述所需技能
建立 Lambda 函數。

在 Lambda 控制台上,創建一個具有執行角色的 Python 3.8+ Lambda 函數。執行角色原則必須允許 Lambda 啟動 EMR 叢集。(請參閱附加的 AWS CloudFormation 範本。)

雲端/大數據開發人員
複製並貼上程式碼。

將「lambda_function.py」檔案中的程式碼取代為此樣式的「工具」區段的程式碼。

雲端/大數據開發人員
變更程式碼中的參數。

依照程式碼中的註解變更參數值以符合您的 AWS 帳戶。

雲端/大數據開發人員
啟動函數以啟動叢集。

啟動函數,使用提供的 Spark,jar 檔案啟動暫時性 EMR 叢集的建立。它將運行 Spark 作業,並在作業完成時自動終止。

雲端/大數據開發人員
檢查 EMR 叢集狀態。

啟動 EMR 叢集之後,它會出現在 EMR 主控台的「叢集」標籤下。啟動叢集或執行作業時的任何錯誤都可以相應地檢查。

雲端/大數據開發人員
任務描述所需技能
上傳 Spark .jar 檔案。

從「附件」部分下載 Spark .jar 文件並將其上傳到 S3 存儲桶。

雲端/資料工程師
上傳輸入資料集。

將附加的「fake_sales_data.csv」檔案上傳至 S3 儲存貯體。

雲端/大數據開發人員
貼上 Lambda 程式碼並變更參數。

從 < 工具 > 一節中複製程式碼,並將程式碼貼到 Lambda 函式中,取代程式碼 "lambda_function.py" 檔案。變更參數值以符合您的帳戶。

雲端/大數據開發人員
啟動函式並驗證輸出。

Lambda 函數使用提供的 Spark 作業啟動叢集之後,它會在 S3 儲存貯體中生成 CSV 檔案。

雲端/大數據開發人員

References

其他資訊

若要在 Lambda 函數中啟動 EMR 叢集,則需要 VPC 和 IAM 角色。您可以使用此模式的 [附件] 區段中的 AWS CloudFormation 範本來設定 VPC 和 IAM 角色,也可以使用以下連結手動建立它們。 

執行 Lambda 和 Amazon EMR 需要下列 IAM 角色。 

Lambda 執行角色

Lambda 函數的執行角色授予該公司存取 AWS 服務和資源的許可。

Amazon EMR 的服務角色

所以此Amazon EMR 角色會在佈建資源和執行服務層級的任務 (這些任務不會在執行叢集之 Amazon EC2 執行個體的細節中執行) 時,定義 Amazon EMR 允許的動作。例如,服務角色用於在叢集啟動時佈建 EC2 執行個體。

EC2 執行個體的服務角色

所以此叢集 EC2 執行個體的服務角色(也稱為 Amazon EMR 執行個體設定檔) 是一種特殊類型的服務角色,它會在執行個體啟動時指派給 Amazon EMR 叢集中的每個 EC2 執行個體。在 Hadoop 生態系統上執行的應用程式程序會擔任此角色,以取得與其他 AWS 服務互動的許可。

VPC 和子網路建立

您可以使用 VPC 精靈從 VPC 主控台建立新的 VPC。 

Attachments

attachment.zip