教程:Amazon EMR 入门 - Amazon EMR

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

教程:Amazon EMR 入门

Overview

借助 Amazon EMR,您可以设置集群,在几分钟内处理和分析大数据框架的数据。本教程介绍如何使用 Spark 启动示例集群,以及如何运行将存储在 Amazon S3 存储桶中的简单 PySpark 脚本。它涵盖了三个主要工作流程类别中的基本 Amazon EMR 任务:规划和配置、管理和清理。您还可以根据自己的工作负载调整此过程。


				Amazon EMR 的工作流程图,其中概述了计划和配置、管理和清理这三个主要工作流程类别。

Prerequisites

在学习本教程时,您可以找到指向更详细主题的链接,以及后续步骤部分。如果您有疑问或遇到困扰,请联系亚马逊 EMR 团队,通过开发论坛

Cost

  • 您创建的示例集群将在实际环境中运行。只要您完成清理任务,集群将产生最低费用,并且只会在本教程的持续时间内运行。Amazon EMR 定价的费用按秒计算,且因地区而异。有关更多信息,请参阅 。Amazon EMR 定价

  • 为本教程,您在 Amazon S3 中存储的小文件也可能产生最低费用。如果您未超出您的 Amazon S3 的使用限制,您的部分或全部费用可能得到豁免。AWS免费套餐。有关更多信息,请参阅 。Amazon S3 定价AWS免费套餐

第 1 步:规划和配置 Amazon EMR 集群

在此步骤中,您计划并启动安装了 Apache Spark 的简单 Amazon EMR 集群。设置过程包括创建 Amazon S3 存储桶以存储示例 PySpark 脚本、输入数据集和集群输出。

为群集输入和输出准备存储

创建 Amazon S3 存储桶以存储示例 PySpark 脚本、输入数据和输出数据。在相同的AWS您计划启动 Amazon EMR 集群的区域。例如,美国西部(俄勒冈)us-west-2。您用于 Amazon EMR 的存储桶和文件夹具有以下限制:

  • 名称只能由小写字母、数字、句点 (.) 和连字符 (-) 组成。

  • 名称不能以数字结尾。

  • 存储桶名称必须唯一跨所有AWS帐户。

  • 输出文件夹必须为空。

要为本教程创建存储桶,请参阅如何创建 S3 存储桶?中的Amazon Simple Storage Service 控制台用户指南

为亚马逊 EMR 开发和准备应用程序

在此步骤中,您会将示例 PySpark 脚本上传到 Amazon S3。这是为亚马逊 EMR 准备申请的最常用方法。EMR 允许您在向集群提交工作时指定脚本的 Amazon S3 位置。您还可以将示例输入数据上传到 Amazon S3,以便 PySpark 脚本进行处理。

我们提供了以下 PySpark 脚本供您使用。该脚本处理食品设施检验数据,并输出一个文件,列出具有最 “红色” 类型违规情况的前十个场所到您的 S3 桶。

为 EMR 准备示例 PySpark 脚本

  1. 将下面的示例代码复制到选定编辑器中的新文件中。

  2. 将该文件保存为 health_violations.py

  3. 上传health_violations.py添加到您为本教程指定的存储桶中。有关如何将数据元上传到 Amazon S3 的信息,请参阅将对象上传到存储桶中的Amazon Simple Storage Service 入门指南

import argparse from pyspark.sql import SparkSession def calculate_red_violations(data_source, output_uri): """ Processes sample food establishment inspection data and queries the data to find the top 10 establishments with the most Red violations from 2006 to 2020. :param data_source: The URI where the food establishment data CSV is saved, typically an Amazon S3 bucket, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'. :param output_uri: The URI where the output is written, typically an Amazon S3 bucket, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'. """ with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark: # Load the restaurant violation CSV data if data_source is not None: restaurants_df = spark.read.option("header", "true").csv(data_source) # Create an in-memory DataFrame to query restaurants_df.createOrReplaceTempView("restaurant_violations") # Create a DataFrame of the top 10 restaurants with the most Red violations top_red_violation_restaurants = spark.sql("SELECT name, count(*) AS total_red_violations " + "FROM restaurant_violations " + "WHERE violation_type = 'RED' " + "GROUP BY name " + "ORDER BY total_red_violations DESC LIMIT 10 ") # Write the results to the specified output URI top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri) if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument( '--data_source', help="The URI where the CSV restaurant data is saved, typically an S3 bucket.") parser.add_argument( '--output_uri', help="The URI where output is saved, typically an S3 bucket.") args = parser.parse_args() calculate_red_violations(args.data_source, args.output_uri)

