

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# 通过 Spark Connect 与亚马逊 EMR Serverless 进行交互式会话
<a name="spark-connect"></a>

在 Amazon EMR 版本`emr-7.13.0`及更高版本中，您可以使用与 Apache Spark Connect 的 EMR 无服务器会话从 VS Code 等自我管理的 PySpark 客户端（例如 VS Code）和 Jupyter 笔记本连接到 Amazon EMR 无服务器应用程序。 PyCharm APIs Spark Connect 使用客户端-服务器架构，将您的应用程序代码与 Spark 驱动程序进程分离。当 Spark 操作在 EMR Serverless 计算上运行时，你可以在本地 IDE 中开发和调试 PySpark 代码。Spark Connect 提供以下好处：
+ 从任何 PySpark 客户端（包括 VS Code 和 Jupyter 笔记本电脑）连接到 EMR Serverless。 PyCharm
+ 在生产规模的数据上远程 DataFrames运行时，在 IDE 中设置断点并逐步执行 PySpark 代码。

Spark Connect 会话是您的本地 PySpark 客户端与在 Amazon EMR Serverless 上运行的 Spark 驱动程序之间的托管连接。当您启动会话时，EMR Serverless 会代表您配置 Spark 驱动程序和执行器。您的本地客户端向驱动程序发送 DataFrame和 SQL 操作，驱动程序会远程运行它们。会话将一直持续到您终止会话或达到空闲超时为止，因此您无需重新启动 Spark 即可交互式运行多个查询。每个会话都有自己的终端节点 URL 和用于连接的身份验证令牌。

## 所需的权限
<a name="spark-connect-permissions"></a>

除了访问亚马逊 EMR Serverless 所需的权限外，还要向你的 IAM 角色添加以下权限，以访问 Spark Connect 终端节点和管理 Spark Connect 会话：

`emr-serverless:StartSession`  
授予在您指定为的应用程序上创建 Spark Connect 会话的权限`Resource`。

`emr-serverless:GetSessionEndpoint`  
授予检索 Spark Connect 端点网址和会话身份验证令牌的权限。

`emr-serverless:GetSession`  
授予获取会话状态的权限。

`emr-serverless:ListSessions`  
授予在应用程序上列出会话的权限。

`emr-serverless:TerminateSession`  
授予终止会话的权限。

`iam:PassRole`  
授予在创建 Spark Connect 会话时访问 IAM 执行角色的权限。Amazon EMR Serverless 使用此角色来运行您的工作负载。

