Ingestion par lots avec Amazon SageMaker Feature Store Spark - Amazon SageMaker

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Ingestion par lots avec Amazon SageMaker Feature Store Spark

Amazon SageMaker Feature Store Spark est un connecteur Spark qui connecte la bibliothèque Spark au Feature Store. Feature Store Spark simplifie l'ingestion de données depuis des DataFrames Spark vers des groupes de fonctionnalités. Feature Store prend en charge l'ingestion de données par lots avec Spark, en utilisant votre ETL pipeline existantEMR, sur AmazonGIS, une AWS Glue tâche, une tâche Amazon SageMaker Processing ou un SageMaker bloc-notes.

Des méthodes d'installation et d'implémentation de l'ingestion de lots de données sont fournies pour les développeurs Python et Scala. Les développeurs Python peuvent utiliser la bibliothèque sagemaker-feature-store-pyspark Python open source pour le développement local, l'installation sur Amazon EMR et pour les blocs-notes Jupyter en suivant les instructions du référentiel Amazon SageMaker Feature Store Spark. GitHub Les développeurs Scala peuvent utiliser le connecteur Feature Store Spark disponible dans le référentiel central Amazon SageMaker Feature Store Spark SDK Maven.

Vous pouvez utiliser le connecteur Spark pour ingérer des données des manières suivantes, selon que le magasin en ligne, le magasin hors connexion ou les deux sont activés.

  1. Ingérer par défaut : si la boutique en ligne est activée, le connecteur Spark ingère d'abord votre trame de données dans la boutique en ligne à l'aide du. PutRecordAPI Seul l'enregistrement avec l'heure d'événement la plus élevée reste dans le magasin en ligne. Si le magasin hors connexion est activé, Feature Store ingère en moins de 15 minutes votre bloc de données dans le magasin hors connexion. Pour plus d'informations sur le fonctionnement des magasins en ligne et hors ligne, consultez Concepts liés à Feature Store.

    Vous pouvez accomplir ceci en ne spécifiant pas target_stores dans la méthode .ingest_data(...).

  2. Ingestion directe dans le magasin hors connexion : si le magasin hors connexion est activé, le connecteur Spark ingère par lots votre bloc de données directement dans le magasin hors connexion. L'ingestion du dataframe directement dans le magasin hors ligne ne met pas à jour le magasin en ligne.

    Vous pouvez accomplir ceci en définissant target_stores=["OfflineStore"] dans la méthode .ingest_data(...).

  3. Boutique en ligne uniquement — Si la boutique en ligne est activée, le connecteur Spark ingère votre trame de données dans la boutique en ligne à l'aide du. PutRecordAPI L'ingestion du bloc de données directement dans le magasin en ligne ne met pas à jour le magasin hors connexion.

    Vous pouvez accomplir ceci en définissant target_stores=["OnlineStore"] dans la méthode .ingest_data(...).

Pour plus d'informations sur l'utilisation des différentes méthodes d'ingestion, consultez Exemples d'implémentations.

Installation de Feature Store Spark

Utilisateurs Scala

Le Feature Store Spark SDK est disponible dans le référentiel central Amazon SageMaker Feature Store Spark SDK Maven pour les utilisateurs de Scala.

Prérequis

  • Spark >= 3.0.0 et <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (uniquement si vous utilisez Amazon) EMR

Déclarez la dépendance au format POM .xml

Le connecteur Feature Store Spark dépend de la bibliothèque iceberg-spark-runtime. Vous devez donc ajouter la version correspondante de la bibliothèque iceberg-spark-runtime à la dépendance si vous ingérez des données dans un groupe de fonctions que vous avez créé automatiquement avec le format de table Iceberg. Par exemple, si vous utilisez Spark 3.1, vous devez déclarer ce qui suit dans le POM.xml de votre projet :

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Utilisateurs Python

Le Feature Store Spark SDK est disponible dans le GitHubréférentiel open source Amazon SageMaker Feature Store Spark.

Prérequis

  • Spark >= 3.0.0 et <= 3.3.0

  • Amazon EMR >= 6.1.0 (uniquement si vous utilisez Amazon) EMR

  • Noyau = conda_python3

Nous vous recommandons de définir $SPARK_HOME sur le répertoire où Spark est installé. Pendant l'installation, Feature Store télécharge le fichier requis JAR versSPARK_HOME, de sorte que les dépendances se chargent automatiquement. Le démarrage d'un par Spark JVM est nécessaire pour que cette PySpark bibliothèque fonctionne.

Installation locale

Pour plus d'informations sur l'installation, activez le mode détaillé en ajoutant --verbose à la commande d'installation suivante.

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Installation sur Amazon EMR

Créez un EMR cluster Amazon avec la version 6.1.0 ou ultérieure. Activez SSH pour vous aider à résoudre les problèmes éventuels.

Vous pouvez utiliser l'une des actions suivantes pour installer la bibliothèque :

  • Créez une étape personnalisée au sein d'AmazonEMR.

  • Connectez-vous à votre cluster à l'aide de la bibliothèque SSH et installez-la à partir de là.

