Batch-Aufnahme mit Amazon SageMaker Feature Store Spark - Amazon SageMaker

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Batch-Aufnahme mit Amazon SageMaker Feature Store Spark

Amazon SageMaker Feature Store Spark ist ein Spark-Konnektor, der die Spark-Bibliothek mit dem Feature Store verbindet. Feature Store Spark vereinfacht die Datenaufnahme von Spark DataFrames zu Feature-Gruppen. Feature Store unterstützt die Batch-Datenerfassung mit Spark unter Verwendung Ihrer vorhandenen ETL Pipeline, eines AWS Glue Jobs EMRGIS, eines Amazon SageMaker Processing-Jobs oder eines SageMaker Notebooks.

Methoden zur Installation und Implementierung der Batch-Datenaufnahme werden für Python- und Scala-Entwickler bereitgestellt. Python-Entwickler können die sagemaker-feature-store-pyspark Open-Source-Python-Bibliothek für die lokale Entwicklung, die Installation auf Amazon und für Jupyter Notebooks verwendenEMR, indem sie den Anweisungen im Amazon SageMaker Feature Store Spark-Repository folgen. GitHub Scala-Entwickler können den Feature Store Spark-Konnektor verwenden, der im zentralen Amazon SageMaker Feature Store Spark SDK Maven-Repository verfügbar ist.

Sie können den Spark-Konnektor verwenden, um Daten auf folgende Weise aufzunehmen, je nachdem, ob der Online-Speicher, der Offline-Speicher oder beide aktiviert sind.

  1. Standardmäßig aufnehmen — Wenn der Online-Shop aktiviert ist, nimmt der Spark-Connector zuerst Ihren Datenrahmen mithilfe von in den Online-Shop auf. PutRecordAPI Nur der Datensatz mit der größten Eventzeit verbleibt im Online-Speicher. Wenn der Offline-Speicher aktiviert ist, nimmt Feature Store Ihren Datenframe innerhalb von 15 Minuten in den Offline-Speicher auf. Weitere Informationen zur Funktionsweise von Online- und Offline-Speichers finden Sie unter Feature Store-Konzepte.

    Sie können dies erreichen, indem Sie target_stores in der .ingest_data(...) Methode nichts angeben.

  2. Direkte Aufnahme im Offline-Speicher – Wenn der Offline-Speicher aktiviert ist, nimmt der Spark-Connector-Batch Ihren Datenrahmen direkt in den Offline-Speicher auf. Durch die direkte Aufnahme des Datenrahmens in den Offline-Speicher wird der Online-Speicher nicht aktualisiert.

    Sie können dies erreichen, indem Sie die Methode target_stores=["OfflineStore"] oder .ingest_data(...) festlegen.

  3. Nur Online-Shop — Wenn der Online-Shop aktiviert ist, nimmt der Spark-Connector Ihren Datenrahmen mithilfe von in den Online-Shop auf. PutRecordAPI Durch die direkte Aufnahme des Datenrahmens in den Online-Speicher wird der Offline-Speicher nicht aktualisiert.

    Sie können dies erreichen, indem Sie die Methode target_stores=["OnlineStore"] oder .ingest_data(...) festlegen.

Weitere Informationen zu den verschiedenen Startmethoden finden Sie unter Beispielimplementierungen.

Installation von Feature Store Spark

Scala-Benutzer

Der Feature Store Spark SDK ist im zentralen Amazon SageMaker Feature Store Spark SDK Maven Repository für Scala-Benutzer verfügbar.

Voraussetzungen

  • Spark >= 3.0.0 und <= 3.3.0

  • iceberg-spark-runtime>= 0.14.0

  • Skala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (nur wenn Sie Amazon verwenden) EMR

Deklarieren Sie die Abhängigkeit in .xml POM

