使用 Amazon SageMaker 功能商店 Spark 進行 Batch 攝取 - Amazon SageMaker

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Amazon SageMaker 功能商店 Spark 進行 Batch 攝取

Amazon SageMaker 功能商店星火是星火庫連接到功能商店的星火連接器。特徵商店 Spark 簡化了從 Spark DataFrame 到特徵群組的資料擷取作業。功能存放區支援使用 Spark 的批次資料擷取、使用您現有的ETL管道EMRGIS、Amazon 上的 AWS Glue 任務、Amazon SageMaker 處理任務或 SageMaker 筆記本。

為 Python 和 Scala 開發人員提供了用於安裝和實施批次資料擷取的方法。sagemaker-feature-store-pysparkPython 開發人員可以按照 Amazon SageMaker 功能商店 Spark GitHub 存儲庫中的說明使用開源 Python 庫進行本地開發EMR,在 Amazon 上安裝和 Jupyter 筆記本電腦。斯卡拉開發人員可以使用在 Amazon 功能商店星火 SDK Maven 中央存儲庫可用的 SageMaker 功能商店星火連接器。

您可以使用 Spark 連接器以下列方式擷取資料,視線上儲存、離線儲存或兩者是否啟用而定。

  1. 預設情況下擷取 — 如果啟用了線上商店,Spark 連接器會先使用將您的資料框擷取至線上商店。PutRecordAPI只有活動時間最長的記錄保留在線儲存中。如果已啟用離線存放區,則功能儲存會在 15 分鐘內將您的資料框擷取至離線存放區。如需線上和離線儲存運作方式的詳細資訊,請參閱功能儲存概念

    您可以通過不在.ingest_data(...)方法中指定target_stores來完成此操作。

  2. 離線儲存區直接擷取 — 如果啟用離線存放區,Spark 連接器批次會將您的資料框直接擷取至離線存放區。將資料框直接導入離線儲存並不會更新線上儲存。

    您可以通過在.ingest_data(...)方法中設置 target_stores=["OfflineStore"] 來完成此操作。

  3. 僅限線上商店 — 如果啟用了線上商店,Spark 連接器會使用將您的資料框擷取至線上商店。PutRecordAPI將資料框直接導入線上儲存並不會更新離線儲存。

    您可以通過在.ingest_data(...)方法中設置 target_stores=["OnlineStore"] 來完成此操作。

如需不同擷取方法的詳細資訊,請參閱實作範例

功能儲存 Spark 安裝

Scala 使用者

功能商店星火SDK是在 Amazon SageMaker 功能商店星火 SDK Maven 中央存儲庫斯卡拉用戶可用。

需求

  • Spark >= 3.0.0 和 <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR > = 6.1.0(僅當您使用的是 Amazon)EMR

在 POM .xml 中聲明依賴關係

特徵商店 Spark 連接器具有 iceberg-spark-runtime 程式庫的相依性。因此,如果您要將資料擷取到已使用 Iceberg 資料表格式自動建立的功能群組中,則必須將相應版本的 iceberg-spark-runtime程式庫新增至相依性。例如,如果您使用的是 Spark 3.1,則必須在您的項目中聲明以下內容POM.xml

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Python 使用者

功能商店星火SDK是在開源 Amazon SageMaker 功能商店星火 GitHub存儲庫中可用。

需求

  • Spark >= 3.0.0 和 <= 3.3.0

  • Amazon EMR > = 6.1.0(僅當您使用的是 Amazon)EMR

  • 核心 = conda_python3

我們建議將 $SPARK_HOME 設定為 Spark 安裝目錄。在安裝期間,功能商店會上傳必JAR要的內容SPARK_HOME,以便自動載入相依性。火花開始一JVM個需要使這個 PySpark 庫的工作。

本機安裝

若要尋找有關安裝的詳細資訊,請在以下安裝中新增 --verbose 以啟用詳細模式。

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

在 Amazon 上安裝 EMR

使用 6.1.0 版或更新版本建立 Amazon EMR 叢集。啟用可協SSH助您疑難排解任何問題。

您可以執行下列操作之一來安裝該資料庫:

  • 在 Amazon 中創建一個自定義步驟EMR。

  • 使用 Connect 到您的叢集,SSH然後從該處安裝程式庫。

