Ingestión por lotes con Amazon SageMaker Feature Store Spark - Amazon SageMaker AI

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ingestión por lotes con Amazon SageMaker Feature Store Spark

Amazon SageMaker Feature Store Spark es un conector de Spark que conecta la biblioteca de Spark con Feature Store. Spark con el almacén de características simplifica la ingesta de datos de los DataFrame de Spark a los grupos de características. Feature Store admite la ingesta de datos por lotes con Spark, utilizando tu canalización de ETL existente, en Amazon EMR, GIS, un trabajo, AWS Glue un trabajo de procesamiento de SageMaker Amazon o SageMaker un cuaderno.

Se proporcionan métodos para instalar e implementar la ingesta de datos por lotes para los desarrolladores de Python y Scala. Los desarrolladores de Python pueden usar la biblioteca sagemaker-feature-store-pyspark Python de código abierto para el desarrollo local, la instalación en Amazon EMR y para los Jupyter Notebooks siguiendo las instrucciones del repositorio Spark de Amazon SageMaker Feature Store. GitHub Los desarrolladores de Scala pueden usar el conector Spark de Feature Store, disponible en el repositorio central de Maven del SDK de Spark de Amazon SageMaker Feature Store.

Puede utilizar el conector Spark para ingerir datos de las siguientes maneras, en función de si están habilitados el almacenamiento en línea, el almacenamiento sin conexión o ambos.

  1. Introducir de forma predeterminada: si la tienda online está habilitada, el conector Spark primero introduce el marco de datos en la tienda online mediante la API. PutRecord Solo queda en el almacenamiento en línea el registro con la hora del evento más grande. Si está habilitado el almacenamiento sin conexión, el almacén de características incorpora su marco de datos al almacenamiento sin conexión dentro de 15 minutos. Para obtener más información sobre cómo funcionan el almacenamiento sin conexión y el almacenamiento en línea, consulte Conceptos del almacén de características.

    Puede lograr esto si no especifica target_stores en el método .ingest_data(...).

  2. Ingesta directa al almacenamiento sin conexión: si está habilitado el almacenamiento sin conexión, el conector Spark incorpora por lotes el marco de datos directamente al almacenamiento sin conexión. La ingesta directa del marco de datos al almacenamiento sin conexión no actualiza el almacenamiento en línea.

    Puede lograr esto si especifica target_stores=["OfflineStore"] en el método .ingest_data(...).

  3. Solo en la tienda en línea: si la tienda en línea está habilitada, el conector Spark introduce tu marco de datos en la tienda en línea mediante la API. PutRecord La ingesta directa del marco de datos al almacenamiento en línea no actualiza el almacenamiento sin conexión.

    Puede lograr esto si especifica target_stores=["OnlineStore"] en el método .ingest_data(...).

Para obtener información acerca de los distintos métodos de ingestión, consulte Implementaciones de ejemplo.

Instalación de Spark en el almacén de características

Usuarios de Scala

El SDK de Spark de Feature Store está disponible en el repositorio central de Maven del SDK de Spark de Amazon SageMaker Feature Store para los usuarios de Scala.

Requisitos

  • Spark >= 3.0.0 y <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (solo si usa Amazon EMR)

Declare la dependencia en POM.xml

El conector Spark del almacén de características depende de la biblioteca iceberg-spark-runtime. Por lo tanto, debe agregar la versión correspondiente de la biblioteca iceberg-spark-runtime a la dependencia si va a ingerir datos en un grupo de características que ha creado automáticamente con el formato de tabla de Iceberg. Por ejemplo, si utiliza Spark 3.1, debe declarar lo siguiente en el POM.xml del proyecto:

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Usuarios de Python

El SDK de Spark de Feature Store está disponible en el GitHubrepositorio de código abierto de Amazon SageMaker Feature Store Spark.

Requisitos

  • Spark >= 3.0.0 y <= 3.3.0

  • Amazon EMR >= 6.1.0 (solo si usa Amazon EMR)

  • Kernel = conda_python3

Se recomienda configurar $SPARK_HOME en el directorio en el que tenga instalado Spark. Durante la instalación, el almacén de características carga el JAR necesario para SPARK_HOME, de modo que las dependencias se carguen automáticamente. Para que esta PySpark biblioteca funcione, es necesario que Spark inicie una JVM.

Instalación local

Para obtener más información sobre la instalación, habilite el modo detallado anexando --verbose al siguiente comando de instalación.

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Instalación en Amazon EMR

Cree un clúster de Amazon EMR con la versión de lanzamiento 6.1.0 o posterior. Habilite SSH para ayudarle a solucionar cualquier problema.

Puede hacer una de estas cosas para instalar la biblioteca:

  • Crear un paso personalizado en Amazon EMR.

  • Conectarse a su clúster mediante SSH e instalar la biblioteca desde allí.