Der Feature Store Spark-Konnektor ist von der iceberg-spark-runtime Bibliothek abhängig. Sie müssen daher die entsprechende Version der iceberg-spark-runtime Bibliothek zur Abhängigkeit hinzufügen, wenn Sie Daten in eine Feature-Gruppe aufnehmen, die Sie automatisch mit dem Iceberg-Tabellenformat erstellt haben. Wenn Sie beispielsweise Spark 3.1 verwenden, müssen Sie Folgendes in Ihrem Projekt deklarieren POM.xml:

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Python-Benutzer

Der Feature Store Spark SDK ist im Open-Source-Amazon SageMaker Feature Store GitHub Spark-Repository verfügbar.

Voraussetzungen

  • Spark >= 3.0.0 und <= 3.3.0

  • Amazon EMR >= 6.1.0 (nur wenn Sie Amazon verwenden) EMR

  • Kernel = conda_python3

Wir empfehlen, das $SPARK_HOME auf das Verzeichnis einzustellen, in dem Sie Spark installiert haben. Während der Installation lädt Feature Store die erforderlichen JAR Dateien hochSPARK_HOME, sodass die Abhängigkeiten automatisch geladen werden. Damit diese PySpark Bibliothek funktioniert, JVM ist der Start von Spark erforderlich.

Lokale Installation

Um weitere Informationen zur Installation zu erhalten, aktivieren Sie den ausführlichen Modus, indem Sie --verbose den folgenden Installationsbefehl anhängen.

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Installation bei Amazon EMR

Erstellen Sie einen EMR Amazon-Cluster mit der Release-Version 6.1.0 oder höher. AktiviertSSH, um Ihnen bei der Behebung von Problemen zu helfen.

Sie können die Bibliothek für Folgendes verwenden:

  • Erstellen Sie einen benutzerdefinierten Schritt in AmazonEMR.

  • Stellen Sie mithilfe der Bibliothek Connect zu Ihrem Cluster her SSH und installieren Sie sie von dort aus.

Anmerkung

In den folgenden Informationen wird Spark Version 3.1 verwendet, Sie können jedoch jede Version angeben, die die Anforderungen erfüllt.

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
Anmerkung

Wenn Sie das abhängige Objekt JARs automatisch auf SPARK _ installieren möchtenHOME, verwenden Sie nicht den Bootstrap-Schritt.

Installation auf einer SageMaker Notebook-Instanz

Installieren Sie mit den folgenden Befehlen eine Version davon PySpark , die mit dem Spark-Connector kompatibel ist:

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Wenn Sie eine Batch-Aufnahme in den Offline-Speicher durchführen, befinden sich die Abhängigkeiten nicht in der Notebook-Instanceumgebung.

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

Installation auf Notebooks mit GIS

Wichtig

Sie müssen AWS Glue Version 2.0 oder höher verwenden.

Verwenden Sie die folgenden Informationen, um den PySpark Connector in einer AWS Glue interaktiven Sitzung (GIS) zu installieren.

Amazon SageMaker Feature Store Spark benötigt JAR während der Initialisierung der Sitzung einen bestimmten Spark-Connector, der in Ihren Amazon S3 S3-Bucket hochgeladen werden muss. Weitere Informationen zum Hochladen der erforderlichen JAR Daten in Ihren S3-Bucket finden Sie unter. Der Spark JAR für den Feature Store wird abgerufen

Nachdem Sie das hochgeladen habenJAR, müssen Sie die GIS Sitzungen JAR mithilfe des folgenden Befehls mit dem folgenden Befehl versorgen.

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

Um Feature Store Spark in der AWS Glue Runtime zu installieren, verwenden Sie den %additional_python_modules magischen Befehl im GIS Notizbuch. AWS Glue läuft pip zu den Modulen, die Sie unter angegeben haben%additional_python_modules.

%additional_python_modules sagemaker-feature-store-pyspark-3.1

Bevor Sie die AWS Glue Sitzung starten, müssen Sie die beiden vorherigen magischen Befehle verwenden.