注意

下列資訊使用 Spark 3.1 版,但您可以指定符合需求的任何版本。

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
注意

如果要JARs自動將依賴項安裝到 SPARK _HOME,請勿使用引導步驟。

在 SageMaker 筆記本執行個體上安裝

使用下列指令安裝與 Spark 連接器相容的 PySpark 版本:

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

如果您要對離線存放區執行批次擷取,則相依性不在筆記本執行個體環境中。

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

在筆記型電腦上安裝 GIS

重要

您必須使用 2.0 或更新 AWS Glue 版本。

使用下列資訊可協助您在 AWS Glue 互動式工作階段 (GIS) 中安裝 PySpark 連接器。

Amazon SageMaker 功能商店 Spark 需要在工作階段初始化JAR期間使用特定的 Spark 連接器,才能上傳到 Amazon S3 儲存貯體。如需JAR將所需檔案上傳到 S3 儲存貯體的詳細資訊,請參閱擷取功JAR能商店星火

上傳之後JAR,您必須JAR使用下列命令提供GIS工作階段。

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

若要在 AWS Glue 執行階段中安裝功能商店 Spark,請在GIS筆記型電腦中使用 %additional_python_modules Magic 指令。 AWS Glue 執行pip至您在下指定的模組%additional_python_modules

%additional_python_modules sagemaker-feature-store-pyspark-3.1

在開始 AWS Glue 工作階段之前,您必須同時使用上述兩個魔術指令。

在 AWS Glue 工作上安裝

重要

您必須使用 2.0 或更新 AWS Glue 版本。

若要在 AWS Glue 工作上安裝 Spark 連接器,請使用--extra-jars引數提供必要的參數,JARs並在建立工作時--additional-python-modules將 Spark 連接器安裝為 AWS Glue 作業參數,如下列範例所示。如需JAR將所需檔案上傳到 S3 儲存貯體的詳細資訊,請參閱擷取功JAR能商店星火

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

在 Amazon SageMaker 處理任務上安裝

若要將功能商店 Spark 與 Amazon SageMaker 處理任務搭配使用,請攜帶您自己的影像。有關使用映像的更多資訊,請參閱帶上自己的 SageMaker 形象。將安裝步驟新增到 Docker 文件中。將 Docker 映像推送到 Amazon ECR 儲存庫之後,您可以使用建 PySparkProcessor 立處理任務。如需使用處理 PySpark 器建立處理工作的詳細資訊,請參閱使用 Apache Spark 進行資料處理

以下是將安裝步驟新增至 Dockerfile 的範例。

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

擷取功JAR能商店星火

要檢索功能存儲星火依賴JAR,您必須pip在任何 Python 環境與網絡訪問使用 Python Package 索引(PyPI)存儲庫安裝星火連接器。 SageMaker Jupyter 筆記本是具有網絡訪問權限的 Python 環境的一個例子。

下列指令會安裝 Spark 連接器。

!pip install sagemaker-feature-store-pyspark-3.1

安裝功能存放區星火之後,您可以擷取JAR位置並上傳JAR到 Amazon S3。

feature-store-pyspark-dependency-jars指令提供了圖徵倉庫 Spark 加入的必要相依性JAR的位置。您可以使用該命令擷取該命令JAR並將其上傳到 Amazon S3。

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

實作範例

Example Python script

FeatureStoreBatchIngestion. PY

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

使用 Python 指令碼範例提交 Spark 工作

該 PySpark 版本需要一個額外的依賴JAR進口,因此需要額外的步驟來運行 Spark 應用程序。

如果您沒有在安裝SPARK_HOME過程中指定,那麼您必須JARs在運行JVM時加載所需的spark-submitfeature-store-pyspark-dependency-jars是由星火庫安裝的 Python 腳本,以自動JARs為您獲取所有路徑。

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

如果您在 Amazon 上執行此應用程式EMR,建議您以用戶端模式執行應用程式,這樣您就不需要將相依項目分配JARs到其他任務節點。使用類似於以下的 Spark 參數在 Amazon EMR 集群中添加一個步驟:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion. 斯卡拉

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

提交一個 Spark 工作

Scala

您應該能夠使用功能儲存 Spark 作為正常依賴關係。在所有平台上執行應用程式不需要額外的指令。