`emr-serverless:GetResourceDashboard`  
授予生成 Spark 用户界面网址的权限并提供对会话日志的访问权限。

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "EMRServerlessApplicationLevelAccess",
            "Effect": "Allow",
            "Action": [
                "emr-serverless:StartSession",
                "emr-serverless:ListSessions"
            ],
            "Resource": [
                "arn:aws:emr-serverless:{{region}}:{{account-id}}:/applications/{{application-id}}"
            ]
        },
        {
            "Sid": "EMRServerlessSessionLevelAccess",
            "Effect": "Allow",
            "Action": [
                "emr-serverless:GetSession",
                "emr-serverless:GetSessionEndpoint",
                "emr-serverless:TerminateSession",
                "emr-serverless:GetResourceDashboard"
            ],
            "Resource": [
                "arn:aws:emr-serverless:{{region}}:{{account-id}}:/applications/{{application-id}}/sessions/*"
            ]
        },
        {
            "Sid": "EMRServerlessRuntimeRoleAccess",
            "Effect": "Allow",
            "Action": [
                "iam:PassRole"
            ],
            "Resource": [
                "arn:aws:iam::{{account-id}}:role/{{EMRServerlessExecutionRole}}"
            ],
            "Condition": {
                "StringLike": {
                    "iam:PassedToService": "emr-serverless.amazonaws.com"
                }
            }
        }
    ]
}
```

## 使用交互式会话
<a name="spark-connect-working"></a>

要创建支持 Spark Connect 的应用程序并连接到该应用程序，请按照以下步骤操作。

**启动 Spark Connect 会话**

1. 使用 Spark Connect 会话创建应用程序。

   ```
   aws emr-serverless create-application \
     --type "SPARK" \
     --name "spark-connect-app" \
     --release-label emr-7.13.0 \
     --interactive-configuration '{"sessionEnabled": true}'
   ```

1. 在 Amazon EMR Serverless 创建您的应用程序后，如果您尚未启用自动启动功能以接受 Spark Connect 会话，请启动该应用程序。

   ```
   aws emr-serverless start-application \
     --application-id {{APPLICATION_ID}}
   ```

1. 使用以下命令检查应用程序的状态。状态变为后`STARTED`，开始会话。

   ```
   aws emr-serverless get-application \
     --application-id {{APPLICATION_ID}}
   ```

1. 使用授予数据访问权限的 IAM 执行角色启动会话。

   ```
   aws emr-serverless start-session \
     --application-id {{APPLICATION_ID}} \
     --execution-role-arn arn:aws:iam::{{account-id}}:role/{{EMRServerlessExecutionRole}}
   ```

1. 使用 `get-session` API 监控会话状态，然后等待会话进入`STARTED`或`IDLE`状态。

   ```
   aws emr-serverless get-session \
     --application-id {{APPLICATION_ID}} \
     --session-id {{SESSION_ID}}
   ```

1. 检索 Spark Connect 端点和身份验证令牌。返回的终端节点 URL `GetSessionEndpoint` 不包含端口号。在构造`sc://`连接 URL 时，必须附加`:443`，`sc://`{{hostname}}`:443/;use_ssl=true;x-aws-proxy-auth=`{{token}}例如。没有它， PySpark 客户端默认为端口 15002，在 EMR Serverless 上无法访问该端口。

   ```
   aws emr-serverless get-session-endpoint \
     --application-id {{APPLICATION_ID}} \
     --session-id {{SESSION_ID}}
   ```

   响应包括终端节点 URL 和身份验证令牌：

   ```
   {
     "endpoint": "{{ENDPOINT_URL}}",
     "authToken": "{{AUTH_TOKEN}}",
     "authTokenExpiresAt": "{{AUTH_TOKEN_EXPIRY_TIME}}"
   }
   ```

1. 终端节点准备就绪后，从 PySpark 客户端进行连接。在你的 EMR Serverless 应用程序上安装与 Spark 版本匹配的 PySpark 客户端，并安装适用于 Python AWS 的软件开发工具包。

   ```
   # Match the PySpark version to your EMR Serverless release version (3.5.6 for emr-7.13.0)
   pip install pyspark[connect]==3.5.6
   pip install boto3
   ```

以下是启动会话并将请求直接发送到会话端点的 Python 脚本示例：

```
import boto3
import time
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

client = boto3.client('emr-serverless', region_name='{{REGION}}')

APPLICATION_ID = '{{APPLICATION_ID}}'
EXECUTION_ROLE = 'arn:aws:iam::{{account-id}}:role/{{EMRServerlessExecutionRole}}'

# Start the session
response = client.start_session(
    applicationId=APPLICATION_ID,
    executionRoleArn=EXECUTION_ROLE
)
session_id = response['sessionId']
print(f"Session {session_id} starting...")

# Wait for the session to be ready
while True:
    response = client.get_session(
        applicationId=APPLICATION_ID,
        sessionId=session_id
    )
    state = response['session']['state']
    print(f"Session state: {state}")
    if state in ('STARTED', 'IDLE'):
        break
    if state in ('FAILED', 'TERMINATED'):
        raise Exception(f"Session failed: {response['session'].get('stateDetails', 'Unknown error')}")
    time.sleep(5)

# Retrieve the Spark Connect endpoint and authentication token
response = client.get_session_endpoint(
    applicationId=APPLICATION_ID,
    sessionId=session_id
)

# Construct the authenticated remote URL
auth_token = response['authToken']
endpoint_url = response['endpoint']
connect_url = endpoint_url.replace("https://", "sc://", 1) + ":443/;use_ssl=true;"
connect_url += f"x-aws-proxy-auth={auth_token}"

# Start the Spark session
spark = SparkSession.builder.remote(connect_url).getOrCreate()
print(f"Connected. Spark version: {spark.version}")

# Run SQL
spark.sql("SELECT 1+1 AS result").show()

# Run DataFrame operations
df = spark.range(100).withColumn("squared", col("id") * col("id"))
df.show(10)
print(f"Count: {df.count()}")

# Stop the Spark session (disconnects the client only)
spark.stop()

# Terminate the EMR Serverless session to stop billing.
# spark.stop() only closes the local client connection. The remote session
# continues running and incurring charges until you explicitly terminate it
# or it reaches the idle timeout.
client.terminate_session(
    applicationId=APPLICATION_ID,
    sessionId=session_id
)
print(f"Session {session_id} terminated.")
```