Installation bei einem AWS Glue Job

Wichtig

Sie müssen AWS Glue Version 2.0 oder höher verwenden.

Um den Spark-Konnektor für einen AWS Glue Job zu installieren, verwenden Sie das --extra-jars Argument, um die erforderlichen JARs Daten bereitzustellen und den Spark-Connector als Job-Parameter --additional-python-modules zu installieren, wenn Sie den AWS Glue Job erstellen, wie im folgenden Beispiel gezeigt. Weitere Informationen zum Hochladen der erforderlichen JAR Dateien in Ihren S3-Bucket finden Sie unterDer Spark JAR für den Feature Store wird abgerufen.

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Installation bei einem Amazon SageMaker Processing-Job

Um Feature Store Spark mit Amazon SageMaker Processing Jobs zu verwenden, bringen Sie Ihr eigenes Bild mit. Weitere Informationen zum Laden eigener Daten finden Sie unter Bringen Sie Ihr eigenes SageMaker Bild mit. Fügen Sie den Installationsschritt zu einer Docker-Datei hinzu. Nachdem Sie das Docker-Image in ein ECR Amazon-Repository übertragen haben, können Sie das verwenden, PySparkProcessor um den Verarbeitungsjob zu erstellen. Weitere Informationen zum Erstellen eines Verarbeitungsauftrags mit dem PySpark Prozessor finden Sie unterDatenverarbeitung mit Apache Spark.

Im Folgenden finden Sie ein Beispiel für das Hinzufügen eines Installationsschritts zur Dockerfile.

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

Der Spark JAR für den Feature Store wird abgerufen

Um die Feature Store Spark-Abhängigkeit abzurufenJAR, müssen Sie den Spark-Konnektor aus dem Python Package Index (PyPI) -Repository installieren, indem Sie ihn pip in einer beliebigen Python-Umgebung mit Netzwerkzugriff verwenden. Ein SageMaker Jupyter Notebook ist ein Beispiel für eine Python-Umgebung mit Netzwerkzugriff.

Der folgende Befehl installiert den Spark-Connector.

!pip install sagemaker-feature-store-pyspark-3.1

Nachdem Sie Feature Store Spark installiert haben, können Sie den JAR Standort abrufen und JAR auf Amazon S3 hochladen.

Der feature-store-pyspark-dependency-jars Befehl gibt den Speicherort der erforderlichen Abhängigkeit anJAR, die Feature Store Spark hinzugefügt hat. Sie können den Befehl verwenden, um das abzurufen JAR und auf Amazon S3 hochzuladen.

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

Beispielimplementierungen

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

Reichen Sie einen Spark-Job mit einem Python-Beispielskript ein

Für die PySpark Version muss ein zusätzliches abhängiges JAR Objekt importiert werden, sodass zusätzliche Schritte erforderlich sind, um die Spark-Anwendung auszuführen.

Wenn Sie dies SPARK_HOME bei der Installation nicht angegeben haben, müssen Sie JARs JVM bei der Ausführung die erforderlichen Daten ladenspark-submit. feature-store-pyspark-dependency-jarsist ein Python-Skript, das von der Spark-Bibliothek installiert wird, um den Pfad zu allen automatisch JARs für Sie abzurufen.

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Wenn Sie diese Anwendung auf Amazon ausführenEMR, empfehlen wir, die Anwendung im Client-Modus auszuführen, sodass Sie die abhängigen JARs Anwendungen nicht auf andere Taskknoten verteilen müssen. Fügen Sie einen weiteren Schritt im EMR Amazon-Cluster mit einem Spark-Argument hinzu, das dem folgenden ähnelt:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

Reichen Sie einen Spark-Job ein

Scala

Sie sollten Feature Store Spark als normale Abhängigkeit verwenden können. Es sind keine zusätzlichen Anweisungen erforderlich, um die Anwendung auf allen Plattformen auszuführen.