처리된 데이터를 작업 북마크로 추적 - AWS Glue

처리된 데이터를 작업 북마크로 추적

AWS Glue는 작업 실행의 상태 정보를 유지하여 이전에 ETL 작업을 실행할 때 이미 처리된 데이터를 추적합니다. 이와 같은 지속 상태 정보를 작업 북마크라고 합니다. AWS Glue는 작업 북마크로 상태 정보를 유지하고 이전 데이터의 재처리를 방지합니다. 작업 북마크를 사용하면 예약된 간격으로 재실행 중인 새 데이터를 처리할 수 있습니다. 작업 북마크는 원본, 변환 및 대상과 같은 다양한 작업 요소의 상태로 구성됩니다. 예를 들어, ETL 작업에서 Amazon S3 파일의 새 파티션을 읽어야 할 수 있습니다. AWS Glue는 그 작업에서 처리한 파티션을 추적하여 중복 실행을 방지하고 작업 대상인 데이터 스토어에 데이터를 복제합니다.

작업 북마크는 JDBC 데이터 원본, Relationalize 변환 및 일부 Amazon Simple Storage Service(Amazon S3) 소스에 대해 구현됩니다. 다음 테이블에는 AWS Glue가 작업 북마크에 대해 지원하는 Amazon S3 소스 포맷이 나열됩니다.

AWS Glue 버전 Amazon S3 소스 포맷
버전 0.9 JSON, CSV, Apache Avro, XML
버전 1.0 이상 JSON, CSV, Apache Avro, XML, Parquet, ORC

AWS Glue 버전에 대한 자세한 내용은 Spark 작업에 대한 작업 속성 정의 섹션을 참조하세요.

JDBC 소스의 경우 다음 규칙이 적용됩니다.

  • 각 테이블에 대해 AWS Glue는 하나 이상의 열을 북마크 키로 사용하여 새 데이터와 처리된 데이터를 결정합니다. 북마크 키가 결합되어 단일 복합 키를 형성합니다.

  • 북마크 키로 사용할 열을 지정할 수 있습니다. 북마크 키를 지정하지 않으면 AWS Glue는 기본적으로 기본 키를 북마크 키로 사용합니다. 단, 기본 키가 간격 없이 순차적으로 증가하거나 감소합니다.

  • 사용자 정의 북마크 키를 사용하는 경우 엄격하게 단순 증가 또는 감소해야 합니다. 간격이 허용됩니다.

  • AWS Glue는 대/소문자를 구분하는 열을 작업 북마크 키로 사용하는 것을 지원하지 않습니다.

AWS Glue에서 작업 북마크 사용

작업이 시작되면 작업 북마크 옵션을 파라미터로 전달합니다. 다음 표에 AWS Glue 콘솔에서 작업 북마크를 설정하기 위한 옵션이 설명되어 있습니다.

작업 북마크 설명
Enable 이전에 처리된 데이터를 추적하기 위해 작업 실행 후 상태를 업데이트하도록 합니다. 작업 북마크가 지원되는 소스의 작업이라면 이미 처리된 데이터를 추적하고, 작업 실행 시 마지막 체크포인트 이후에 받은 새 데이터를 처리합니다.
비활성화 작업 북마크를 사용하지 않고, 작업에서 항상 전체 데이터 세트를 처리합니다. 이전에 실행된 작업의 출력값 관리는 여러분이 직접 수행해야 합니다. 이 값이 기본값입니다.
일시 중지

마지막 북마크의 상태를 업데이트하지 않고 마지막으로 성공한 실행 이후의 증분 데이터 또는 다음 하위 옵션으로 식별된 범위의 데이터를 처리합니다. 이전에 실행된 작업의 출력값 관리는 여러분이 직접 수행해야 합니다. 두 개의 하위 옵션은 다음과 같습니다.

  • job-bookmark-from<시작 값>은 마지막으로 성공한 실행 전까지 처리된 모든 입력을 나타내는 실행 ID이며, 지정된 실행 ID를 포함합니다. 해당하는 입력은 무시됩니다.

  • job-bookmark-to<종료 값>는 마지막으로 성공한 실행 전까지 처리된 모든 입력을 나타내는 실행 ID이며, 지정된 실행 ID를 포함합니다. <시작 값>에 의해 식별된 입력을 제외한 해당하는 입력이 작업에서 처리됩니다. 이 입력 이후의 입력도 처리에서 제외됩니다.