要访问实时的 Spark 用户界面或 Spark 历史服务器以进行会话，请使用 `GetResourceDashboard` API。

```
response = client.get_resource_dashboard(
    applicationId=APPLICATION_ID,
    resourceId=session_id,
    resourceType='SESSION'
)
response['url']
```

当会话处于活动状态时，URL 会打开实时 Apache Spark 用户界面，用于实时监控查询、阶段和执行器。会话结束后，Spark 历史服务器仍可通过 Amazon EMR Serverless 控制台进行会话后分析。

## 注意事项和限制
<a name="spark-connect-considerations"></a>

通过 Spark Connect 运行交互式工作负载时，请考虑以下几点。
+ 亚马逊 EMR 无服务器版本及更高版本`emr-7.13.0`支持 Spark Connect。
+ 只有 Apache Spark 引擎支持 Spark Connect。
+ Spark Connect 支持 DataFrame 和 SQL APIs 进 PySpark来。不支持基 APIs 于 RDD。
+ 身份验证令牌的时间限制为 1 小时。令牌过期后，gRPC 调用失败并出现身份验证错误。调`GetSessionEndpoint`用获取新令牌并`SparkSession`使用更新的令牌创建新令牌。
+ 会话在可配置的空闲超时后结束。默认超时设置为 1 小时。
+ 默认情况下，每个会话的硬限制为 24 小时，之后即使它正在运行任务，它也会自动终止。
+ 默认情况下，每个 EMR Serverless 应用程序最多支持 25 个并发会话。要申请提高限额，请联系 Supp AWS ort。
+ 默认情况下，应用程序`autoStopConfig`处于开启状态。应用程序会在 15 分钟后自动停止，且没有活动会话或作业运行。您可以将此配置作为 `create-application` 或 `update-application` 请求的一部分进行更改。
+ 为获得最佳启动体验，请为驱动程序和执行程序配置预初始化的容量。
+ 在启动 EMR Serverless 会话之前，您应该启用 AutoStart 或手动启动应用程序。
+ 本地安装的 PySpark 版本必须与亚马逊 EMR 无服务器应用程序上的 Apache Spark 版本匹配（适用于 3.5.6）。`emr-7.13.0`版本不匹配会导致意外`ImportError`行为。
+ Spark Connect 会话不支持通过 Lake Formation 进行精细访问控制。
+ 使用 Spark Connect 的交互式会话不支持可信身份传播。
+ 使用 Spark Connect 的交互式会话不支持 EMR Serverless 上的无服务器存储。
+ 使用 Spark Connect 不收取额外费用。您只需为会话期间消耗的 EMR 无服务器计算资源（vCPU、内存和存储）付费。
+ Spark 配置`spark.connect.grpc.binding.address`由 EMR Serverless 保留，用户无法覆盖。
+ 您在本地安装的 PySpark 软件包必须与 EMR Serverless 应用程序上的 Spark 版本相匹配。版本不匹配会导致连接错误。Python UDFs (`@udf`,`spark.udf.register`) 还需要本地 Python 次要版本来匹配工作程序，否则它们就会失败`PYTHON_VERSION_MISMATCH`。内置 SQL 函数和 DataFrame操作不需要 Python 版本匹配。
+ 要使用传递 Spark 配置`start-session`，请在`--configuration-overrides`参数中将其设置`runtimeConfiguration`在下方。`start-job-run`API `applicationConfiguration` 改为使用。