As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Exemplos de fontes de dados personalizadas
Esta seção fornece exemplos de implantações de fontes de dados personalizadas para Processadores de atributos. Para obter mais informações sobre fontes de dados personalizadas, consulte Fontes de dados personalizadas.
A segurança é uma responsabilidade compartilhada AWS entre nossos clientes. AWS é responsável por proteger a infraestrutura que executa os serviços no Nuvem AWS. Os clientes são responsáveis por todas as tarefas necessárias de configuração e gerenciamento de segurança. Por exemplo, segredos como credenciais de acesso aos armazenamentos de dados não devem ser codificados em suas fontes de dados personalizadas. Você pode usar AWS Secrets Manager para gerenciar essas credenciais. Para obter informações sobre o Secrets Manager, consulte O que é AWS Secrets Manager? no guia do AWS Secrets Manager usuário. Os exemplos a seguir usarão o Secrets Manager para suas credenciais.
Tópicos
Exemplos de fontes de dados personalizadas dos Clusters do Amazon Redshift (JDBC)
O Amazon Redshift oferece um driver JDBC que pode ser usado para ler dados com o Spark. Para obter informações sobre como baixar o driver JDBC do Amazon Redshift, consulte Baixar o driver JDBC do Amazon Redshift, versão 2.1.
Para criar a classe de fonte de dados personalizada do Amazon Redshift, você precisará substituir o método read_data
do Fontes de dados personalizadas.
Para se conectar a um cluster do Amazon Redshift, você precisa de:
-
URL do JDBC do Amazon Redshift (
)jdbc-url
Para obter informações sobre como obter o URL do JDBC do Amazon Redshift, consulte Obter o URL do JDBC no Guia do desenvolvedor de banco de dados do Amazon Redshift.
-
Nome de usuário (
) e senha (redshift-user
) do Amazon Redshiftredshift-password
Para obter informações sobre como criar e gerenciar usuários de banco de dados usando os comandos SQL do Amazon Redshift, consulte Usuários no Guia do desenvolvedor de banco de dados do Amazon Redshift.
-
Nome da tabela do Amazon Redshift (
)redshift-table-name
Para obter informações sobre como criar uma tabela com alguns exemplos, consulte CRIAR TABELA no Guia do desenvolvedor de banco de dados do Amazon Redshift.
-
(Opcional) Se estiver usando o Secrets Manager, você precisará do nome do segredo (
) onde você armazena seu nome de usuário e senha de acesso ao Amazon Redshift no Secrets Manager.secret-redshift-account-info
Para obter informações sobre o Secrets Manager, consulte Encontre segredos AWS Secrets Manager no Guia AWS Secrets Manager do Usuário.
-
Região da AWS (
)your-region
Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte region_name
na documentação do Boto3.
O exemplo a seguir demonstra como recuperar o URL do JDBC e o token de acesso pessoal do Secrets Manager e substituir o read_data
pela sua classe de fonte de dados personalizada, DatabricksDataSource
.
from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class RedshiftDataSource(PySparkDataSource): data_source_name = "Redshift" data_source_unique_id = "
redshift-resource-arn
" def read_data(self, spark, params): url = "jdbc-url
?user=redshift-user
&password=redshift-password
" aws_iam_role_arn = "redshift-command-access-role
" secret_name = "secret-redshift-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = url.replace("jdbc-url
", secrets["jdbcurl"]).replace("redshift-user
", secrets['username']).replace("redshift-password
", secrets['password']) return spark.read \ .format("jdbc") \ .option("url", url) \ .option("driver", "com.amazon.redshift.Driver") \ .option("dbtable", "redshift-table-name
") \ .option("tempdir", "s3a://your-bucket-name
/your-bucket-prefix
") \ .option("aws_iam_role", aws_iam_role_arn) \ .load()
O exemplo a seguir mostra como conectar o RedshiftDataSource
ao decorador feature_processor
.
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[RedshiftDataSource()], output="
feature-group-arn
", target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"} ) def transform(input_df): return input_df
Para executar o trabalho do processador de atributos remotamente, você precisa fornecer o driver jdbc definindo o SparkConfig
e passando-o para o decorador @remote
.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[RedshiftDataSource()], output="
feature-group-arn
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Exemplos de fontes de dados personalizadas do Snowflake
O Snowflake fornece um conector Spark que pode ser usado para seu decorador feature_processor
. Para obter informações sobre o conector Snowflake para Spark, consulte Conector Snowflake para Spark
Para criar a classe de fonte de dados personalizada do Snowflake, você precisará substituir o método read_data
do Fontes de dados personalizadas e adicionar os pacotes de conectores do Spark ao classpath do Spark.
Para se conectar a uma fonte de dados do Snowflake, você precisa:
-
URL do Snowflake (
)sf-url
Para obter informações sobre URLs como acessar as interfaces web do Snowflake, consulte Identificadores de conta
na documentação do Snowflake. -
Banco de dados do Snowflake (
)sf-database
Para obter informações sobre como obter o nome do seu banco de dados usando o Snowflake, consulte CURRENT_DATABASE
na documentação do Snowflake. -
Esquema do banco de dados do Snowflake (
)sf-schema
Para obter informações sobre como obter o nome do seu esquema usando o Snowflake, consulte CURRENT_SCHEMA
na documentação do Snowflake. -
Warehouse do Snowflake (
)sf-warehouse
Para obter informações sobre como obter o nome do seu warehouse usando o Snowflake, consulte CURRENT_WAREHOUSE
na documentação do Snowflake. -
Nome da tabela do Snowflake (
)sf-table-name
-
(Opcional) Se estiver usando o Secrets Manager, você precisará do nome do segredo (
) onde você armazena seu nome de usuário e senha de acesso ao Snowflake no Secrets Manager.secret-snowflake-account-info
Para obter informações sobre o Secrets Manager, consulte Encontre segredos AWS Secrets Manager no Guia AWS Secrets Manager do Usuário.
-
Região da AWS (
)your-region
Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte region_name
na documentação do Boto3.
O exemplo a seguir demonstra como recuperar o nome de usuário e senha do Snowflake no Secrets Manager e substituir a função read_data
pela sua classe de fonte de dados personalizada SnowflakeDataSource
.
from sagemaker.feature_store.feature_processor import PySparkDataSource from sagemaker.feature_store.feature_processor import feature_processor import json import boto3 class SnowflakeDataSource(PySparkDataSource): sf_options = { "sfUrl" : "
sf-url
", "sfDatabase" : "sf-database
", "sfSchema" : "sf-schema
", "sfWarehouse" : "sf-warehouse
", } data_source_name = "Snowflake" data_source_unique_id = "sf-url
" def read_data(self, spark, params): secret_name = "secret-snowflake-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) self.sf_options["sfUser"] = secrets.get("username") self.sf_options["sfPassword"] = secrets.get("password") return spark.read.format("net.snowflake.spark.snowflake") \ .options(**self.sf_options) \ .option("dbtable", "sf-table-name
") \ .load()
O exemplo a seguir mostra como conectar o SnowflakeDataSource
ao decorador feature_processor
.
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[SnowflakeDataSource()], output=
feature-group-arn
, target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"} ) def transform(input_df): return input_df
Para executar o trabalho do processador de atributos remotamente, você precisa fornecer os pacotes definindo o SparkConfig
e passando-os para o decorador @remote
. Os pacotes Spark no exemplo a seguir mostra que spark-snowflake_2.12
é a versão Scala do Processador de atributos, 2.12.0
é a versão do Snowflake que você deseja usar e spark_3.3
é a versão do Spark do Processador de atributos.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[SnowflakeDataSource()], output="
feature-group-arn>
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Exemplos de fontes de dados personalizadas do Databricks (JDBC)
O Spark pode ler dados do Databricks usando o driver JDBC do Databricks. Para obter informações sobre o driver JDBC do Databricks, consulte Configurar os drivers ODBC e JDBC do Databricks
nota
Você pode ler dados de qualquer outro banco de dados incluindo o driver JDBC correspondente no classpath do Spark. Para obter mais informações, consulte JDBC para outros bancos de dados
Para criar a classe de fonte de dados personalizada do Databricks, você precisará substituir o método read_data
do Fontes de dados personalizadas e adicionar os arquivos jar do JDBC ao classpath do Spark.
Para se conectar a uma fonte de dados do Databricks, você precisa:
-
URL do Databricks (
)databricks-url
Para obter informações sobre o URL do Databricks, consulte Criar URL da conexão para o driver do Databricks
na documentação do Databricks. -
Token de acesso pessoal do Databricks (
)personal-access-token
Para obter informações sobre seu token de acesso ao Databricks, consulte Autenticação do token de acesso pessoal do Databricks
na documentação do Databricks. -
Nome do catálogo de dados (
)db-catalog
Para obter informações sobre o nome do catálogo do Databricks, consulte Nome de catálogo
na documentação do Databricks. -
Nome do esquema (
)db-schema
Para obter informações sobre o nome do esquema do Databricks, consulte Nome do esquema
na documentação do Databricks. -
Nome da tabela (
)db-table-name
Para obter informações sobre o nome da tabela do Databricks, consulte Nome da tabela
na documentação do Databricks. -
(Opcional) Se estiver usando o Secrets Manager, você precisará do nome do segredo (
) onde você armazena seu nome de usuário e senha de acesso ao Databricks no Secrets Manager.secret-databricks-account-info
Para obter informações sobre o Secrets Manager, consulte Encontre segredos AWS Secrets Manager no Guia AWS Secrets Manager do Usuário.
-
Região da AWS (
)your-region
Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte region_name
na documentação do Boto3.
O exemplo a seguir demonstra como recuperar o URL do JDBC e o token de acesso pessoal do Secrets Manager e substituir o read_data
pela sua classe de fonte de dados personalizada, DatabricksDataSource
.
from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class DatabricksDataSource(PySparkDataSource): data_source_name = "Databricks" data_source_unique_id = "
databricks-url
" def read_data(self, spark, params): secret_name = "secret-databricks-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = secrets["jdbcurl"].replace("personal-access-token
", secrets['pwd']) return spark.read.format("jdbc") \ .option("url", jdbc_url) \ .option("dbtable","`db-catalog
`.`db-schema
`.`db-table-name
`") \ .option("driver", "com.simba.spark.jdbc.Driver") \ .load()
O exemplo a seguir mostra como fazer o upload dos arquivos jar do driver JDBC,
, para o Amazon S3 para adicioná-lo ao classpath do Spark. Para obter informações sobre como baixar o driver JDBC (jdbc-jar-file-name
.jar
) do Spark a partir do Databricks, consulte Baixar o driver JDBCjdbc-jar-file-name
.jar
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[DatabricksDataSource()], output=
feature-group-arn
, target_stores=["OfflineStore"], spark_config={"spark.jars": "s3://your-bucket-name
/your-bucket-prefix
/jdbc-jar-file-name
.jar"} ) def transform(input_df): return input_df
Para executar o trabalho do processador de atributos remotamente, você precisa fornecer os arquivos jar definindo o SparkConfig
e passando-os para o decorador @remote
.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars": "s3://
your-bucket-name
/your-bucket-prefix
/jdbc-jar-file-name
.jar" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[DatabricksDataSource()], output="feature-group-arn
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Exemplos de fontes de dados personalizadas de streaming
Você pode se conectar a fontes de dados de streaming, como o Amazon Kinesis, e criar transformações com o Spark Structured Streaming para ler a partir de fontes de dados de streaming. Para obter informações sobre o conector Kinesis, consulte Conector Kinesis para streaming estruturado do Spark
Para criar a classe de fonte de dados personalizada do Amazon Kinesis, você precisará estender a classe BaseDataSource
e sobrescrever o método read_data
do Fontes de dados personalizadas.
Para se conectar a um Amazon Kinesis Data Streams, você precisa:
-
ARN do Kinesis (
)kinesis-resource-arn
Para obter informações sobre o stream de dados do Kinesis ARNs, consulte Amazon Resource Names (ARNs) para Kinesis Data Streams no Guia do desenvolvedor do Amazon Kinesis.
-
Nome do fluxo de dados do Kinesis (
)kinesis-stream-name
-
Região da AWS (
)your-region
Para obter informações sobre como obter o nome da região da sua sessão atual usando o SDK para Python (Boto3), consulte region_name
na documentação do Boto3.
from sagemaker.feature_store.feature_processor import BaseDataSource from sagemaker.feature_store.feature_processor import feature_processor class KinesisDataSource(BaseDataSource): data_source_name = "Kinesis" data_source_unique_id = "
kinesis-resource-arn
" def read_data(self, spark, params): return spark.readStream.format("kinesis") \ .option("streamName", "kinesis-stream-name
") \ .option("awsUseInstanceProfile", "false") \ .option("endpointUrl", "https://kinesis..amazonaws.com") .load()
your-region
O exemplo a seguir demonstra como conectar o KinesisDataSource
ao decorador feature_processor
.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig import feature_store_pyspark.FeatureStoreManager as fsm def ingest_micro_batch_into_fg(input_df, epoch_id): feature_group_arn = "
feature-group-arn
" fsm.FeatureStoreManager().ingest_data( input_data_frame = input_df, feature_group_arn = feature_group_arn ) @remote( spark_config=SparkConfig( configuration={ "Classification": "spark-defaults", "Properties":{ "spark.sql.streaming.schemaInference": "true", "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2" } } ), instance_type="ml.m5.2xlarge", max_runtime_in_seconds=2419200 # 28 days ) @feature_processor( inputs=[KinesisDataSource()], output="feature-group-arn
" ) def transform(input_df): output_stream = ( input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)") .writeStream.foreachBatch(ingest_micro_batch_into_fg) .trigger(processingTime="1 minute") .option("checkpointLocation", "s3a://checkpoint-path
") .start() ) output_stream.awaitTermination()
No código de exemplo acima, usamos algumas opções do Spark Structured Streaming ao transmitir microlotes para seu grupo de atributos. Para ver uma lista completa de opções, consulte o Guia de programação de streaming estruturado
-
O modo sink
foreachBatch
é um atributo que permite aplicar operações e escrever lógica nos dados de saída de cada microlote de uma consulta de streaming.Para obter informações sobre isso
foreachBatch
, consulte Usando o Foreach e ForeachBatch no Guiade programação de streaming estruturado do Apache Spark. -
A opção
checkpointLocation
salva periodicamente o estado da aplicação de streaming. O log de streaming é salvo no local
do ponto de verificação.s3a://checkpoint-path
Para obter informações sobre a opção
checkpointLocation
, consulte Recuperando-se de falhas com pontos de verificaçãono Guia de programação do Apache Spark Structured Streaming. -
A configuração
trigger
define com que frequência o processamento em microlote é acionado em uma aplicação de streaming. No exemplo, o tipo de gatilho de tempo de processamento é usado com intervalos de microlote de um minuto, especificados portrigger(processingTime="1 minute")
. Para preencher a partir de uma fonte de fluxo, você pode usar o tipo de gatilho disponível agora, especificado portrigger(availableNow=True)
.Para ver uma lista completa dos tipos de
trigger
, consulte Gatilhosno Guia de programação do Apache Spark Structured Streaming.
Streaming contínuo e novas tentativas automáticas usando gatilhos baseados em eventos
O Feature Processor usa o SageMaker treinamento como infraestrutura computacional e tem um limite máximo de tempo de execução de 28 dias. Você pode usar gatilhos baseados em eventos para estender seu streaming contínuo por um longo período de tempo e se recuperar de falhas transitórias. Para obter mais informações sobre execuções baseadas em programações e eventos, consulte Execuções programadas e baseadas em eventos para pipelines do Processador de atributos.
Veja a seguir um exemplo de configuração de um gatilho baseado em eventos para manter o pipeline de streaming do Processador de atributos funcionando continuamente. Ele usa a função de transformação de streaming definida no exemplo anterior. Um pipeline de destino pode ser configurado para ser acionado quando ocorre um evento STOPPED
ou FAILED
para a execução de um pipeline de origem. Observe que o mesmo pipeline é usado como origem e destino para que seja executado continuamente.
import sagemaker.feature_store.feature_processor as fp from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus streaming_pipeline_name = "
streaming-pipeline
" streaming_pipeline_arn = fp.to_pipeline( pipeline_name = streaming_pipeline_name, step = transform # defined in previous section ) fp.put_trigger( source_pipeline_events=FeatureProcessorPipelineEvents( pipeline_name=source_pipeline_name, pipeline_execution_status=[ FeatureProcessorPipelineExecutionStatus.STOPPED, FeatureProcessorPipelineExecutionStatus.FAILED] ), target_pipeline=target_pipeline_name )