쿠키 기본 설정 선택

당사는 사이트와 서비스를 제공하는 데 필요한 필수 쿠키 및 유사한 도구를 사용합니다. 고객이 사이트를 어떻게 사용하는지 파악하고 개선할 수 있도록 성능 쿠키를 사용해 익명의 통계를 수집합니다. 필수 쿠키는 비활성화할 수 없지만 '사용자 지정' 또는 ‘거부’를 클릭하여 성능 쿠키를 거부할 수 있습니다.

사용자가 동의하는 경우 AWS와 승인된 제3자도 쿠키를 사용하여 유용한 사이트 기능을 제공하고, 사용자의 기본 설정을 기억하고, 관련 광고를 비롯한 관련 콘텐츠를 표시합니다. 필수가 아닌 모든 쿠키를 수락하거나 거부하려면 ‘수락’ 또는 ‘거부’를 클릭하세요. 더 자세한 내용을 선택하려면 ‘사용자 정의’를 클릭하세요.

사용자 지정 데이터 소스 예제

포커스 모드
사용자 지정 데이터 소스 예제 - Amazon SageMaker AI

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

이 섹션에서는 특성 프로세서의 사용자 지정 데이터 소스 구현의 예를 제공합니다. 사용자 지정 데이터 소스에 대한 자세한 내용은 사용자 지정 데이터 소스섹션을 참조하세요.

보안은 AWS 와 고객 간의 공동 책임입니다. AWS 는에서 서비스를 실행하는 인프라를 보호할 책임이 있습니다 AWS 클라우드. 고객은 필요한 모든 보안 구성과 관리 작업에 책임이 있습니다. 예를 들어 데이터 저장소에 대한 액세스 자격 증명과 같은 보안 정보를 사용자 지정 데이터 소스에 하드 코딩해서는 안 됩니다. AWS Secrets Manager 를 사용하여 이러한 자격 증명을 관리할 수 있습니다. Secrets Manager에 대한 자세한 내용은 AWS Secrets Manager 사용 설명서의 란 무엇입니까 AWS Secrets Manager?를 참조하세요. 다음 예시에서는 Secrets Manager를 자격 증명으로 사용합니다.

Amazon Redshift 클러스터(JDBC) 사용자 지정 데이터 소스 예제

Amazon Redshift는 Spark로 데이터를 읽는 데 사용할 수 있는 JDBC 드라이버를 제공합니다. Amazon Redshift JDBC 드라이버를 다운로드하는 방법에 대한 자세한 내용은 Amazon Redshift JDBC 드라이버, 버전 2.1 다운로드를 참조하세요.

사용자 지정 Amazon Redshift 데이터 소스 클래스를 생성하려면 사용자 지정 데이터 소스에서 read_data메서드를 덮어써야 합니다.