이 옵션 세트를 지정하면 작업 북마크 상태가 업데이트되지 않습니다.

하위 옵션은 선택 사항이지만, 두 하위 옵션이 모두 사용되는 경우에는 제공해야 합니다.

명령줄에서 작업에 전달된 파라미터, 특히 작업 북마크에 대한 자세한 내용은 AWS Glue에서 사용하는 작업 파라미터 단원을 참조하십시오.

Amazon S3 입력 소스의 경우 AWS Glue 작업 북마크는 객체의 마지막 수정 시간을 검사해 어떤 객체를 재처리해야 하는지 확인합니다. 마지막 작업 실행 이후로 입력 소스 데이터가 수정된 경우, 작업을 다시 실행하면 그 파일이 재처리됩니다.

AWS Glue Spark ETL 작업에 대한 작업 북마크를 이전 작업 실행으로 되돌릴 수 있습니다. 작업 북마크를 이전의 작업 실행으로 되돌릴 수 있어 후속 작업 실행 시 북마크로 지정된 작업 실행의 데이터만 다시 처리하므로 데이터 다시 채우기 시나리오를 훨씬 효율적으로 지원할 수 있습니다.

모든 데이터를 동일한 작업에서 재처리하려는 경우에는 작업 북마크를 재설정합니다. 작업 북마크 상태를 재설정하려면 AWS Glue 콘솔, ResetJobBookmark 작업(Python: reset_job_bookmark) API 작업 또는 AWS CLI를 사용합니다. 예를 들어, AWS CLI에서는 다음 명령을 입력합니다.

aws glue reset-job-bookmark --job-name my-job-name

북마크를 되감거나 재설정할 때 여러 대상이 있을 수 있고 대상이 작업 북마크로 추적되지 않기 때문에 AWS Glue는 대상 파일을 정리하지 않습니다. 소스 파일만 북마크로 추적됩니다. 출력에서 중복 데이터를 피하기 위해 소스 파일을 되감고 다시 처리할 때 다른 출력 대상을 생성할 수 있습니다.

AWS Glue는 작업 북마크를 작업별로 추적합니다. 작업을 삭제하면 작업 북마크가 삭제됩니다.

AWS Glue 작업 북마크를 활성화했는데도 ETL 작업이 앞선 실행에서 이미 처리된 데이터를 재처리하는 경우가 있을 수 있습니다. 이러한 오류의 일반적인 원인 해결에 대한 자세한 내용은 AWS Glue 오류 해결 단원을 참조하십시오.

변환 컨텍스트

대부분의 AWS Glue PySpark 동적 프레임 메서드에는 ETL 연산자 인스턴스의 고유한 식별자인 조건부 파라미터 transformation_ctx가 들어 있습니다. transformation_ctx 파라미터는 작업 북마크 내에서 지정된 연산자의 상태 정보를 식별하는 데 사용됩니다. 특히 AWS Glue는 transformation_ctx를 사용해 북마크 상태에 키를 인덱싱합니다.

작업 북마크가 제대로 작동하려면 작업 북마크 파라미터를 활성화하고 transformation_ctx 파라미터를 설정해야 합니다. transformation_ctx 파라미터를 전달하지 않으면 메서드에서 사용하는 테이블이나 동적 프레임에 대해 작업 북마크가 활성화되지 않습니다. 예를 들어, ETL 작업이 2개의 Amazon S3 소스를 읽고 조인하면 transformation_ctx 파라미터를 사용하고자 하는 북마크의 방법에만 전달할 수도 있습니다. 작업의 작업 북마크를 재설정하면 어떤 transformation_ctx를 사용하든 간에 해당 작업과 연관된 모든 변환이 재설정됩니다.

