Ingestión por lotes con Amazon SageMaker Feature Store Spark - Amazon SageMaker

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ingestión por lotes con Amazon SageMaker Feature Store Spark

Amazon SageMaker Feature Store Spark es un conector de Spark que conecta la biblioteca de Spark con Feature Store. Spark con el almacén de características simplifica la ingesta de datos de los DataFrame de Spark a los grupos de características. Feature Store admite la ingesta de datos por lotes con Spark, utilizando tu ETL canalización existente, en Amazon EMRGIS, un AWS Glue trabajo, un trabajo de SageMaker procesamiento de Amazon o un SageMaker cuaderno.

Se proporcionan métodos para instalar e implementar la ingesta de datos por lotes para los desarrolladores de Python y Scala. Los desarrolladores de Python pueden usar la biblioteca sagemaker-feature-store-pyspark Python de código abierto para el desarrollo local, la instalación en Amazon EMR y para los Jupyter Notebooks siguiendo las instrucciones del repositorio Spark de Amazon SageMaker Feature Store. GitHub Los desarrolladores de Scala pueden usar el conector Spark de Feature Store, disponible en el repositorio central de Spark SDK Maven de Amazon SageMaker Feature Store.

Puede utilizar el conector Spark para ingerir datos de las siguientes maneras, en función de si están habilitados el almacenamiento en línea, el almacenamiento sin conexión o ambos.

  1. Ingestar de forma predeterminada: si la tienda en línea está habilitada, el conector Spark primero ingiere su marco de datos en la tienda en línea mediante el. PutRecordAPI Solo queda en el almacenamiento en línea el registro con la hora del evento más grande. Si está habilitado el almacenamiento sin conexión, el almacén de características incorpora su marco de datos al almacenamiento sin conexión dentro de 15 minutos. Para obtener más información sobre cómo funcionan el almacenamiento sin conexión y el almacenamiento en línea, consulte Conceptos del almacén de características.

    Puede lograr esto si no especifica target_stores en el método .ingest_data(...).

  2. Ingesta directa al almacenamiento sin conexión: si está habilitado el almacenamiento sin conexión, el conector Spark incorpora por lotes el marco de datos directamente al almacenamiento sin conexión. La ingesta directa del marco de datos al almacenamiento sin conexión no actualiza el almacenamiento en línea.

    Puede lograr esto si especifica target_stores=["OfflineStore"] en el método .ingest_data(...).

  3. Solo en la tienda en línea: si la tienda en línea está habilitada, el conector Spark ingiere su marco de datos en la tienda en línea mediante el. PutRecordAPI La ingesta directa del marco de datos al almacenamiento en línea no actualiza el almacenamiento sin conexión.

    Puede lograr esto si especifica target_stores=["OnlineStore"] en el método .ingest_data(...).

Para obtener información acerca de los distintos métodos de ingestión, consulte Implementaciones de ejemplo.

Instalación de Spark en el almacén de características

Usuarios de Scala

El Feature Store Spark SDK está disponible en el repositorio central de SDK Maven de Amazon SageMaker Feature Store para los usuarios de Scala.

Requisitos

  • Spark >= 3.0.0 y <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (solo si utilizas Amazon) EMR

Declare la dependencia en .xml POM

El conector Spark del almacén de características depende de la biblioteca iceberg-spark-runtime. Por lo tanto, debe agregar la versión correspondiente de la biblioteca iceberg-spark-runtime a la dependencia si va a ingerir datos en un grupo de características que ha creado automáticamente con el formato de tabla de Iceberg. Por ejemplo, si utiliza Spark 3.1, debe declarar lo siguiente en el POM.xml del proyecto:

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Usuarios de Python

El Feature Store Spark SDK está disponible en el GitHubrepositorio de código abierto de Amazon SageMaker Feature Store Spark.

Requisitos

  • Spark >= 3.0.0 y <= 3.3.0

  • Amazon EMR >= 6.1.0 (solo si utilizas Amazon) EMR

  • Kernel = conda_python3

Se recomienda configurar $SPARK_HOME en el directorio en el que tenga instalado Spark. Durante la instalación, Feature Store carga lo necesario JAR para SPARK_HOME que las dependencias se carguen automáticamente. Para que esta PySpark biblioteca funcione, JVM es necesario que Spark inicie un.

Instalación local

Para obtener más información sobre la instalación, habilite el modo detallado anexando --verbose al siguiente comando de instalación.

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Instalación en Amazon EMR

Crea un EMR clúster de Amazon con la versión de lanzamiento 6.1.0 o posterior. SSHActívala para ayudarte a solucionar cualquier problema.

Puede hacer una de estas cosas para instalar la biblioteca:

  • Crea un paso personalizado en AmazonEMR.

  • Conéctese a su clúster mediante la biblioteca SSH e instálela desde allí.