Amazon Redshift 클러스터에 연결하려면 다음이 필요합니다.

  • Amazon Redshift JDBC URL (jdbc-url)

    Amazon Redshift JDBC URL을 얻는 방법에 대한 자세한 내용은 Amazon Redshift 데이터베이스 개발자 안내서의 JDBC URL 가져오기를 참조하세요.

  • Amazon Redshift 사용자 이름(redshift-user) 및 암호(redshift-password)

    Amazon Redshift SQL 명령을 사용하여 데이터베이스 사용자를 생성하고 관리하는 방법에 대한 자세한 내용은 Amazon Redshift 데이터베이스 개발자 안내서의 사용자를 참조하세요.

  • Amazon Redshift 테이블 이름(redshift-table-name)

    테이블을 생성하는 방법과 몇 가지 예에 대한 자세한 내용은 Amazon Redshift 데이터베이스 개발자 안내서의 테이블 생성 섹션을 참조하세요.

  • (선택 사항) Secrets Manager를 사용하는 경우, Secrets Manager에 Amazon Redshift 액세스 사용자 이름과 암호를 저장하는 보안 이름(secret-redshift-account-info) 이 필요합니다.

    Secrets Manager에 대한 자세한 내용은 AWS Secrets Manager 사용 설명서의 에서 보안 암호 찾기 AWS Secrets Manager를 참조하세요.

  • AWS 리전 (your-region)

    Python용 SDK (Boto3를 사용하여 현재 세션의 리전 이름을 얻는 방법에 대한 자세한 내용은 Boto3 설명서의 region_name을 참조하세요.

다음 예제는 Secrets Manager에서 JDBC URL 및 개인용 액세스 토큰을 검색하고 사용자 지정 데이터 소스 클래스 DatabricksDataSource에 대해 read_data를 재정의하는 방법을 보여줍니다.

from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class RedshiftDataSource(PySparkDataSource): data_source_name = "Redshift" data_source_unique_id = "redshift-resource-arn" def read_data(self, spark, params): url = "jdbc-url?user=redshift-user&password=redshift-password" aws_iam_role_arn = "redshift-command-access-role" secret_name = "secret-redshift-account-info" region_name = "your-region" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password']) return spark.read \ .format("jdbc") \ .option("url", url) \ .option("driver", "com.amazon.redshift.Driver") \ .option("dbtable", "redshift-table-name") \ .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \ .option("aws_iam_role", aws_iam_role_arn) \ .load()

다음 예에서는 feature_processor데코레이터에 RedshiftDataSource를 연결하는 방법을 보여줍니다.

from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[RedshiftDataSource()], output="feature-group-arn", target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"} ) def transform(input_df): return input_df

특성 프로세서 작업을 원격으로 실행하려면 SparkConfig를 정의하여 jdbc 드라이버를 제공하고 이를 @remote데코레이터에 전달해야 합니다.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[RedshiftDataSource()], output="feature-group-arn", target_stores=["OfflineStore"], ) def transform(input_df): return input_df

Snowflake 사용자 지정 데이터 소스 예제

Snowflake는 feature_processor데코레이터로 사용할 수 있는 Spark 커넥터를 제공합니다. Spark용 Snowflake 커넥터에 대한 자세한 내용은 Snowflake 설명서의 Spark용 Snowflake 커넥터를 참조하세요.

사용자 지정 Snowflake 데이터 소스 클래스를 만들려면 사용자 지정 데이터 소스read_data메서드를 재정의하고 Spark 커넥터 패키지를 Spark 클래스 경로에 추가해야 합니다.

Snowflake 데이터 소스에 연결하려면 다음이 필요합니다.

  • Snowflake URL (sf-url)

    Snowflake 웹 인터페이스에 액세스하기 위한 URL에 대한 자세한 내용은 Snowflake 설명서의 계정 식별자를 참조하세요.

  • Snowflake 데이터베이스 (sf-database)

    Snowflake를 사용하여 데이터베이스 이름을 얻는 방법에 대한 자세한 내용은 Snowflake 설명서의 CURRENT_DATABASE를 참조하세요.

  • Snowflake 데이터베이스 스키마 (sf-schema)

    Snowflake를 사용하여 스키마의 이름을 얻는 방법에 대한 자세한 내용은 Snowflake 설명서의 CURRENT_SCHEMA를 참조하세요.

  • Snowflake 웨어하우스 (sf-warehouse)

    Snowflake를 사용하여 웨어하우스 이름을 얻는 방법에 대한 자세한 내용은 Snowflake 설명서의 CURRENT_WAREHOUSE를 참조하세요.

  • Snowflake 테이블 이름 (sf-table-name)

  • (선택 사항) Secrets Manager를 사용하는 경우, Secrets Manager에 Snowflake 액세스 사용자 이름과 암호를 저장하는 보안 이름(secret-snowflake-account-info)이 필요합니다.

    Secrets Manager에 대한 자세한 내용은 AWS Secrets Manager 사용 설명서의 에서 보안 암호 찾기 AWS Secrets Manager를 참조하세요.

  • AWS 리전 (your-region)

    Python용 SDK (Boto3를 사용하여 현재 세션의 리전 이름을 얻는 방법에 대한 자세한 내용은 Boto3 설명서의 region_name을 참조하세요.

다음 예제는 Secrets Manager에서 Snowflake 사용자 이름과 암호를 검색하고 사용자 지정 데이터 소스 클래스 SnowflakeDataSourceread_data함수를 재정의하는 방법을 보여줍니다.

from sagemaker.feature_store.feature_processor import PySparkDataSource from sagemaker.feature_store.feature_processor import feature_processor import json import boto3 class SnowflakeDataSource(PySparkDataSource): sf_options = { "sfUrl" : "sf-url", "sfDatabase" : "sf-database", "sfSchema" : "sf-schema", "sfWarehouse" : "sf-warehouse", } data_source_name = "Snowflake" data_source_unique_id = "sf-url" def read_data(self, spark, params): secret_name = "secret-snowflake-account-info" region_name = "your-region" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) self.sf_options["sfUser"] = secrets.get("username") self.sf_options["sfPassword"] = secrets.get("password") return spark.read.format("net.snowflake.spark.snowflake") \ .options(**self.sf_options) \ .option("dbtable", "sf-table-name") \ .load()