DynamicFrameReader 클래스에 대한 자세한 내용은 DynamicFrameReader 클래스를 참조하십시오. PySpark 확장에 대한 자세한 내용은 AWS Glue PySpark 확장 참조 단원을 참조하십시오.

AWS Glue에서 생성한 스크립트로 작업 북마크 사용

이 단원에서는 작업 북마크 사용의 실무적 측면에 대해 좀 더 자세히 설명합니다. 또한 소스와 대상을 선택하고 작업을 실행할 때 AWS Glue에서 생성할 수 있는 스크립트의 예제를 제시합니다.

작업 북마크는 작업의 상태를 저장합니다. 각각의 상태 인스턴스를 작업 이름과 버전 번호로 입력합니다. 스크립트가 job.init를 호출하면 상태를 검색해 항상 최신 버전을 가져옵니다. 상태 안에는 여러 가지 상태 요소가 있는데, 이러한 요소는 해당 스크립트에서 소스, 변환 및 싱크 인스턴스별로 고유합니다. 이러한 상태 요소는 스크립트의 해당 요소(소스, 변환 또는 싱크)에 연결된 변환 컨텍스트로 식별할 수 있습니다. 상태 요소는 사용자 스크립트에서 job.commit를 호출할 때 기본적으로 저장됩니다. 이 스크립트는 인수에서 작업 이름과 작업 북마크에 대한 제어 옵션을 가져옵니다.

작업 북마크의 상태 요소는 소스, 변환 또는 싱크 관련 데이터입니다. 예를 들어, 업스트림 작업 또는 프로세스에서 끊임 없이 쓰는 Amazon S3 위치에서 증분 데이터를 읽으려 한다고 가정해 보겠습니다. 이 경우, 스크립트는 지금까지 처리된 항목을 확인해야 합니다. Amazon S3 소스에 대해 작업 북마크를 구현하면 정보를 저장하기 때문에 작업을 다시 실행할 때 저장된 정보를 사용해 새 객체만 필터링하여 작업의 다음 실행을 위해 상태를 재컴퓨팅할 수 있습니다. 타임스탬프는 새 파일을 필터링하는 데 사용됩니다.

작업 북마크에는 상태 요소 외에도 실행 횟수, 시도 횟수버전 번호가 포함되어 있습니다. 실행 횟수는 작업의 실행을 추적하고, 시도 횟수는 작업 실행을 시도한 횟수를 기록합니다. 작업 실행 횟수는 실행에 성공할 때마다 단순하게 늘어나는 숫자입니다. 시도 횟수는 각각의 실행 시도를 추적하며, 시도 실패 후 실행하는 경우에만 숫자가 늘어납니다. 버전 번호는 단순히 증가하고, 작업 북마크에 대한 업데이트를 추적합니다.

AWS Glue 서비스 데이터베이스에서 모든 변환에 대한 북마크 상태는 키-값 쌍으로 함께 저장됩니다.

{ "job_name" : ..., "run_id": ..., "run_number": .., "attempt_number": ... "states": { "transformation_ctx1" : { bookmark_state1 }, "transformation_ctx2" : { bookmark_state2 } } }

transformation_ctx는 스크립트에서 특정 소스에 대한 책갈피 상태를 검색하는 키 역할을 합니다. 북마크가 제대로 작동하려면 항상 소스와 관련 transformation_ctx를 일치시켜야 합니다. 소스 속성이나 transformation_ctx의 이름을 변경하면 이전 북마크를 유효하지 않게 만들 수 있으며 타임스탬프 기반 필터링으로 인해 올바른 결과가 나오지 않을 수 있습니다.

모범 사례

