View a markdown version of this page

透過 Spark Connect 使用 Amazon EMR Serverless 執行互動式工作階段 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

透過 Spark Connect 使用 Amazon EMR Serverless 執行互動式工作階段

使用 Amazon EMR 版本 emr-7.13.0和更新版本,您可以從自我管理的 PySpark 用戶端連線至 Amazon EMR Serverless 應用程式,例如使用 EMR Serverless 工作階段 APIs 搭配 Apache Spark Connect 的 VS Code、PyCharm 和 Jupyter 筆記本。Spark Connect 使用用戶端伺服器架構,將您的應用程式程式碼與 Spark 驅動程式程序分離。您可以在本機 IDE 中開發和偵錯 PySpark 程式碼,同時在 EMR Serverless 運算上執行 Spark 操作。Spark Connect 提供下列優點:

  • 從任何 PySpark 用戶端連線至 EMR Serverless,包括 VS Code、PyCharm 和 Jupyter 筆記本。

  • 在 IDE 中設定中斷點並逐步完成 PySpark 程式碼,同時 DataFrames 遠端在生產規模資料上執行。

Spark Connect 工作階段是本機 PySpark 用戶端與在 Amazon EMR Serverless 上執行的 Spark 驅動程式之間的受管連線。當您啟動工作階段時,EMR Serverless 會代表您佈建 Spark 驅動程式和執行器。您的本機用戶端會將 DataFrame 和 SQL 操作傳送至驅動程式,而驅動程式會從遠端執行這些操作。工作階段會持續存在,直到您將其終止或達到閒置逾時為止,因此您可以以互動方式執行多個查詢,而無需重新啟動 Spark。每個工作階段都有自己的端點 URL 和身分驗證字符,您可用來連線。

所需的許可

除了存取 Amazon EMR Serverless 所需的許可之外,也請將下列許可新增至您的 IAM 角色,以存取 Spark Connect 端點和管理 Spark Connect 工作階段:

emr-serverless:StartSession

准許在您指定為 的應用程式上建立 Spark Connect 工作階段Resource

emr-serverless:GetSessionEndpoint

准許擷取工作階段的 Spark Connect 端點 URL 和身分驗證字符。

emr-serverless:GetSession

准許取得工作階段的狀態。

emr-serverless:ListSessions

准許列出應用程式上的工作階段。

emr-serverless:TerminateSession

准許終止工作階段。

iam:PassRole

准許在建立 Spark Connect 工作階段時存取 IAM 執行角色。Amazon EMR Serverless 使用此角色來執行您的工作負載。

emr-serverless:GetResourceDashboard

准許產生 Spark UI URL,並提供工作階段日誌的存取權。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "EMRServerlessApplicationLevelAccess", "Effect": "Allow", "Action": [ "emr-serverless:StartSession", "emr-serverless:ListSessions" ], "Resource": [ "arn:aws:emr-serverless:region:account-id:/applications/application-id" ] }, { "Sid": "EMRServerlessSessionLevelAccess", "Effect": "Allow", "Action": [ "emr-serverless:GetSession", "emr-serverless:GetSessionEndpoint", "emr-serverless:TerminateSession", "emr-serverless:GetResourceDashboard" ], "Resource": [ "arn:aws:emr-serverless:region:account-id:/applications/application-id/sessions/*" ] }, { "Sid": "EMRServerlessRuntimeRoleAccess", "Effect": "Allow", "Action": [ "iam:PassRole" ], "Resource": [ "arn:aws:iam::account-id:role/EMRServerlessExecutionRole" ], "Condition": { "StringLike": { "iam:PassedToService": "emr-serverless.amazonaws.com" } } } ] }

使用互動式工作階段

若要建立已啟用 Spark Connect 的應用程式並與其連線,請遵循下列步驟。

啟動 Spark Connect 工作階段
  1. 使用 Spark Connect 工作階段建立應用程式。

    aws emr-serverless create-application \ --type "SPARK" \ --name "spark-connect-app" \ --release-label emr-7.13.0 \ --interactive-configuration '{"sessionEnabled": true}'
  2. Amazon EMR Serverless 建立應用程式後,如果您尚未啟用自動啟動以接受 Spark Connect 工作階段,請啟動應用程式。

    aws emr-serverless start-application \ --application-id APPLICATION_ID
  3. 使用下列命令來檢查應用程式的狀態。狀態變為 後STARTED,啟動工作階段。

    aws emr-serverless get-application \ --application-id APPLICATION_ID
  4. 使用授予資料存取權的 IAM 執行角色啟動工作階段。

    aws emr-serverless start-session \ --application-id APPLICATION_ID \ --execution-role-arn arn:aws:iam::account-id:role/EMRServerlessExecutionRole
  5. 使用 get-session API 監控工作階段狀態,並等待工作階段處於 STARTEDIDLE 狀態。

    aws emr-serverless get-session \ --application-id APPLICATION_ID \ --session-id SESSION_ID
  6. 擷取 Spark Connect 端點和身分驗證字符。傳回的端點 URL GetSessionEndpoint 不包含連接埠號碼。建構sc://連線 URL 時,您必須附加 :443 - 例如 sc://hostname:443/;use_ssl=true;x-aws-proxy-auth=token。如果沒有它,PySpark 用戶端預設為連接埠 15002,無法在 EMR Serverless 上連線。

    aws emr-serverless get-session-endpoint \ --application-id APPLICATION_ID \ --session-id SESSION_ID

    回應包含端點 URL 和身分驗證字符:

    { "endpoint": "ENDPOINT_URL", "authToken": "AUTH_TOKEN", "authTokenExpiresAt": "AUTH_TOKEN_EXPIRY_TIME" }
  7. 端點就緒後,從 PySpark 用戶端連線。在 EMR Serverless 應用程式上安裝符合 Spark 版本的 PySpark 用戶端,以及適用於 Python 的 AWS SDK。

    # Match the PySpark version to your EMR Serverless release version (3.5.6 for emr-7.13.0) pip install pyspark[connect]==3.5.6 pip install boto3

