기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
이 섹션에서는 특성 프로세서의 사용자 지정 데이터 소스 구현의 예를 제공합니다. 사용자 지정 데이터 소스에 대한 자세한 내용은 사용자 지정 데이터 소스섹션을 참조하세요.
보안은 AWS 와 고객 간의 공동 책임입니다. AWS 는에서 서비스를 실행하는 인프라를 보호할 책임이 있습니다 AWS 클라우드. 고객은 필요한 모든 보안 구성과 관리 작업에 책임이 있습니다. 예를 들어 데이터 저장소에 대한 액세스 자격 증명과 같은 보안 정보를 사용자 지정 데이터 소스에 하드 코딩해서는 안 됩니다. AWS Secrets Manager 를 사용하여 이러한 자격 증명을 관리할 수 있습니다. Secrets Manager에 대한 자세한 내용은 AWS Secrets Manager 사용 설명서의 란 무엇입니까 AWS Secrets Manager?를 참조하세요. 다음 예시에서는 Secrets Manager를 자격 증명으로 사용합니다.
주제
Amazon Redshift 클러스터(JDBC) 사용자 지정 데이터 소스 예제
Amazon Redshift는 Spark로 데이터를 읽는 데 사용할 수 있는 JDBC 드라이버를 제공합니다. Amazon Redshift JDBC 드라이버를 다운로드하는 방법에 대한 자세한 내용은 Amazon Redshift JDBC 드라이버, 버전 2.1 다운로드를 참조하세요.
사용자 지정 Amazon Redshift 데이터 소스 클래스를 생성하려면 사용자 지정 데이터 소스에서 read_data
메서드를 덮어써야 합니다.
Amazon Redshift 클러스터에 연결하려면 다음이 필요합니다.
-
Amazon Redshift JDBC URL (
)jdbc-url
Amazon Redshift JDBC URL을 얻는 방법에 대한 자세한 내용은 Amazon Redshift 데이터베이스 개발자 안내서의 JDBC URL 가져오기를 참조하세요.
-
Amazon Redshift 사용자 이름(
) 및 암호(redshift-user
)redshift-password
Amazon Redshift SQL 명령을 사용하여 데이터베이스 사용자를 생성하고 관리하는 방법에 대한 자세한 내용은 Amazon Redshift 데이터베이스 개발자 안내서의 사용자를 참조하세요.
-
Amazon Redshift 테이블 이름(
)redshift-table-name
테이블을 생성하는 방법과 몇 가지 예에 대한 자세한 내용은 Amazon Redshift 데이터베이스 개발자 안내서의 테이블 생성 섹션을 참조하세요.
-
(선택 사항) Secrets Manager를 사용하는 경우, Secrets Manager에 Amazon Redshift 액세스 사용자 이름과 암호를 저장하는 보안 이름(
) 이 필요합니다.secret-redshift-account-info
Secrets Manager에 대한 자세한 내용은 AWS Secrets Manager 사용 설명서의 에서 보안 암호 찾기 AWS Secrets Manager를 참조하세요.
-
AWS 리전 (
)your-region
Python용 SDK (Boto3를 사용하여 현재 세션의 리전 이름을 얻는 방법에 대한 자세한 내용은 Boto3 설명서의 region_name
을 참조하세요.
다음 예제는 Secrets Manager에서 JDBC URL 및 개인용 액세스 토큰을 검색하고 사용자 지정 데이터 소스 클래스 DatabricksDataSource
에 대해 read_data
를 재정의하는 방법을 보여줍니다.
from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class RedshiftDataSource(PySparkDataSource): data_source_name = "Redshift" data_source_unique_id = "
redshift-resource-arn
" def read_data(self, spark, params): url = "jdbc-url
?user=redshift-user
&password=redshift-password
" aws_iam_role_arn = "redshift-command-access-role
" secret_name = "secret-redshift-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = url.replace("jdbc-url
", secrets["jdbcurl"]).replace("redshift-user
", secrets['username']).replace("redshift-password
", secrets['password']) return spark.read \ .format("jdbc") \ .option("url", url) \ .option("driver", "com.amazon.redshift.Driver") \ .option("dbtable", "redshift-table-name
") \ .option("tempdir", "s3a://your-bucket-name
/your-bucket-prefix
") \ .option("aws_iam_role", aws_iam_role_arn) \ .load()
다음 예에서는 feature_processor
데코레이터에 RedshiftDataSource
를 연결하는 방법을 보여줍니다.
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[RedshiftDataSource()], output="
feature-group-arn
", target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"} ) def transform(input_df): return input_df
특성 프로세서 작업을 원격으로 실행하려면 SparkConfig
를 정의하여 jdbc 드라이버를 제공하고 이를 @remote
데코레이터에 전달해야 합니다.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[RedshiftDataSource()], output="
feature-group-arn
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Snowflake 사용자 지정 데이터 소스 예제
Snowflake는 feature_processor
데코레이터로 사용할 수 있는 Spark 커넥터를 제공합니다. Spark용 Snowflake 커넥터에 대한 자세한 내용은 Snowflake 설명서의 Spark용 Snowflake 커넥터
사용자 지정 Snowflake 데이터 소스 클래스를 만들려면 사용자 지정 데이터 소스의 read_data
메서드를 재정의하고 Spark 커넥터 패키지를 Spark 클래스 경로에 추가해야 합니다.
Snowflake 데이터 소스에 연결하려면 다음이 필요합니다.
-
Snowflake URL (
)sf-url
Snowflake 웹 인터페이스에 액세스하기 위한 URL에 대한 자세한 내용은 Snowflake 설명서의 계정 식별자
를 참조하세요. -
Snowflake 데이터베이스 (
)sf-database
Snowflake를 사용하여 데이터베이스 이름을 얻는 방법에 대한 자세한 내용은 Snowflake 설명서의 CURRENT_DATABASE
를 참조하세요. -
Snowflake 데이터베이스 스키마 (
)sf-schema
Snowflake를 사용하여 스키마의 이름을 얻는 방법에 대한 자세한 내용은 Snowflake 설명서의 CURRENT_SCHEMA
를 참조하세요. -
Snowflake 웨어하우스 (
)sf-warehouse
Snowflake를 사용하여 웨어하우스 이름을 얻는 방법에 대한 자세한 내용은 Snowflake 설명서의 CURRENT_WAREHOUSE
를 참조하세요. -
Snowflake 테이블 이름 (
)sf-table-name
-
(선택 사항) Secrets Manager를 사용하는 경우, Secrets Manager에 Snowflake 액세스 사용자 이름과 암호를 저장하는 보안 이름(
)이 필요합니다.secret-snowflake-account-info
Secrets Manager에 대한 자세한 내용은 AWS Secrets Manager 사용 설명서의 에서 보안 암호 찾기 AWS Secrets Manager를 참조하세요.
-
AWS 리전 (
)your-region
Python용 SDK (Boto3를 사용하여 현재 세션의 리전 이름을 얻는 방법에 대한 자세한 내용은 Boto3 설명서의 region_name
을 참조하세요.
다음 예제는 Secrets Manager에서 Snowflake 사용자 이름과 암호를 검색하고 사용자 지정 데이터 소스 클래스 SnowflakeDataSource
의 read_data
함수를 재정의하는 방법을 보여줍니다.
from sagemaker.feature_store.feature_processor import PySparkDataSource from sagemaker.feature_store.feature_processor import feature_processor import json import boto3 class SnowflakeDataSource(PySparkDataSource): sf_options = { "sfUrl" : "
sf-url
", "sfDatabase" : "sf-database
", "sfSchema" : "sf-schema
", "sfWarehouse" : "sf-warehouse
", } data_source_name = "Snowflake" data_source_unique_id = "sf-url
" def read_data(self, spark, params): secret_name = "secret-snowflake-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) self.sf_options["sfUser"] = secrets.get("username") self.sf_options["sfPassword"] = secrets.get("password") return spark.read.format("net.snowflake.spark.snowflake") \ .options(**self.sf_options) \ .option("dbtable", "sf-table-name
") \ .load()
다음 예에서는 feature_processor
데코레이터에 SnowflakeDataSource
를 연결하는 방법을 보여줍니다.
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[SnowflakeDataSource()], output=
feature-group-arn
, target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"} ) def transform(input_df): return input_df
특성 프로세서 작업을 원격으로 실행하려면 SparkConfig
정의를 통해 패키지를 제공하고 @remote
데코레이터에 전달해야 합니다. 다음 예제의 Spark 패키지에서 spark-snowflake_2.12
는 특성 프로세서 Scala 버전, 2.12.0
은 사용하려는 Snowflake 버전, spark_3.3
은 특성 프로세서 Spark 버전입니다.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[SnowflakeDataSource()], output="
feature-group-arn>
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Databricks (JDBC) 사용자 지정 데이터 소스 예제
Spark는 Databricks JDBC 드라이버를 사용하여 Databricks에서 데이터를 읽을 수 있습니다. Databricks JDBC 드라이버에 대한 자세한 내용은 Databricks 설명서의 Databricks ODBC 및 JDBC 드라이버 구성
참고
Spark 클래스 경로에 해당 JDBC 드라이버를 포함하여 다른 데이터베이스에서 데이터를 읽을 수 있습니다. 자세한 내용은 Spark SQL 설명서의 JDBC에서 다른 데이터베이스
사용자 지정 Databricks 데이터 소스 클래스를 만들려면 사용자 지정 데이터 소스에서 read_data
메서드를 재정의하고 Spark 클래스 경로에 JDBC jar를 추가해야 합니다.
Databricks 데이터 소스에 연결하려면 다음이 필요합니다.
-
Databricks URL (
)databricks-url
Databricks URL에 대한 자세한 내용은 Databricks 설명서의 Databricks 드라이버용 연결 URL 빌드
를 참조하세요. -
Databricks 개인용 액세스 토큰 (
)personal-access-token
Databricks 액세스 토큰에 대한 자세한 내용은 Databricks 설명서의 Databricks 개인용 액세스 토큰 인증
을 참조하세요. -
데이터 카탈로그 이름 (
)db-catalog
Databricks 카탈로그 이름에 대한 자세한 내용은 Databricks 설명서의 카탈로그 이름
을 참조하세요. -
스키마 이름 (
)db-schema
Databricks 스키마 이름에 대한 자세한 내용은 Databricks 설명서의 스키마 이름
을 참조하세요. -
테이블 이름 (
)db-table-name
Databricks 테이블 이름에 대한 자세한 내용은 Databricks 설명서의 테이블 이름
을 참조하세요. -
(선택 사항) Secrets Manager를 사용하는 경우, Secrets Manager에 Databricks 액세스 사용자 이름과 암호를 저장하는 보안 이름(
)이 필요합니다.secret-databricks-account-info
Secrets Manager에 대한 자세한 내용은 AWS Secrets Manager 사용 설명서의 에서 보안 암호 찾기 AWS Secrets Manager를 참조하세요.
-
AWS 리전 (
)your-region
Python용 SDK (Boto3를 사용하여 현재 세션의 리전 이름을 얻는 방법에 대한 자세한 내용은 Boto3 설명서의 region_name
을 참조하세요.
다음 예제는 Secrets Manager에서 JDBC URL 및 개인용 액세스 토큰을 검색하고 사용자 지정 데이터 소스 클래스 DatabricksDataSource
에 대해 read_data
를 재정의하는 방법을 보여줍니다.
from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class DatabricksDataSource(PySparkDataSource): data_source_name = "Databricks" data_source_unique_id = "
databricks-url
" def read_data(self, spark, params): secret_name = "secret-databricks-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = secrets["jdbcurl"].replace("personal-access-token
", secrets['pwd']) return spark.read.format("jdbc") \ .option("url", jdbc_url) \ .option("dbtable","`db-catalog
`.`db-schema
`.`db-table-name
`") \ .option("driver", "com.simba.spark.jdbc.Driver") \ .load()
다음 예제는 JDBC 드라이버 jar,
을 Amazon S3에 업로드하여 Spark 클래스 경로에 추가하는 방법을 보여줍니다. Databricks에서 Spark JDBC 드라이버(jdbc-jar-file-name
.jar
)를 다운로드하는 방법에 대한 자세한 내용은 Databricks 웹 사이트의 JDBC 드라이버 다운로드jdbc-jar-file-name
.jar
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[DatabricksDataSource()], output=
feature-group-arn
, target_stores=["OfflineStore"], spark_config={"spark.jars": "s3://your-bucket-name
/your-bucket-prefix
/jdbc-jar-file-name
.jar"} ) def transform(input_df): return input_df
특성 프로세서 작업을 원격으로 실행하려면 SparkConfig
를 정의하여 jar를 제공하고 @remote
데코레이터에 전달해야 합니다.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars": "s3://
your-bucket-name
/your-bucket-prefix
/jdbc-jar-file-name
.jar" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[DatabricksDataSource()], output="feature-group-arn
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
사용자 지정 데이터 소스 스트리밍 예제
Amazon Kinesis와 같은 스트리밍 데이터 소스에 연결하고 Spark 구조화 스트리밍으로 변환을 작성하여 스트리밍 데이터 소스에서 읽을 수 있습니다. Kinesis 커넥터에 대한 자세한 내용은 GitHub의 Spark 구조화 스트리밍을 위한 Kinesis 커넥터
사용자 지정 Amazon Kinesis 데이터 소스 클래스를 생성하려면 BaseDataSource
클래스를 확장하고 사용자 지정 데이터 소스에서 read_data
메서드를 재정의해야 합니다.
Amazon Kinesis 데이터 스트림에 연결하려면 다음이 필요합니다.
-
Kinesis ARN (
)kinesis-resource-arn
Kinesis 데이터 스트림 ARN에 대한 자세한 내용은 Amazon Kinesis 개발자 안내서의 Kinesis 데이터 스트림을 위한 Amazon 리소스 이름(ARN)을 참조하세요.
-
Kinesis 데이터 스트림 이름(
)kinesis-stream-name
-
AWS 리전 (
)your-region
Python용 SDK (Boto3를 사용하여 현재 세션의 리전 이름을 얻는 방법에 대한 자세한 내용은 Boto3 설명서의 region_name
을 참조하세요.
from sagemaker.feature_store.feature_processor import BaseDataSource from sagemaker.feature_store.feature_processor import feature_processor class KinesisDataSource(BaseDataSource): data_source_name = "Kinesis" data_source_unique_id = "
kinesis-resource-arn
" def read_data(self, spark, params): return spark.readStream.format("kinesis") \ .option("streamName", "kinesis-stream-name
") \ .option("awsUseInstanceProfile", "false") \ .option("endpointUrl", "https://kinesis..amazonaws.com") .load()
your-region
다음 예에서는 feature_processor
데코레이터에 KinesisDataSource
를 연결하는 방법을 보여줍니다.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig import feature_store_pyspark.FeatureStoreManager as fsm def ingest_micro_batch_into_fg(input_df, epoch_id): feature_group_arn = "
feature-group-arn
" fsm.FeatureStoreManager().ingest_data( input_data_frame = input_df, feature_group_arn = feature_group_arn ) @remote( spark_config=SparkConfig( configuration={ "Classification": "spark-defaults", "Properties":{ "spark.sql.streaming.schemaInference": "true", "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2" } } ), instance_type="ml.m5.2xlarge", max_runtime_in_seconds=2419200 # 28 days ) @feature_processor( inputs=[KinesisDataSource()], output="feature-group-arn
" ) def transform(input_df): output_stream = ( input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)") .writeStream.foreachBatch(ingest_micro_batch_into_fg) .trigger(processingTime="1 minute") .option("checkpointLocation", "s3a://checkpoint-path
") .start() ) output_stream.awaitTermination()
위 예제 코드에서는 특성 그룹으로 마이크로 배치를 스트리밍하는 동안 몇 가지 Spark 구조화 스트리밍 옵션을 사용합니다. 전체 옵션 목록은 Apache Spark 설명서의 구조화 스트리밍 프로그래밍 안내서
-
foreachBatch
싱크 모드는 스트리밍 쿼리의 각 마이크로 배치의 출력 데이터에 작업을 적용하고 로직을 작성할 수 있는 기능입니다.foreachBatch
에 대한 자세한 내용은 Apache Spark 구조화 스트리밍 프로그래밍 안내서의 Foreach 및 ForEachBatch 사용을 참조하세요. -
이
checkpointLocation
옵션은 스트리밍 애플리케이션의 상태를 주기적으로 저장합니다. 스트리밍 로그는 체크포인트 위치
에 저장됩니다.s3a://checkpoint-path
checkpointLocation
옵션에 대한 자세한 내용은 Apache Spark 구조화 스트리밍 프로그래밍 안내서의 체크포인트를 통한 장애 복구를 참조하세요. -
이
trigger
설정은 스트리밍 애플리케이션에서 마이크로 배치 처리가 트리거되는 빈도를 정의합니다. 예제에서 처리 시간 트리거 유형은trigger(processingTime="1 minute")
로 지정된 1분 마이크로 배치 간격으로 사용됩니다. 스트림 소스에서 채우려면trigger(availableNow=True)
로 지정된 available-now 트리거 유형을 사용할 수 있습니다.전체
trigger
유형 목록은 Apache Spark 구조화 스트리밍 프로그래밍 안내서의 트리거를 참조하세요.
이벤트 기반 트리거를 사용한 연속 스트리밍 및 자동 재시도
특성 프로세서는 SageMaker 훈련을 컴퓨팅 인프라로 사용하며 최대 런타임 한도는 28일입니다. 이벤트 기반 트리거를 사용하여 연속 스트리밍을 더 오랜 기간 연장하고 일시적인 장애로부터 복구할 수 있습니다. 일정 및 이벤트 기반 실행에 대한 자세한 내용은 특성 프로세서 파이프라인의 일정 예약 및 이벤트 기반 실행섹션을 참조하세요.
다음은 특성 프로세서 파이프라인 스트리밍이 계속 실행되도록 이벤트 기반 트리거를 설정하는 예제입니다. 이전 예에서 정의된 스트리밍 변환 함수를 사용합니다. 소스 파이프라인 실행에서 STOPPED
또는 FAILED
이벤트가 발생할 때 대상 파이프라인을 트리거하도록 구성할 수 있습니다. 계속 실행되도록 동일한 파이프라인이 소스와 타겟으로 사용된다는 점에 유의하세요.
import sagemaker.feature_store.feature_processor as fp from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus streaming_pipeline_name = "
streaming-pipeline
" streaming_pipeline_arn = fp.to_pipeline( pipeline_name = streaming_pipeline_name, step = transform # defined in previous section ) fp.put_trigger( source_pipeline_events=FeatureProcessorPipelineEvents( pipeline_name=source_pipeline_name, pipeline_execution_status=[ FeatureProcessorPipelineExecutionStatus.STOPPED, FeatureProcessorPipelineExecutionStatus.FAILED] ), target_pipeline=target_pipeline_name )