输入参数

作为步骤运行 PySpark 脚本时,必须包含以下参数的值。

  • --data_source-食品建立数据 CSV 文件的 Amazon S3 URI。您将在下面准备此文件。

  • --output_uri-将保存输出结果的 Amazon S3 存储桶的 URI。

输入数据是 2006 年至 2020 年华盛顿州金县 Health 部检查结果的公开食品机构检查数据集的修改版本。有关更多信息,请参阅 。金县开放数据:食品设施检验数据。下面是数据集中的示例行。

name, inspection_result, inspection_closed_business, violation_type, violation_points 100 LB CLAM, Unsatisfactory, FALSE, BLUE, 5 100 PERCENT NUTRICION, Unsatisfactory, FALSE, BLUE, 5 7-ELEVEN #2361-39423A, Complete, FALSE, , 0

准备 EMR 的示例输入数据

  1. 下载 zip 文件,food_establishment_data.zip

  2. 解压缩内容并将其作为 food_establishment_data.csv 保存在本地。

  3. 将 CSV 文件上传到您为本教程创建的 S3 存储桶。有关分步指导,请参阅如何将文件和文件夹上传至 S3 存储桶?中的Amazon Simple Storage Service 控制台用户指南

有关设置 EMR 数据的更多信息,请参阅准备输入数据

启动 Amazon EMR 集群

现在您已完成准备工作,您可以使用最新Amazon EMR 发布版

注意

如果您创建了AWS帐户时,亚马逊 EMR 在默认 Amazon Virtual Private Cloud (VPC),当未指定任何区域时。

Console

使用启动安装了 Spark 的集群快速选项

  1. 登录到AWS Management Console并打开亚马逊 EMR 控制台https://console.aws.amazon.com/elasticmapreduce/

  2. 选择创建集群以打开快速选项向导

  3. 在存储库的创建集群-快速选项页面上,请注意版本实例类型实例的数量, 和Permissions (权限)。这些字段会自动填充为常规用途群集选择的值。有关的更多信息快速选项配置设置,请参阅“Quick Options (快速选项)”摘要

  4. 输入集群名称以帮助您标识集群。例如,我的第一个 EMR 集群

  5. 离开日志系统启用,但替换S3 folder (S3 文件夹)值,其后是您创建的 Amazon S3 存储桶,然后是/logs。例如:s3://DOC-EXAMPLE-BUCKET/logs。这将在您的存储桶中创建一个名为 “log” 的新文件夹,其中 EMR 将复制集群的日志文件。

  6. INDER应用程序中,选择Spark选项在群集上安装 Spark。快速选项允许您从最常见的应用程序组合中进行选择。

    注意

    在启动集群之前,请务必选择要在 Amazon EMR 集群上运行的应用程序。在启动集群之后,您无法从集群中添加或删除应用程序。

  7. INDER安全和访问中,选择EC2 键前缀您指定或创建的为 SSH 创建 Amazon EC2 key pair

  8. 选择创建集群启动集群并打开集群状态页面。

  9. 在群集状态页面上,找到状态旁边的群集名称。状态应从正在启动正在运行正在等待集群创建过程中。您可能需要选择右侧的刷新图标或刷新您的浏览器才能接收更新。

当状态进展到正在等待,您的集群已启动、正在运行并准备接受工作。

CLI