nota

La siguiente información usa la versión 3.1 de Spark, pero puede especificar cualquier versión que cumpla los requisitos.

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
nota

Si quiere instalar el dependiente JARs automáticamente en SPARK_HOME, no utilice el paso de arranque.

Instalación en una instancia de portátil SageMaker

Instala una versión PySpark que sea compatible con el conector Spark mediante los siguientes comandos:

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Si va a realizar una ingestión por lotes en el almacenamiento sin conexión, las dependencias no se encuentran en el entorno de instancias del cuaderno.

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

Instalación en cuadernos con GIS

importante

Debes usar la AWS Glue versión 2.0 o posterior.

Utilice la siguiente información como ayuda para instalar el PySpark conector en una sesión AWS Glue interactiva (SIG).

Amazon SageMaker Feature Store Spark requiere un JAR de conector Spark específico durante la inicialización de la sesión para cargarlo en su bucket de Amazon S3. Para obtener más información sobre cómo cargar el JAR requerido en su bucket de S3, consulte Recuperación del JAR para el Spark del almacén de características.

Una vez cargado el JAR, debe proporcionar el JAR a las sesiones de GIS mediante el siguiente comando.

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

Para instalar Feature Store Spark en AWS Glue tiempo de ejecución, utilice el comando %additional_python_modules mágico del bloc de notas GIS. AWS Glue se ejecuta pip en los módulos que especificó en%additional_python_modules.

%additional_python_modules sagemaker-feature-store-pyspark-3.1

Antes de iniciar la AWS Glue sesión, debe utilizar los dos comandos mágicos anteriores.

Instalación en un AWS Glue trabajo

importante

Debe utilizar la AWS Glue versión 2.0 o posterior.

Para instalar el conector Spark en un AWS Glue trabajo, utilice el --extra-jars argumento para proporcionar lo necesario JARs e --additional-python-modules instalar el Spark Connector como parámetros del trabajo al crear el AWS Glue trabajo, como se muestra en el siguiente ejemplo. Para obtener más información sobre cómo cargar el JAR requerido en su bucket de S3, consulte Recuperación del JAR para el Spark del almacén de características.

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Instalación en un trabajo de Amazon SageMaker Processing

Para usar Feature Store Spark con los trabajos SageMaker de Amazon Processing, trae tu propia imagen. Para obtener más información acerca de cómo traer su propia imagen, consulte Traiga su propia imagen de SageMaker IA. Agregue el paso de instalación a un Dockerfile. Tras insertar la imagen de Docker en un repositorio de Amazon ECR, puede utilizarla PySparkProcessor para crear el trabajo de procesamiento. Para obtener más información sobre la creación de un trabajo de procesamiento con el PySpark procesador, consulte. Ejecución de un trabajo de procesamiento con Apache Spark

A continuación, se muestra un ejemplo de cómo agregar un paso de instalación al Dockerfile.

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

Recuperación del JAR para el Spark del almacén de características

Para recuperar el JAR de dependencias de Spark del almacén de características, debe instalar el conector Spark desde el repositorio Índice de paquetes de Python (PyPI) mediante pip en cualquier entorno Python con acceso a la red. Un SageMaker Jupyter Notebook es un ejemplo de un entorno Python con acceso a la red.

El siguiente comando instala el conector Spark.

!pip install sagemaker-feature-store-pyspark-3.1

Tras instalar el Spark del almacén de características, puede recuperar la ubicación del JAR y cargar el JAR a Amazon S3.

El comando feature-store-pyspark-dependency-jars proporciona la ubicación del JAR de dependencia necesario que agregó el Spark del almacén de características. Puede utilizar el comando para recuperar el JAR y cargarlo en Amazon S3.

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

Implementaciones de ejemplo

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

Enviar un trabajo de Spark con ejemplo de script de Python

La PySpark versión requiere la importación de un JAR dependiente adicional, por lo que se necesitan pasos adicionales para ejecutar la aplicación Spark.

Si no lo especificaste SPARK_HOME durante la instalación, tendrás que cargar lo necesario JARs en la JVM durante la ejecuciónspark-submit. feature-store-pyspark-dependency-jarses un script de Python instalado por la biblioteca Spark para buscar automáticamente la ruta a todo JARs por ti.

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Si ejecuta esta aplicación en Amazon EMR, le recomendamos que ejecute la aplicación en modo cliente, de modo que no necesite distribuir lo dependiente JARs a otros nodos de tareas. Agregue un paso más en el clúster de Amazon EMR con un argumento de Spark similar al siguiente:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

Enviar un trabajo de Spark

Scala

Debería poder usar el Spark del almacén de características como una dependencia normal. No se necesitan instrucciones adicionales para ejecutar la aplicación en todas las plataformas.