다음 예에서는 feature_processor데코레이터에 SnowflakeDataSource를 연결하는 방법을 보여줍니다.

from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[SnowflakeDataSource()], output=feature-group-arn, target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"} ) def transform(input_df): return input_df

특성 프로세서 작업을 원격으로 실행하려면 SparkConfig정의를 통해 패키지를 제공하고 @remote데코레이터에 전달해야 합니다. 다음 예제의 Spark 패키지에서 spark-snowflake_2.12는 특성 프로세서 Scala 버전, 2.12.0은 사용하려는 Snowflake 버전, spark_3.3은 특성 프로세서 Spark 버전입니다.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[SnowflakeDataSource()], output="feature-group-arn>", target_stores=["OfflineStore"], ) def transform(input_df): return input_df

Databricks (JDBC) 사용자 지정 데이터 소스 예제

Spark는 Databricks JDBC 드라이버를 사용하여 Databricks에서 데이터를 읽을 수 있습니다. Databricks JDBC 드라이버에 대한 자세한 내용은 Databricks 설명서의 Databricks ODBC 및 JDBC 드라이버 구성을 참조하세요.

참고

Spark 클래스 경로에 해당 JDBC 드라이버를 포함하여 다른 데이터베이스에서 데이터를 읽을 수 있습니다. 자세한 내용은 Spark SQL 설명서의 JDBC에서 다른 데이터베이스를 참조하세요.

사용자 지정 Databricks 데이터 소스 클래스를 만들려면 사용자 지정 데이터 소스에서 read_data메서드를 재정의하고 Spark 클래스 경로에 JDBC jar를 추가해야 합니다.

