本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
透過 Spark Connect 使用 Amazon EMR Serverless 執行互動式工作階段
使用 Amazon EMR 版本 emr-7.13.0和更新版本,您可以從自我管理的 PySpark 用戶端連線至 Amazon EMR Serverless 應用程式,例如使用 EMR Serverless 工作階段 APIs 搭配 Apache Spark Connect 的 VS Code、PyCharm 和 Jupyter 筆記本。Spark Connect 使用用戶端伺服器架構,將您的應用程式程式碼與 Spark 驅動程式程序分離。您可以在本機 IDE 中開發和偵錯 PySpark 程式碼,同時在 EMR Serverless 運算上執行 Spark 操作。Spark Connect 提供下列優點:
-
從任何 PySpark 用戶端連線至 EMR Serverless,包括 VS Code、PyCharm 和 Jupyter 筆記本。
-
在 IDE 中設定中斷點並逐步完成 PySpark 程式碼,同時 DataFrames 遠端在生產規模資料上執行。
Spark Connect 工作階段是本機 PySpark 用戶端與在 Amazon EMR Serverless 上執行的 Spark 驅動程式之間的受管連線。當您啟動工作階段時,EMR Serverless 會代表您佈建 Spark 驅動程式和執行器。您的本機用戶端會將 DataFrame 和 SQL 操作傳送至驅動程式,而驅動程式會從遠端執行這些操作。工作階段會持續存在,直到您將其終止或達到閒置逾時為止,因此您可以以互動方式執行多個查詢,而無需重新啟動 Spark。每個工作階段都有自己的端點 URL 和身分驗證字符,您可用來連線。
所需的許可
除了存取 Amazon EMR Serverless 所需的許可之外,也請將下列許可新增至您的 IAM 角色,以存取 Spark Connect 端點和管理 Spark Connect 工作階段:
emr-serverless:StartSession-
准許在您指定為 的應用程式上建立 Spark Connect 工作階段
Resource。 emr-serverless:GetSessionEndpoint-
准許擷取工作階段的 Spark Connect 端點 URL 和身分驗證字符。
emr-serverless:GetSession-
准許取得工作階段的狀態。
emr-serverless:ListSessions-
准許列出應用程式上的工作階段。
emr-serverless:TerminateSession-
准許終止工作階段。
iam:PassRole-
准許在建立 Spark Connect 工作階段時存取 IAM 執行角色。Amazon EMR Serverless 使用此角色來執行您的工作負載。
emr-serverless:GetResourceDashboard-
准許產生 Spark UI URL,並提供工作階段日誌的存取權。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "EMRServerlessApplicationLevelAccess", "Effect": "Allow", "Action": [ "emr-serverless:StartSession", "emr-serverless:ListSessions" ], "Resource": [ "arn:aws:emr-serverless:region:account-id:/applications/application-id" ] }, { "Sid": "EMRServerlessSessionLevelAccess", "Effect": "Allow", "Action": [ "emr-serverless:GetSession", "emr-serverless:GetSessionEndpoint", "emr-serverless:TerminateSession", "emr-serverless:GetResourceDashboard" ], "Resource": [ "arn:aws:emr-serverless:region:account-id:/applications/application-id/sessions/*" ] }, { "Sid": "EMRServerlessRuntimeRoleAccess", "Effect": "Allow", "Action": [ "iam:PassRole" ], "Resource": [ "arn:aws:iam::account-id:role/EMRServerlessExecutionRole" ], "Condition": { "StringLike": { "iam:PassedToService": "emr-serverless.amazonaws.com" } } } ] }
使用互動式工作階段
若要建立已啟用 Spark Connect 的應用程式並與其連線,請遵循下列步驟。
啟動 Spark Connect 工作階段
-
使用 Spark Connect 工作階段建立應用程式。
aws emr-serverless create-application \ --type "SPARK" \ --name "spark-connect-app" \ --release-label emr-7.13.0 \ --interactive-configuration '{"sessionEnabled": true}' -
Amazon EMR Serverless 建立應用程式後,如果您尚未啟用自動啟動以接受 Spark Connect 工作階段,請啟動應用程式。
aws emr-serverless start-application \ --application-idAPPLICATION_ID -
使用下列命令來檢查應用程式的狀態。狀態變為 後
STARTED,啟動工作階段。aws emr-serverless get-application \ --application-idAPPLICATION_ID -
使用授予資料存取權的 IAM 執行角色啟動工作階段。
aws emr-serverless start-session \ --application-idAPPLICATION_ID\ --execution-role-arn arn:aws:iam::account-id:role/EMRServerlessExecutionRole -
使用
get-sessionAPI 監控工作階段狀態,並等待工作階段處於STARTED或IDLE狀態。aws emr-serverless get-session \ --application-idAPPLICATION_ID\ --session-idSESSION_ID -
擷取 Spark Connect 端點和身分驗證字符。傳回的端點 URL
GetSessionEndpoint不包含連接埠號碼。建構sc://連線 URL 時,您必須附加:443- 例如sc://hostname:443/;use_ssl=true;x-aws-proxy-auth=token。如果沒有它,PySpark 用戶端預設為連接埠 15002,無法在 EMR Serverless 上連線。aws emr-serverless get-session-endpoint \ --application-idAPPLICATION_ID\ --session-idSESSION_ID回應包含端點 URL 和身分驗證字符:
{ "endpoint": "ENDPOINT_URL", "authToken": "AUTH_TOKEN", "authTokenExpiresAt": "AUTH_TOKEN_EXPIRY_TIME" } -
端點就緒後,從 PySpark 用戶端連線。在 EMR Serverless 應用程式上安裝符合 Spark 版本的 PySpark 用戶端,以及適用於 Python 的 AWS SDK。
# Match the PySpark version to your EMR Serverless release version (3.5.6 for emr-7.13.0) pip install pyspark[connect]==3.5.6 pip install boto3
以下是啟動工作階段並直接將請求傳送至工作階段端點的範例 Python 指令碼:
import boto3 import time from pyspark.sql import SparkSession from pyspark.sql.functions import col client = boto3.client('emr-serverless', region_name='REGION') APPLICATION_ID = 'APPLICATION_ID' EXECUTION_ROLE = 'arn:aws:iam::account-id:role/EMRServerlessExecutionRole' # Start the session response = client.start_session( applicationId=APPLICATION_ID, executionRoleArn=EXECUTION_ROLE ) session_id = response['sessionId'] print(f"Session {session_id} starting...") # Wait for the session to be ready while True: response = client.get_session( applicationId=APPLICATION_ID, sessionId=session_id ) state = response['session']['state'] print(f"Session state: {state}") if state in ('STARTED', 'IDLE'): break if state in ('FAILED', 'TERMINATED'): raise Exception(f"Session failed: {response['session'].get('stateDetails', 'Unknown error')}") time.sleep(5) # Retrieve the Spark Connect endpoint and authentication token response = client.get_session_endpoint( applicationId=APPLICATION_ID, sessionId=session_id ) # Construct the authenticated remote URL auth_token = response['authToken'] endpoint_url = response['endpoint'] connect_url = endpoint_url.replace("https://", "sc://", 1) + ":443/;use_ssl=true;" connect_url += f"x-aws-proxy-auth={auth_token}" # Start the Spark session spark = SparkSession.builder.remote(connect_url).getOrCreate() print(f"Connected. Spark version: {spark.version}") # Run SQL spark.sql("SELECT 1+1 AS result").show() # Run DataFrame operations df = spark.range(100).withColumn("squared", col("id") * col("id")) df.show(10) print(f"Count: {df.count()}") # Stop the Spark session (disconnects the client only) spark.stop() # Terminate the EMR Serverless session to stop billing. # spark.stop() only closes the local client connection. The remote session # continues running and incurring charges until you explicitly terminate it # or it reaches the idle timeout. client.terminate_session( applicationId=APPLICATION_ID, sessionId=session_id ) print(f"Session {session_id} terminated.")
若要存取工作階段的即時 Spark UI 或 Spark 歷史記錄伺服器,請使用 GetResourceDashboard API。
response = client.get_resource_dashboard( applicationId=APPLICATION_ID, resourceId=session_id, resourceType='SESSION' ) response['url']
當工作階段處於作用中狀態時,URL 會開啟即時 Apache Spark UI,以即時監控查詢、階段和執行器。工作階段結束後,Spark 歷史記錄伺服器仍可透過 Amazon EMR Serverless 主控台進行工作階段後分析。
考量和限制
透過 Spark Connect 執行互動式工作負載時,請考慮下列事項。
-
Amazon EMR Serverless 版本
emr-7.13.0和更新版本支援 Spark Connect。 -
只有 Apache Spark 引擎支援 Spark Connect。
-
Spark Connect 支援 PySpark 中的 DataFrame 和 SQL APIs。不支援 RDD 型 APIs。
-
身分驗證字符的時間限制為 1 小時。當字符過期時,gRPC 呼叫會失敗並出現身分驗證錯誤。呼叫
GetSessionEndpoint以取得新的字符,並使用SparkSession更新的字符建立新的字符。 -
工作階段會在可設定的閒置逾時後結束。預設逾時設定為 1 小時。
-
根據預設,每個工作階段的硬性限制為 24 小時,之後即使它正在主動執行任務,也會自動終止。
-
根據預設,每個 EMR Serverless 應用程式最多支援 25 個並行工作階段。若要請求提高限制,請聯絡 AWS Support。
-
根據預設,
autoStopConfig會針對應用程式開啟 。應用程式會在 15 分鐘後自動停止,沒有作用中的工作階段或任務執行。您可以在create-application或update-application請求中變更此組態。 -
為了獲得最佳的啟動體驗,請為驅動程式和執行器設定預先初始化的容量。
-
在啟動 EMR Serverless 工作階段之前,您應該啟用 AutoStart 或手動啟動應用程式。
-
本機安裝的 PySpark 版本必須與 Amazon EMR Serverless 應用程式上的 Apache Spark 版本相符 (適用於 3.5.6
emr-7.13.0)。版本不相符會導致ImportError或未預期的行為。 -
Spark Connect 工作階段不支援透過 Lake Formation 進行精細存取控制。
-
使用 Spark Connect 的互動式工作階段不支援受信任身分傳播。
-
使用 Spark Connect 的互動式工作階段不支援 EMR Serverless 上的無伺服器儲存。
-
使用 Spark Connect 無需額外費用。您只需為工作階段期間消耗的 EMR Serverless 運算資源 (vCPU、記憶體和儲存) 付費。
-
Spark 組態由 EMR Serverless
spark.connect.grpc.binding.address保留,使用者無法覆寫。 -
您在本機安裝的 PySpark 套件必須符合 EMR Serverless 應用程式的 Spark 版本。版本不相符會導致連線錯誤。Python UDFs(
@udf、spark.udf.register) 也需要本機 Python 次要版本以符合工作者,或使用 失敗PYTHON_VERSION_MISMATCH。內建 SQL 函數和 DataFrame 操作不需要 Python 版本比對。 -
若要使用 傳遞 Spark 組態
start-session,請在--configuration-overrides參數runtimeConfiguration中的 下進行設定。start-job-runAPI 會applicationConfiguration改用 。