작업 북마크 사용
AWS Glue for Spark는 작업 북마크를 사용하여 이미 진행된 데이터를 추적합니다. 작업 북마크 기능에 대한 요약 및 지원 내용은 처리된 데이터를 작업 북마크로 추적 섹션을 참조하세요. 북마크를 사용해 AWS Glue 작업을 프로그래밍하면 시각적 작업에서 얻을 수 없었던 유연성을 활용할 수 있습니다.
-
JDBC에서 읽을 때 AWS Glue 스크립트에서 북마크 키로 사용할 열을 지정할 수 있습니다.
-
각 메서드의 직접 호출에 적용할
transformation_ctx
를 선택할 수 있습니다.
스크립트 시작 부분에서는 항상 job.init
를 호출하고 스크립트 끝 부분에서는 적절하게 구성된 파라미터를 사용하여 job.commit
을 호출하세요. 이 두 함수는 북마크 서비스를 초기화하고 서비스에 대한 상태 변경을 업데이트합니다. 북마크는 호출하지 않으면 작동하지 않습니다.
북마크 키 지정
JDBC 워크플로의 경우 북마크는 키 필드의 값을 북마크된 값과 비교하여 작업에서 읽은 행을 추적합니다. 이는 Amazon S3 워크플로에 필요하지 않거나 적용될 수 없습니다. 시각적 편집기 없이 AWS Glue 스크립트를 작성할 때 북마크로 추적할 열을 지정할 수 있습니다. 열을 여러 개 지정할 수도 있습니다. 사용자 정의 북마크 키를 지정할 때는 값 순서에 간격을 둘 수 있습니다.
주의
사용자 정의 북마크 키를 사용하는 경우 각각 엄격하게 단순 증가 또는 감소해야 합니다. 복합 키에 대해 추가 필드를 선택할 때 '마이너 버전' 또는 '개정 번호'와 같은 개념에 대한 필드는 해당 값이 데이터 세트 전체에서 재사용되므로 이 기준을 충족하지 않습니다.
다음과 같은 방법으로 jobBookmarkKeys
및 jobBookmarkKeysSortOrder
를 지정할 수 있습니다.
-
create_dynamic_frame.from_catalog
-additional_options
를 사용합니다. -
create_dynamic_frame.from_options
-connection_options
를 사용합니다.
변환 컨텍스트
대부분의 AWS Glue PySpark 동적 프레임 메서드에는 ETL 연산자 인스턴스의 고유한 식별자인 조건부 파라미터 transformation_ctx
가 들어 있습니다. transformation_ctx
파라미터는 작업 북마크 내에서 지정된 연산자의 상태 정보를 식별하는 데 사용됩니다. 특히 AWS Glue는 transformation_ctx
를 사용해 북마크 상태에 키를 인덱싱합니다.
주의
이 transformation_ctx
는 스크립트에서 특정 소스에 대한 책갈피 상태를 검색하는 키 역할을 합니다. 북마크가 제대로 작동하려면 항상 소스와 관련 transformation_ctx
를 일치시켜야 합니다. 소스 속성이나 transformation_ctx
의 이름을 변경하면 이전 북마크를 유효하지 않게 만들 수 있으며 타임스탬프 기반 필터링으로 인해 올바른 결과가 나오지 않을 수 있습니다.
작업 북마크가 제대로 작동하려면 작업 북마크 파라미터를 활성화하고 transformation_ctx
파라미터를 설정해야 합니다. transformation_ctx
파라미터를 전달하지 않으면 메서드에서 사용하는 테이블이나 동적 프레임에 대해 작업 북마크가 활성화되지 않습니다. 예를 들어, ETL 작업이 2개의 Amazon S3 소스를 읽고 조인하면 transformation_ctx
파라미터를 사용하고자 하는 북마크의 방법에만 전달할 수도 있습니다. 작업의 작업 북마크를 재설정하면 어떤 transformation_ctx
를 사용하든 간에 해당 작업과 연관된 모든 변환이 재설정됩니다.
DynamicFrameReader
클래스에 대한 자세한 내용은 DynamicFrameReader 클래스를 참조하십시오. PySpark 확장에 대한 자세한 내용은 AWS Glue PySpark 확장 참조 단원을 참조하십시오.
예시
다음은 Amazon S3 데이터 원본에 대해 생성된 스크립트의 예입니다. 작업 북마크를 사용하는 데 필요한 스크립트 부분은 기울임꼴로 표시됩니다. 이러한 요소에 대한 자세한 내용은 GlueContext 클래스 API 및 DynamicFrameWriter 클래스 API를 참조하세요.
# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0" ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2" ) job.commit()
다음은 JDBC 소스에 대해 생성된 스크립트의 예입니다. 소스 테이블은 기본 키로 empno
열이 있는 직원 테이블입니다. 북마크 키가 지정되지 않은 경우 작업은 기본적으로 순차 기본 키를 북마크 키로 사용하지만 empno
는 반드시 순차적이지 않기 때문에(값에 공백이 있을 수 있음) 기본 북마크 키로 적합하지 않습니다. 따라서 스크립트는 empno
를 북마크 키로 명시적으로 지정합니다. 코드의 해당 부분은 기울임꼴로 표시됩니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2" ) job.commit()