nota

La siguiente información usa la versión 3.1 de Spark, pero puede especificar cualquier versión que cumpla los requisitos.

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
nota

Si desea instalar el dependiente JARs automáticamente en SPARK _HOME, no utilice el paso de arranque.

Instalación en una instancia de SageMaker portátil

Instala una versión PySpark que sea compatible con el conector Spark mediante los siguientes comandos:

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Si va a realizar una ingestión por lotes en el almacenamiento sin conexión, las dependencias no se encuentran en el entorno de instancias del cuaderno.

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

Instalación en ordenadores portátiles con GIS

importante

Debe utilizar la AWS Glue versión 2.0 o posterior.

Utilice la siguiente información como ayuda para instalar el PySpark conector en una sesión AWS Glue interactiva (GIS).

Amazon SageMaker Feature Store Spark requiere un conector Spark específico JAR durante la inicialización de la sesión para cargarlo en tu bucket de Amazon S3. Para obtener más información sobre cómo cargar lo necesario JAR en su bucket de S3, consulte. Recuperando el Spark JAR de Feature Store

Una vez que haya subido elJAR, debe proporcionar a las GIS sesiones el JAR comando siguiente.

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

Para instalar Feature Store Spark en AWS Glue tiempo de ejecución, usa el comando %additional_python_modules mágico del GIS bloc de notas. AWS Glue se ejecuta pip en los módulos que especificó en%additional_python_modules.

%additional_python_modules sagemaker-feature-store-pyspark-3.1

Antes de iniciar la AWS Glue sesión, debe utilizar los dos comandos mágicos anteriores.

Instalación en un AWS Glue trabajo

importante

Debe utilizar la AWS Glue versión 2.0 o posterior.

Para instalar el conector Spark en un AWS Glue trabajo, utilice el --extra-jars argumento para proporcionar lo necesario JARs e --additional-python-modules instalar el Spark Connector como parámetros del trabajo al crear el AWS Glue trabajo, como se muestra en el siguiente ejemplo. Para obtener más información sobre cómo cargar lo JAR necesario en su bucket de S3, consulteRecuperando el Spark JAR de Feature Store.

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Instalación en un trabajo de Amazon SageMaker Processing

Para usar Feature Store Spark con los trabajos SageMaker de Amazon Processing, trae tu propia imagen. Para obtener más información acerca de cómo traer su propia imagen, consulte Traiga su propia SageMaker imagen. Agregue el paso de instalación a un Dockerfile. Una vez que hayas subido la imagen de Docker a un ECR repositorio de Amazon, puedes utilizarla PySparkProcessor para crear el trabajo de procesamiento. Para obtener más información sobre cómo crear un trabajo de procesamiento con el PySpark procesador, consulteEjecute un trabajo de procesamiento con Apache Spark.

A continuación, se muestra un ejemplo de cómo agregar un paso de instalación al Dockerfile.

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

Recuperando el Spark JAR de Feature Store

Para recuperar la dependencia de Feature Store SparkJAR, debe instalar el conector de Spark desde el repositorio Python Package Index (PyPI) pip en cualquier entorno Python con acceso a la red. Un SageMaker Jupyter Notebook es un ejemplo de un entorno Python con acceso a la red.

El siguiente comando instala el conector Spark.

!pip install sagemaker-feature-store-pyspark-3.1

Después de instalar Feature Store Spark, puede recuperar la JAR ubicación y subirla JAR a Amazon S3.

El feature-store-pyspark-dependency-jars comando proporciona la ubicación de la dependencia necesaria JAR que Feature Store Spark agregó. Puede usar el comando para recuperarlo JAR y subirlo a Amazon S3.

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

Implementaciones de ejemplo

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

Enviar un trabajo de Spark con ejemplo de script de Python

La PySpark versión requiere la importación de un dependiente JAR adicional, por lo que se necesitan pasos adicionales para ejecutar la aplicación Spark.

Si no lo especificaste SPARK_HOME durante la instalación, tendrás que cargar el archivo requerido JARs durante la ejecuciónspark-submit. JVM feature-store-pyspark-dependency-jarses un script de Python instalado por la biblioteca Spark para buscar automáticamente la ruta a todo JARs por ti.

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Si ejecuta esta aplicación en AmazonEMR, le recomendamos que la ejecute en modo cliente, de modo que no necesite distribuir lo dependiente JARs a otros nodos de tareas. Agrega un paso más en el EMR clúster de Amazon con un argumento de Spark similar al siguiente:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

Enviar un trabajo de Spark

Scala

Debería poder usar el Spark del almacén de características como una dependencia normal. No se necesitan instrucciones adicionales para ejecutar la aplicación en todas las plataformas.