Databricks 데이터 소스에 연결하려면 다음이 필요합니다.

  • Databricks URL (databricks-url)

    Databricks URL에 대한 자세한 내용은 Databricks 설명서의 Databricks 드라이버용 연결 URL 빌드를 참조하세요.

  • Databricks 개인용 액세스 토큰 (personal-access-token)

    Databricks 액세스 토큰에 대한 자세한 내용은 Databricks 설명서의 Databricks 개인용 액세스 토큰 인증을 참조하세요.

  • 데이터 카탈로그 이름 (db-catalog)

    Databricks 카탈로그 이름에 대한 자세한 내용은 Databricks 설명서의 카탈로그 이름을 참조하세요.

  • 스키마 이름 (db-schema)

    Databricks 스키마 이름에 대한 자세한 내용은 Databricks 설명서의 스키마 이름을 참조하세요.

  • 테이블 이름 (db-table-name)

    Databricks 테이블 이름에 대한 자세한 내용은 Databricks 설명서의 테이블 이름을 참조하세요.

  • (선택 사항) Secrets Manager를 사용하는 경우, Secrets Manager에 Databricks 액세스 사용자 이름과 암호를 저장하는 보안 이름(secret-databricks-account-info)이 필요합니다.

    Secrets Manager에 대한 자세한 내용은 AWS Secrets Manager 사용 설명서의 에서 보안 암호 찾기 AWS Secrets Manager를 참조하세요.

  • AWS 리전 (your-region)

    Python용 SDK (Boto3를 사용하여 현재 세션의 리전 이름을 얻는 방법에 대한 자세한 내용은 Boto3 설명서의 region_name을 참조하세요.

다음 예제는 Secrets Manager에서 JDBC URL 및 개인용 액세스 토큰을 검색하고 사용자 지정 데이터 소스 클래스 DatabricksDataSource에 대해 read_data를 재정의하는 방법을 보여줍니다.

from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class DatabricksDataSource(PySparkDataSource): data_source_name = "Databricks" data_source_unique_id = "databricks-url" def read_data(self, spark, params): secret_name = "secret-databricks-account-info" region_name = "your-region" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd']) return spark.read.format("jdbc") \ .option("url", jdbc_url) \ .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \ .option("driver", "com.simba.spark.jdbc.Driver") \ .load()

다음 예제는 JDBC 드라이버 jar, jdbc-jar-file-name.jar을 Amazon S3에 업로드하여 Spark 클래스 경로에 추가하는 방법을 보여줍니다. Databricks에서 Spark JDBC 드라이버(jdbc-jar-file-name.jar)를 다운로드하는 방법에 대한 자세한 내용은 Databricks 웹 사이트의 JDBC 드라이버 다운로드를 참조하세요.

from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[DatabricksDataSource()], output=feature-group-arn, target_stores=["OfflineStore"], spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"} ) def transform(input_df): return input_df

특성 프로세서 작업을 원격으로 실행하려면 SparkConfig를 정의하여 jar를 제공하고 @remote데코레이터에 전달해야 합니다.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[DatabricksDataSource()], output="feature-group-arn", target_stores=["OfflineStore"], ) def transform(input_df): return input_df

사용자 지정 데이터 소스 스트리밍 예제

Amazon Kinesis와 같은 스트리밍 데이터 소스에 연결하고 Spark 구조화 스트리밍으로 변환을 작성하여 스트리밍 데이터 소스에서 읽을 수 있습니다. Kinesis 커넥터에 대한 자세한 내용은 GitHub의 Spark 구조화 스트리밍을 위한 Kinesis 커넥터를 참조하세요. Amazon Kinesis에 대한 자세한 내용은 Amazon Kinesis 개발자 안내서의 Amazon Kinesis Data Streams란 무엇입니까?를 참조하세요.

사용자 지정 Amazon Kinesis 데이터 소스 클래스를 생성하려면 BaseDataSource클래스를 확장하고 사용자 지정 데이터 소스에서 read_data메서드를 재정의해야 합니다.