以下是啟動工作階段並直接將請求傳送至工作階段端點的範例 Python 指令碼:

import boto3 import time from pyspark.sql import SparkSession from pyspark.sql.functions import col client = boto3.client('emr-serverless', region_name='REGION') APPLICATION_ID = 'APPLICATION_ID' EXECUTION_ROLE = 'arn:aws:iam::account-id:role/EMRServerlessExecutionRole' # Start the session response = client.start_session( applicationId=APPLICATION_ID, executionRoleArn=EXECUTION_ROLE ) session_id = response['sessionId'] print(f"Session {session_id} starting...") # Wait for the session to be ready while True: response = client.get_session( applicationId=APPLICATION_ID, sessionId=session_id ) state = response['session']['state'] print(f"Session state: {state}") if state in ('STARTED', 'IDLE'): break if state in ('FAILED', 'TERMINATED'): raise Exception(f"Session failed: {response['session'].get('stateDetails', 'Unknown error')}") time.sleep(5) # Retrieve the Spark Connect endpoint and authentication token response = client.get_session_endpoint( applicationId=APPLICATION_ID, sessionId=session_id ) # Construct the authenticated remote URL auth_token = response['authToken'] endpoint_url = response['endpoint'] connect_url = endpoint_url.replace("https://", "sc://", 1) + ":443/;use_ssl=true;" connect_url += f"x-aws-proxy-auth={auth_token}" # Start the Spark session spark = SparkSession.builder.remote(connect_url).getOrCreate() print(f"Connected. Spark version: {spark.version}") # Run SQL spark.sql("SELECT 1+1 AS result").show() # Run DataFrame operations df = spark.range(100).withColumn("squared", col("id") * col("id")) df.show(10) print(f"Count: {df.count()}") # Stop the Spark session (disconnects the client only) spark.stop() # Terminate the EMR Serverless session to stop billing. # spark.stop() only closes the local client connection. The remote session # continues running and incurring charges until you explicitly terminate it # or it reaches the idle timeout. client.terminate_session( applicationId=APPLICATION_ID, sessionId=session_id ) print(f"Session {session_id} terminated.")

若要存取工作階段的即時 Spark UI 或 Spark 歷史記錄伺服器,請使用 GetResourceDashboard API。

response = client.get_resource_dashboard( applicationId=APPLICATION_ID, resourceId=session_id, resourceType='SESSION' ) response['url']

當工作階段處於作用中狀態時,URL 會開啟即時 Apache Spark UI,以即時監控查詢、階段和執行器。工作階段結束後,Spark 歷史記錄伺服器仍可透過 Amazon EMR Serverless 主控台進行工作階段後分析。

考量和限制

透過 Spark Connect 執行互動式工作負載時,請考慮下列事項。

  • Amazon EMR Serverless 版本 emr-7.13.0和更新版本支援 Spark Connect。

  • 只有 Apache Spark 引擎支援 Spark Connect。

  • Spark Connect 支援 PySpark 中的 DataFrame 和 SQL APIs。不支援 RDD 型 APIs。

  • 身分驗證字符的時間限制為 1 小時。當字符過期時,gRPC 呼叫會失敗並出現身分驗證錯誤。呼叫 GetSessionEndpoint 以取得新的字符,並使用SparkSession更新的字符建立新的字符。

  • 工作階段會在可設定的閒置逾時後結束。預設逾時設定為 1 小時。

  • 根據預設,每個工作階段的硬性限制為 24 小時,之後即使它正在主動執行任務,也會自動終止。

  • 根據預設,每個 EMR Serverless 應用程式最多支援 25 個並行工作階段。若要請求提高限制,請聯絡 AWS Support。

  • 根據預設, autoStopConfig 會針對應用程式開啟 。應用程式會在 15 分鐘後自動停止,沒有作用中的工作階段或任務執行。您可以在 create-applicationupdate-application請求中變更此組態。

  • 為了獲得最佳的啟動體驗,請為驅動程式和執行器設定預先初始化的容量。

  • 在啟動 EMR Serverless 工作階段之前,您應該啟用 AutoStart 或手動啟動應用程式。

  • 本機安裝的 PySpark 版本必須與 Amazon EMR Serverless 應用程式上的 Apache Spark 版本相符 (適用於 3.5.6emr-7.13.0)。版本不相符會導致ImportError或未預期的行為。

  • Spark Connect 工作階段不支援透過 Lake Formation 進行精細存取控制。

  • 使用 Spark Connect 的互動式工作階段不支援受信任身分傳播。

  • 使用 Spark Connect 的互動式工作階段不支援 EMR Serverless 上的無伺服器儲存。

  • 使用 Spark Connect 無需額外費用。您只需為工作階段期間消耗的 EMR Serverless 運算資源 (vCPU、記憶體和儲存) 付費。

  • Spark 組態由 EMR Serverless spark.connect.grpc.binding.address保留,使用者無法覆寫。

  • 您在本機安裝的 PySpark 套件必須符合 EMR Serverless 應用程式的 Spark 版本。版本不相符會導致連線錯誤。Python UDFs(@udfspark.udf.register) 也需要本機 Python 次要版本以符合工作者,或使用 失敗PYTHON_VERSION_MISMATCH。內建 SQL 函數和 DataFrame 操作不需要 Python 版本比對。

  • 若要使用 傳遞 Spark 組態start-session,請在 --configuration-overrides 參數runtimeConfiguration中的 下進行設定。start-job-run API 會applicationConfiguration改用 。