Note

Les informations suivantes utilisent Spark version 3.1, mais vous pouvez spécifier n'importe quelle version répondant aux exigences.

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
Note

Si vous souhaitez installer la personne dépendante JARs automatiquement sur SPARK _HOME, n'utilisez pas l'étape bootstrap.

Installation sur une instance de SageMaker bloc-notes

Installez une version compatible avec PySpark le connecteur Spark à l'aide des commandes suivantes :

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Si vous effectuez une ingestion par lots vers le magasin hors ligne, les dépendances ne se situent pas dans l'environnement de l'instance de bloc-notes.

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

Installation sur des ordinateurs portables dotés de GIS

Important

Vous devez utiliser AWS Glue la version 2.0 ou ultérieure.

Utilisez les informations suivantes pour vous aider à installer le PySpark connecteur dans une session AWS Glue interactive (GIS).

Amazon SageMaker Feature Store Spark nécessite un connecteur Spark spécifique JAR lors de l'initialisation de la session à télécharger dans votre compartiment Amazon S3. Pour plus d'informations sur le téléchargement du matériel requis JAR dans votre compartiment S3, consultezRécupération du JAR Spark pour Feature Store.

Après avoir téléchargé leJAR, vous devez fournir le aux GIS sessions à l'JARaide de la commande suivante.

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

Pour installer Feature Store Spark dans l' AWS Glue environnement d'exécution, utilisez la commande %additional_python_modules magique dans le GIS bloc-notes. AWS Glue pips'exécute sur les modules que vous avez spécifiés ci-dessous%additional_python_modules.

%additional_python_modules sagemaker-feature-store-pyspark-3.1

Avant de démarrer la AWS Glue session, vous devez utiliser les deux commandes magiques précédentes.

Installation sur un AWS Glue chantier

Important

Vous devez utiliser AWS Glue la version 2.0 ou ultérieure.

Pour installer le connecteur Spark sur une AWS Glue tâche, utilisez l'--extra-jarsargument pour fournir les informations nécessaires JARs et --additional-python-modules pour installer le connecteur Spark en tant que paramètres de tâche lorsque vous créez la AWS Glue tâche, comme indiqué dans l'exemple suivant. Pour plus d'informations sur le téléchargement du matériel requis JAR dans votre compartiment S3, consultezRécupération du JAR Spark pour Feature Store.

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Installation sur une tâche Amazon SageMaker Processing

Pour utiliser Feature Store Spark avec des tâches Amazon SageMaker Processing, apportez votre propre image. Pour plus d'informations sur l'apport de votre image, veuillez consulter Apportez votre propre SageMaker image. Ajoutez l'étape d'installation à un Dockerfile. Après avoir transféré l'image Docker vers un ECR référentiel Amazon, vous pouvez utiliser le PySparkProcessor pour créer la tâche de traitement. Pour plus d'informations sur la création d'une tâche de traitement avec le PySpark processeur, consultezTraitement des données avec Apache Spark.

Voici un exemple d'ajout d'une étape d'installation au Dockerfile.

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

Récupération du JAR Spark pour Feature Store

Pour récupérer la dépendance Feature Store SparkJAR, vous devez installer le connecteur Spark à partir du référentiel Python Package Index (PyPI) pip dans n'importe quel environnement Python disposant d'un accès réseau. Un bloc-notes SageMaker Jupyter est un exemple d'environnement Python avec accès au réseau.

La commande suivante installe le connecteur Spark.

!pip install sagemaker-feature-store-pyspark-3.1

Après avoir installé Feature Store Spark, vous pouvez récupérer l'JARemplacement et le JAR télécharger sur Amazon S3.

La feature-store-pyspark-dependency-jars commande indique l'emplacement de la dépendance JAR nécessaire ajoutée par Feature Store Spark. Vous pouvez utiliser la commande pour les récupérer JAR et les télécharger sur Amazon S3.

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

Exemples d'implémentations

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

Soumission d'une tâche Spark avec un exemple de script Python

La PySpark version nécessite l'importation d'une personne dépendante JAR supplémentaire. Des étapes supplémentaires sont donc nécessaires pour exécuter l'application Spark.

Si vous ne l'avez pas spécifié SPARK_HOME lors de l'installation, vous devez charger le fichier requis JARs JVM lors de l'exécutionspark-submit. feature-store-pyspark-dependency-jarsest un script Python installé par la bibliothèque Spark pour récupérer automatiquement le chemin vers tout JARs pour vous.

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Si vous exécutez cette application sur AmazonEMR, nous vous recommandons de l'exécuter en mode client, afin de ne pas avoir à distribuer la charge JARs à d'autres nœuds de tâches. Ajoutez une étape supplémentaire dans le EMR cluster Amazon avec un argument Spark similaire au suivant :

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

Soumission d'une tâche Spark

Scala

Vous devriez pouvoir utiliser Feature Store Spark en tant que dépendance normale. Aucune instruction supplémentaire n'est nécessaire pour exécuter l'application sur toutes les plateformes.