Amazon Kinesis 데이터 스트림에 연결하려면 다음이 필요합니다.

  • Kinesis ARN (kinesis-resource-arn)

    Kinesis 데이터 스트림 ARN에 대한 자세한 내용은 Amazon Kinesis 개발자 안내서의 Kinesis 데이터 스트림을 위한 Amazon 리소스 이름(ARN)을 참조하세요.

  • Kinesis 데이터 스트림 이름(kinesis-stream-name)

  • AWS 리전 (your-region)

    Python용 SDK (Boto3를 사용하여 현재 세션의 리전 이름을 얻는 방법에 대한 자세한 내용은 Boto3 설명서의 region_name을 참조하세요.

from sagemaker.feature_store.feature_processor import BaseDataSource from sagemaker.feature_store.feature_processor import feature_processor class KinesisDataSource(BaseDataSource): data_source_name = "Kinesis" data_source_unique_id = "kinesis-resource-arn" def read_data(self, spark, params): return spark.readStream.format("kinesis") \ .option("streamName", "kinesis-stream-name") \ .option("awsUseInstanceProfile", "false") \ .option("endpointUrl", "https://kinesis.your-region.amazonaws.com") .load()

다음 예에서는 feature_processor데코레이터에 KinesisDataSource를 연결하는 방법을 보여줍니다.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig import feature_store_pyspark.FeatureStoreManager as fsm def ingest_micro_batch_into_fg(input_df, epoch_id): feature_group_arn = "feature-group-arn" fsm.FeatureStoreManager().ingest_data( input_data_frame = input_df, feature_group_arn = feature_group_arn ) @remote( spark_config=SparkConfig( configuration={ "Classification": "spark-defaults", "Properties":{ "spark.sql.streaming.schemaInference": "true", "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2" } } ), instance_type="ml.m5.2xlarge", max_runtime_in_seconds=2419200 # 28 days ) @feature_processor( inputs=[KinesisDataSource()], output="feature-group-arn" ) def transform(input_df): output_stream = ( input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)") .writeStream.foreachBatch(ingest_micro_batch_into_fg) .trigger(processingTime="1 minute") .option("checkpointLocation", "s3a://checkpoint-path") .start() ) output_stream.awaitTermination()

위 예제 코드에서는 특성 그룹으로 마이크로 배치를 스트리밍하는 동안 몇 가지 Spark 구조화 스트리밍 옵션을 사용합니다. 전체 옵션 목록은 Apache Spark 설명서의 구조화 스트리밍 프로그래밍 안내서를 참조하세요.

  • foreachBatch 싱크 모드는 스트리밍 쿼리의 각 마이크로 배치의 출력 데이터에 작업을 적용하고 로직을 작성할 수 있는 기능입니다.

    foreachBatch에 대한 자세한 내용은 Apache Spark 구조화 스트리밍 프로그래밍 안내서의 Foreach 및 ForEachBatch 사용을 참조하세요.

  • checkpointLocation옵션은 스트리밍 애플리케이션의 상태를 주기적으로 저장합니다. 스트리밍 로그는 체크포인트 위치 s3a://checkpoint-path에 저장됩니다.

    checkpointLocation 옵션에 대한 자세한 내용은 Apache Spark 구조화 스트리밍 프로그래밍 안내서의 체크포인트를 통한 장애 복구를 참조하세요.

  • trigger설정은 스트리밍 애플리케이션에서 마이크로 배치 처리가 트리거되는 빈도를 정의합니다. 예제에서 처리 시간 트리거 유형은 trigger(processingTime="1 minute")로 지정된 1분 마이크로 배치 간격으로 사용됩니다. 스트림 소스에서 채우려면 trigger(availableNow=True)로 지정된 available-now 트리거 유형을 사용할 수 있습니다.

    전체 trigger유형 목록은 Apache Spark 구조화 스트리밍 프로그래밍 안내서의 트리거를 참조하세요.

이벤트 기반 트리거를 사용한 연속 스트리밍 및 자동 재시도

특성 프로세서는 SageMaker 훈련을 컴퓨팅 인프라로 사용하며 최대 런타임 한도는 28일입니다. 이벤트 기반 트리거를 사용하여 연속 스트리밍을 더 오랜 기간 연장하고 일시적인 장애로부터 복구할 수 있습니다. 일정 및 이벤트 기반 실행에 대한 자세한 내용은 특성 프로세서 파이프라인의 일정 예약 및 이벤트 기반 실행섹션을 참조하세요.

다음은 특성 프로세서 파이프라인 스트리밍이 계속 실행되도록 이벤트 기반 트리거를 설정하는 예제입니다. 이전 예에서 정의된 스트리밍 변환 함수를 사용합니다. 소스 파이프라인 실행에서 STOPPED또는 FAILED이벤트가 발생할 때 대상 파이프라인을 트리거하도록 구성할 수 있습니다. 계속 실행되도록 동일한 파이프라인이 소스와 타겟으로 사용된다는 점에 유의하세요.

import sagemaker.feature_store.feature_processor as fp from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus streaming_pipeline_name = "streaming-pipeline" streaming_pipeline_arn = fp.to_pipeline( pipeline_name = streaming_pipeline_name, step = transform # defined in previous section ) fp.put_trigger( source_pipeline_events=FeatureProcessorPipelineEvents( pipeline_name=source_pipeline_name, pipeline_execution_status=[ FeatureProcessorPipelineExecutionStatus.STOPPED, FeatureProcessorPipelineExecutionStatus.FAILED] ), target_pipeline=target_pipeline_name )
프라이버시사이트 이용 약관쿠키 기본 설정
© 2025, Amazon Web Services, Inc. 또는 계열사. All rights reserved.