다음은 AWS Glue 생성 스크립트로 작업 북마크 사용에 관한 모범 사례입니다.

  • 항상 스크립트의 시작 부분에 job.init()이 있고 스크립트의 끝에 job.commit()이 있어야 합니다. 이 두 함수는 북마크 서비스를 초기화하고 서비스에 대한 상태 변경을 업데이트하는 데 사용됩니다. 북마크는 호출하지 않으면 작동하지 않습니다.

  • 북마크가 사용 설정된 상태에서 데이터 원본 속성을 변경하지 마세요. 예를 들어, Amazon S3 입력 경로 A를 가리키는 datasource0가 있으며 작업은 북마크가 사용 설정된 상태에서 여러 라운드 동안 실행 중인 소스에서 읽어오고 있습니다. transformation_ctx를 변경하지 않고 datasource0의 입력 경로를 Amazon S3 경로 B로 변경하는 경우, AWS Glue 작업은 저장된 이전 북마크 상태를 사용합니다. 그러면 AWS Glue가 해당 파일이 이전 실행에서 처리되었다고 가정하여 입력 경로 B에서 파일이 누락되거나 파일을 건너뛰게 됩니다.

  • 더 효과적인 파티션 관리를 위해 북마크가 있는 카탈로그 테이블을 사용하세요. 책갈피는 Data Catalog 또는 옵션의 데이터 원본에 모두 작동합니다. 그러나 옵션 접근 방식을 사용하면 새 파티션을 제거/추가하는 것은 어렵습니다. 크롤러로 카탈로그 테이블을 사용하면 새로 추가된 파티션을 추적하는 더 향상된 자동화를 제공할 수 있으며, 푸시다운 조건자로 특정 파티션을 선택할 수 있는 유연성이 확보됩니다.

  • 대용량 데이터 집합에 대해 AWS Glue Amazon S3 파일 리스터를 사용하세요. 북마크는 각 입력 파티션 아래의 모든 파일을 나열하고 필터링을 수행하므로 단일 파티션 아래에 파일이 너무 많으면 북마크가 드라이버 OOM으로 실행될 수 있습니다. 메모리에 있는 모든 파일을 한 번에 나열하지 않도록 AWS Glue Amazon S3 파일 리스터를 사용하세요.

다음은 Amazon S3 데이터 원본에 대해 생성된 스크립트의 예입니다. 작업 북마크를 사용하는 데 필요한 스크립트 부분은 굵게 기울임꼴로 표시됩니다. 이러한 요소에 대한 자세한 내용은 GlueContext 클래스 API 및 DynamicFrameWriter 클래스 API를 참조하세요.

# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0") ## @type: ApplyMapping ## @args: [mapping = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2") job.commit()

다음은 JDBC 소스에 대해 생성된 스크립트의 예입니다. 소스 테이블은 기본 키로 empno 열이 있는 직원 테이블입니다. 북마크 키가 지정되지 않은 경우 작업은 기본적으로 순차 기본 키를 북마크 키로 사용하지만 empno는 반드시 순차적이지 않기 때문에(값에 공백이 있을 수 있음) 기본 북마크 키로 적합하지 않습니다. 따라서 스크립트는 empno를 북마크 키로 명시적으로 지정합니다. 코드의 해당 부분은 굵게 기울임꼴로 표시됩니다.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [database = "hr", table_name = "emp", transformation_ctx = "datasource0"] ## @return: datasource0 ## @inputs: [] datasource0 = glueContext.create_dynamic_frame.from_catalog(database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"}) ## @type: ApplyMapping ## @args: [mapping = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1"] ## @return: applymapping1 ## @inputs: [frame = datasource0] applymapping1 = ApplyMapping.apply(frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1") ## @type: DataSink ## @args: [connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2"] ## @return: datasink2 ## @inputs: [frame = applymapping1] datasink2 = glueContext.write_dynamic_frame.from_options(frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2") job.commit()

다음과 같은 방법으로 jobBookmarkKeysjobBookmarkKeysSortOrder를 지정할 수 있습니다.

  • create_dynamic_frame.from_catalog - additional_options를 사용합니다.

  • create_dynamic_frame.from_options - connection_options를 사용합니다.

작업 북마크와 관련된 연결 옵션에 대한 자세한 내용은 JDBC connectionType 값 섹션을 참조하세요.