使用 AWS CLI 启动安装了 Spark 的集群

  1. 使用以下命令创建 Spark 集群。输入集群的名称--name选项,并将 EC2 key pair 的名称指定为--ec2-attributes选项。

    aws emr create-cluster \ --name "My First EMR Cluster" \ --release-label emr-5.33.0 \ --applications Name=Spark \ --ec2-attributes KeyName=myEMRKeyPairName \ --instance-type m5.xlarge \ --instance-count 3 \ --use-default-roles

    请注意其他必需值--instance-type--instance-count, 和--use-default-roles。这些值已被选为一般用途群集。有关的信息create-cluster这里使用的,请参阅AWS CLI参考

    注意

    为了便于读取,包括 Linux 行延续字符 (\)。它们可以被删除或在 Linux 命令中使用。对于 Windows,请将其删除或替换为脱字号 (^)。

    将会看到包含ClusterIdClusterArn的新集群。记下您的ClusterId,您将使用它来检查集群状态,并在稍后提交工作。以下是create-clusterJSON 格式的输出。

    { "ClusterId": "myClusterId", "ClusterArn": "myClusterArn" }
  2. 使用以下命令查看集群状态。

    aws emr describe-cluster --cluster-id myClusterId

    将会看到带有Status的新集群。以下是describe-clusterJSON 格式的输出。

    { "Cluster": { "Id": "myClusterId", "Name": "My First EMR Cluster", "Status": { "State": "STARTING", "StateChangeReason": { "Message": "Configuring cluster software" }, ... }, ... } {

    状态State应该从STARTINGRUNNINGWAITING集群创建过程中。

当群集状态进展到WAITING,您的集群已启动、正在运行并准备接受工作。

有关读取集群摘要的更多信息,请参阅查看集群状态和详细信息。有关集群状态的信息,请参阅了解集群生命周期

第 2 步:管理您的亚马逊 EMR 集群

由于您的集群已启动并正在运行,您可以连接并对其进行管理。您还可以将工作提交到正在运行的集群以处理和分析数据。

向亚马逊 EMR 提交工作

由于您的集群已启动并正在运行,您可以将health_violations.py作为step。步骤是由一个或多个作业组成的集群工作单元。例如,您可以提交一个步骤来计算值,或传输和处理数据。

您可以在创建集群时或群集已运行之后提交多个步骤来完成群集上的一组任务。有关更多信息,请参阅将工作提交到群集

Console

使用控制台作为步骤提交 Spark 应用程序

  1. 从打开 Amazon EMR 控制台https://console.aws.amazon.com/elasticmapreduce/

  2. Cluster List (集群列表) 中,选择您的集群的名称。确保集群处于 Waiting (正在等待) 状态。

  3. 选择 Steps (步骤),然后选择 Add step (添加步骤)

  4. 根据以下准则配置步骤:

    • 对于 Step type (步骤类型),选择 Spark application (Spark 应用程序)。将会看到以下内容中的其他字段部署模式Spark-submit options, 和应用程序位置此时会显示。

    • 适用于名称,请保留默认值或键入新名称。如果您在集群中有很多步骤,命名每个步骤可帮助您跟踪这些步骤。

    • 适用于“部署” 模式,请保留默认值群集。有关 Spark 部署模式的更多信息,请参阅集群模式概述在 Apache Spark 文档中。

    • 将保留为Spark-submit options字段为空。有关 的更多信息spark-submit选项,请参阅。使用火花提交启动应用程序

    • 适用于应用程序位置中,输入health_violations.py脚 Amazon S3。例如,s3: //DOC-EXAMPLE-BUCKET/health_violations.py

    • Arguments (参数)字段中,输入以下参数和值:

      --data_source s3://DOC-EXAMPLE-BUCKET/food_establishment_data.csv --output_uri s3://DOC-EXAMPLE-BUCKET/myOutputFolder

      Replaces3: //DOC-EXAMPLE-BUCKET/food_establishment_data.csv与您准备的输入数据的 S3 URI为亚马逊 EMR 开发和准备应用程序

      Replace文档示例存储桶替换为本教程创建的存储桶的名称,我的输出文件夹替换为集群输出文件夹的名称。

    • 适用于出现故障时的操作,接受默认选项Continue以便在步骤失败时,集群将继续运行。

  5. 选择Add以提交步骤。步骤应出现在控制台中,其状态为Pending

  6. 步骤的状态应从Pending正在运行已完成因为它运行。要更新控制台中的状态,请选择筛选条件。运行该脚本大约需要一分钟时间。

您将知道,当状态更改为已完成

CLI

要使用AWS CLI

  1. 确保您已安装ClusterId中启动的集群启动 Amazon EMR 集群。您也可使用以下命令检索集群 ID。

    aws emr list-clusters --cluster-states WAITING
  2. “提交”health_violations.py作为add-steps命令与ClusterId

    • 您可以指定步骤的名称,方法是将“我的火花应用程序”。在Args数组,将s3: //DOC-EXAMPLE-BUCKET/health_violations.py的位置,health_violations.py应用程序.

    • Replaces3: //DOC-EXAMPLE-BUCKET/food_establishment_data.csv的 S3 位置,food_establishment_data.csv数据集。

    • Replaces3: //文档示例-Budter (存储桶) /myputputputputputfile,其中包含指定存储桶的 S3 路径以及集群输出文件夹的名称。

    • ActionOnFailure=CONTINUE表示集群将在步骤失败时继续运行。

    aws emr add-steps \ --cluster-id myClusterId \ --steps Type=Spark,Name="My Spark Application",ActionOnFailure=CONTINUE,Args=[s3://DOC-EXAMPLE-BUCKET/health_violations.py,--data_source,s3://DOC-EXAMPLE-BUCKET/food_establishment_data.csv,--output_uri,s3://DOC-EXAMPLE-BUCKET/MyOutputFolder]

    有关使用 CLI 提交步骤的更多信息,请参阅AWS CLI命令参考

    提交步骤后,您应该看到带有StepIds。由于您提交了一个步骤,列表中应该只有一个 ID。复制步骤 ID,您将使用该 ID 检查步骤的状态。

    以下是 JSON 格式的控制台输出示例,您应在提交步骤后看到该示例。

    { "StepIds": [ "s-1XXXXXXXXXXA" ] }
  3. 使用步骤 ID 查询步骤的状态和describe-step命令。Replace我的集群以及您的群集 ID。

    aws emr describe-step --cluster-id myClusterId --step-id s-1XXXXXXXXXXA

    您应该看到包含有关步骤的信息的输出,以及Status部分。以下是示例describe-stepJSON 格式的输出。

    { "Step": { "Id": "s-1XXXXXXXXXXA", "Name": "My Spark Application", "Config": { "Jar": "command-runner.jar", "Properties": {}, "Args": [ "spark-submit", "s3://DOC-EXAMPLE-BUCKET/health_violations.py", "--data_source", "s3://DOC-EXAMPLE-BUCKET/food_establishment_data.csv", "--output_uri", "s3://DOC-EXAMPLE-BUCKET/myOutputFolder" ] }, "ActionOnFailure": "CONTINUE", "Status": { "State": "COMPLETED", ... } } }

    这些区域有:State的步骤更改为PENDINGRUNNINGCOMPLETED,因为步骤运行。该步骤大约需要一分钟才能运行,因此您可能需要检查几次状态。

您将知道该步骤是成功的,当State对 的更改COMPLETED

有关步骤生命周期的更多信息,请参阅运行处理数据的步骤

查看结果

步骤成功运行之后,您可以在提交步骤时指定的 Amazon S3 输出文件夹中查看其输出结果。

查看结果的步骤health_violations.py

  1. 通过以下网址打开 Amazon S3 控制台:https://console.aws.amazon.com/s3/

  2. 选择Bucket name,然后是您在提交步骤时指定的输出文件夹。例如,文档示例存储桶然后我的输出文件夹

  3. 验证输出文件夹中是否有以下项目:

    • 一个名为_SUCCESS,表明您的步骤是否成功。

    • 以前缀开头的 CSV 文件part-。这是您的结果的对象。

  4. 选择带有结果的对象,然后选择下载将其保存到您的本地文件系统。

  5. 在选定编辑器中打开结果。输出文件列出了最红色违规行为的十大食品场所。

    以下是health_violations.py结果。

    name, total_red_violations SUBWAY, 322 T-MOBILE PARK, 315 WHOLE FOODS MARKET, 299 PCC COMMUNITY MARKETS, 251 TACO TIME, 240 MCDONALD'S, 177 THAI GINGER, 153 SAFEWAY INC #1508, 143 TAQUERIA EL RINCONSITO, 134 HIMITSU TERIYAKI, 128

有关 Amazon EMR 集群输出的更多信息,请参阅配置输出位置

(可选)设置集群连接

此步骤不是必需的,但您可以选择使用 Secure Shell (SSH) 连接到群集节点,以执行诸如发出命令、交互方式运行应用程序和读取日志文件等任务。

配置安全组规则

在连接到集群之前,您必须设置端口 22 入站规则以允许进行 SSH 连接。

安全组充当虚拟防火墙以控制至您的集群的入站和出站流量。当您使用默认安全组创建集群时,Amazon EMR 会创建以下组:

ElasticMapReduce-master

与主实例关联的默认 Amazon EMR 管理安全组。

ElasticMapReduce-slave

与核心节点和任务节点关联的默认安全组。

允许对弹性映射减少主安全组的受信任源进行 SSH 访问

您必须先登录AWS作为 root 用户或允许用于管理集群所在 VPC 的安全组的 IAM 委托人。有关更多信息,请参阅 。更改 IAM 用户的权限示例策略,它允许管理IAM 用户指南

  1. 从打开 Amazon EMR 控制台https://console.aws.amazon.com/elasticmapreduce/

  2. 选择 Clusters

  3. 选择集群的 Name (名称)

  4. Security and access (安全与访问) 下,选择 Security groups for Master (主节点的安全组) 链接。

  5. 从列表中选择 ElasticMapReduce-master

  6. 选择入站编辑

  7. 通过以下设置检查允许公共访问的入站规则。如果存在,请选择Delete以将其删除。

    • 类型

      SSH

    • 端口

      22

    • 定制 0.0.0.0.0.0.0.0.0.0/0

    警告

    2020 年 12 月之前,公有子网中主实例的默认 EMR 托管安全组创建了预配置规则以允许端口 22 上来自所有来源的入站流量。此规则的创建是为了简化到主节点的初始 SSH 连接。我们强烈建议您删除此入站规则,并仅限制来自可信来源的流量。

  8. 滚动到规则列表底部,然后选择添加规则

  9. 对于Type (类型),选择 SSH

    这会自动输入TCP对于 来说为协议22对于 来说为端口范围

  10. 对于源,选择 My IP (我的 IP)

    这会自动将您的客户端计算机的 IP 地址添加为源地址。或者,您可以添加一系列Custom (自定义)可信客户端 IP 地址,然后选择 Add rule (添加规则) 来创建针对其他客户端的其他规则。许多网络环境动态分配 IP 地址,因此您可能需要定期编辑安全组规则以更新可信客户端的 IP 地址。

  11. 选择 Save

  12. (可选)选择ElasticMapReduce-slave并重复上述步骤以允许从可信客户端对核心和任务节点执行 SSH 客户端访问。

Connect 到集群

配置 SSH 规则后,请转到使用 SSH Connect 到主节点,然后按说明执行操作:

  • 检索要连接到的节点的公有 DNS 名称。

  • 使用 SSH Connect 到您的集群。

有关如何对群集节点进行身份验证的更多信息,请参阅对 Amazon EMR 集群节点进行身份验证

第 3 步:清除 Amazon EMR 资源

既然您已向集群提交工作并查看了 PySpark 应用程序的结果,您可以关闭集群并删除指定的 Amazon S3 存储桶,以避免额外费用。

关闭您的集群

关闭集群会停止其所有关联的 Amazon EMR 费用和 Amazon EC2 实例。

Amazon EMR 会在您终止集群后免费保留有关您的集群的元数据两个月。这使得它很容易克隆群集,或重新访问其配置以供参考。元数据会发生包含群集可能已写入 S3 的数据,或者群集运行时存储在 HDFS 中的数据。

注意

Amazon EMR 控制台不允许您在关闭集群后从列表视图中删除集群。当 Amazon EMR 清除其元数据时,终止的集群将从控制台中消失。

Console

使用控制台关闭集群

  1. 从打开 Amazon EMR 控制台https://console.aws.amazon.com/elasticmapreduce/

  2. 选择集群,然后选择要关闭的集群。例如,我的第一个 EMR 集群

  3. 选择终止以打开终止集群

  4. 在打开的提示符下,选择终止关闭集群。根据集群配置,完全终止和释放已分配的 EC2 资源可能需要 5 到 10 分钟。有关关闭 Amazon EMR 集群的更多信息,请参阅终止群集

    注意

    通常在创建集群时使用终止保护以防止意外关闭。如果您密切遵循本教程,则终止保护应关闭。如果终止保护处于启用状态,则在终止集群之前,您会看到更改设置的提示。选择变更,然后Off

CLI

使用关闭集群AWS CLI

  1. 使用以下命令启动集群终止流程,将替换为我的集群将替换为示例集群的 ID。

    aws emr terminate-clusters --cluster-ids myClusterId

    您不应看到任何输出。

  2. 要检查群集终止过程是否已开始,请使用以下命令检查群集状态。

    aws emr describe-cluster --cluster-id myClusterId

    以下是 JSON 格式的输出示例。集群集群Status应该从TERMINATINGTERMINATED。根据集群配置,完全终止和释放已分配的 EC2 资源可能需要 5 到 10 分钟。有关关闭 Amazon EMR 集群的更多信息,请参阅终止群集..

    { "Cluster": { "Id": "j-xxxxxxxxxxxxx", "Name": "My Cluster Name", "Status": { "State": "TERMINATED", "StateChangeReason": { "Code": "USER_REQUEST", "Message": "Terminated by user request" }, ... }, ... }, ... }

删除 S3 资源

删除您之前创建的存储桶,以删除本教程中使用的所有 Amazon S3 对象。此存储桶应包含您的输入数据集、集群输出、PySpark 脚本和日志文件。如果您将 PySpark 脚本或输出保存在其他位置,则可能需要执行额外的步骤来删除已存储的文件。

注意

您的集群必须完全关闭,然后才能删除存储桶。否则,当您尝试清空存储桶时,您可能会遇到问题。

按照中的说明进行操作如何删除 S3 存储桶?中的Amazon Simple Storage Service 入门指南来清空您的存储桶并将其从 S3 中删除。

后续步骤

现在,您已从头到尾启动了您的第一个 Amazon EMR 集群,并完成了基本的 EMR 任务,如准备和提交大数据应用程序、查看结果和关闭集群。

以下是一些建议主题,可了解有关定制 Amazon EMR 工作流程的更多信息。

了解亚马逊 EMR 的大数据应用程序

发现并比较您可以安装在Amazon EMR 版本指南。发行指南还包含有关每个 EMR 版本的详细信息,以及有关如何配置和使用框架(如 Spark 和 Hadoop 在 Amazon EMR 上)的提示。

规划群集硬件、网络和安全

在本教程中,您将创建一个简单的 EMR 集群,而无需配置实例类型、联网和安全性等高级选项。有关规划和启动满足您的速度、容量和安全性要求,请参阅计划和配置集群Amazon EMR 中的安全性

管理集群

深入了解如何在管理集群,其中介绍了如何连接到集群、调试步骤以及跟踪群集活动和运行状况。您还可以了解有关调整群集资源以响应工作负载需求的详细信息EMR 托管扩展

使用不同的界面

除了亚马逊 EMR 控制台之外,您还可以使用AWS Command Line Interface、Web 服务 API 或众多受支持的AWS开发工具包。有关更多信息,请参阅管理界面

您可以使用多种方法与 Amazon EMR 集群上安装的应用程序进行交互。某些应用程序(如 Apache Hadoop)会发布您可以在集群实例上查看的 Web 界面。有关更多信息,请参阅查看亚马逊 EMR 集群上托管的 Web 界面。通过运行 Apache Spark 的 Amazon EMR 集群,您可以在 Amazon EMR 控制台中使用 EMR 笔记本来运行查询和代码。有关更多信息,请参阅EMR 笔记本

浏览 EMR 技术博客

有关 EMR 功能的示例演练和深入的技术讨论,请参阅AWS大数据博客