AWS Glue Scala GlueContext API - AWS Glue

AWS Glue Scala GlueContext API

패키지:   com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext는 Amazon Simple Storage Service(Amazon S3), AWS Glue Data Catalog, JDBC 등에서 DynamicFrame을 읽고 쓰기 위한 진입점입니다. 이 클래스는 DataSource 특성DataSink 객체를 생성하는 유틸리티 함수를 제공하며, 이 객체는 DynamicFrame을 읽고 쓸 때 사용할 수 있습니다.

GlueContext를 사용하여 DynamicFrame에 목표한 수의 파티션(기본값 20)을 설정할 수도 있습니다. 단, 소스에서 생성된 파티션 수가 파티션 최소 임계값(기본값 10) 미만이어야 합니다.

def addIngestionColumns

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

입력 DataFrameingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute와 같은 수집 시간 열을 추가합니다. 이 함수는 Amazon S3을 대상으로 하는 Data Catalog 테이블을 지정할 때 AWS Glue가 생성하는 스크립트에서 자동으로 생성됩니다. 이 함수는 출력 테이블의 수집 시간 열로 파티션을 자동으로 업데이트합니다. 이를 통해 입력 데이터에 명시적인 수집 시간 열을 요구하지 않고도 출력 데이터를 수집 시간에 자동으로 분할할 수 있습니다.

  • dataFrame - 수집 시간 열을 추가할 dataFrame입니다.

  • timeGranularity - 시간 열의 세분성입니다. 유효 값은 "day", "hour" 및 "minute"입니다. 예를 들어 "hour"가 함수에 전달되면 원래 dataFrame에 "ingest_year", "ingest_month", "ingest_day" 및 "ingest_hour" 시간 열이 추가됩니다.

시간 세분성 열을 추가한 후 데이터 프레임을 반환합니다.

예제:

glueContext.addIngestionTimeColumns(dataFrame, "hour")

def createDataFrameFromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

지정된 연결 및 포맷으로 생성된 DataFrame을 반환합니다. 이 함수는 AWS Glue 스트리밍 소스에만 사용합니다.

  • connectionType - 스트리밍 연결 유형입니다. 유효한 값에는 kinesiskafka이 있습니다.

  • connectionOptions - 연결 옵션이며, Kinesis 및 Kafka에 대해 서로 다릅니다. AWS Glue에서 ETL 관련 연결 유형 및 옵션에서 각 스트리밍 데이터 원본에 대한 모든 연결 옵션 목록을 찾을 수 있습니다. 스트리밍 연결 옵션에는 다음과 같은 차이점이 있습니다.

    • Kinesis 스트리밍 소스에는 streamARN, startingPosition, inferSchemaclassification이 필요합니다.

    • Kafka 스트리밍 소스에는 connectionName, topicName, startingOffsets, inferSchemaclassification이 필요합니다.

  • transformationContext - 사용할 변환 컨텍스트입니다(선택 사항).

  • format - 형식 사양입니다(선택 사항). 여러 포맷을 지원하는 Amazon S3 또는 AWS Glue 연결에 사용됩니다. 지원되는 형식에 관한 내용은 AWS Glue에서의 입력 및 출력의 데이터 형식 옵션 섹션을 참조하세요.

  • formatOptions - 지정된 형식에 대한 형식 옵션입니다. 지원되는 포맷 옵션에 대한 자세한 내용은 데이터 포맷 옵션 섹션을 참조하세요.

Amazon Kinesis 스트리밍 소스의 예:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Kafka 스트리밍 소스의 예:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

스트리밍 소스에서 읽는 모든 마이크로 배치에 전달된 batch_function을 적용합니다.

  • frame - 현재 마이크로 배치를 포함하는 DataFrame입니다.

  • batch_function - 모든 마이크로 배치에 적용될 함수입니다.

  • options - 마이크로 배치를 처리하는 방법에 대한 정보가 들어 있는 키-값 페어 컬렉션입니다. 다음 옵션이 필요합니다.

    • windowSize - 각 배치를 처리하는 데 소요되는 시간입니다.

    • checkpointLocation - 스트리밍 ETL 작업에 대해 체크포인트가 저장되는 위치입니다.

    • batchMaxRetries – 실패한 경우 배치를 다시 시도할 수 있는 최대 횟수입니다. 기본값은 3입니다. 이 옵션은 Glue 버전 2.0 이상에서만 구성할 수 있습니다.

예:

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

def getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

Data Catalog에 정의된 테이블에 지정된 위치에 쓰는 DataSink를 생성합니다.

  • database - Data Catalog에 있는 데이터베이스 이름입니다.

  • tableName - Data Catalog에 있는 테이블 이름입니다.

  • redshiftTmpDir - 특정 데이터 싱크와 사용되는 임시 준비 디렉터리입니다. 기본적으로 비우도록 설정합니다.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • additionalOptions - AWS Glue에 제공되는 추가 옵션입니다.

  • catalogId - 액세스 중인 Data Catalog의 카탈로그 ID(계정 ID)입니다. null인 경우 호출자의 기본 계정 ID가 사용됩니다.

DataSink을 반환합니다.

def getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

Data Catalog의 테이블 정의에서 데이터를 읽는 DataSource 특성를 생성합니다.

  • database - Data Catalog에 있는 데이터베이스 이름입니다.

  • tableName - Data Catalog에 있는 테이블 이름입니다.

  • redshiftTmpDir - 특정 데이터 싱크와 사용되는 임시 준비 디렉터리입니다. 기본적으로 비우도록 설정합니다.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • pushDownPredicate - 데이터 집합 내 모든 파일을 나열하거나 읽지 않아도 파티션에 필터링할 수 있습니다. 자세한 정보는 푸시다운 조건자를 사용하여 예비 필터링을 참조하십시오.

  • additionalOptions - 선택적 이름-값 페어의 모음입니다. 가능한 옵션에는 AWS Glue에서 ETL 관련 연결 유형 및 옵션에 나열된 옵션이 포함됩니다(endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classificationdelimiter 제외). 지원되는 또 다른 옵션은 catalogPartitionPredicate입니다.

    catalogPartitionPredicate - 카탈로그 표현식을 전달하여 인덱스 열을 기준으로 필터링할 수 있습니다. 이렇게 하면 필터링이 서버 측으로 푸시됩니다. 자세한 내용은 AWS Glue 파티션 인덱스를 참조하세요. push_down_predicatecatalogPartitionPredicate는 다른 구문을 사용합니다. 전자는 Spark SQL 표준 구문을 사용하고 후자는 JSQL 구문 분석기를 사용합니다.

  • catalogId - 액세스 중인 Data Catalog의 카탈로그 ID(계정 ID)입니다. null인 경우 호출자의 기본 계정 ID가 사용됩니다.

DataSource을 반환합니다.

스트리밍 소스의 예

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

Data Catalog의 Connection 객체에 지정된 JDBC 데이터베이스에 쓰는 DataSink를 생성합니다. Connection 객체는 URL, 사용자 이름, 암호, VPC, 서브넷 및 보안 그룹을 포함한 JDBC 싱크로 연결하는 정보를 가지고 있습니다.

  • catalogConnection - 작성할 JDBC URL을 포함하는 Data Catalog의 연결 이름입니다.

  • options - JDBC 데이터 스토어로 작성할 때 필요한 추가 정보를 제공하는 JSON 이름-값 페어의 문자열입니다. 여기에는 다음이 포함됩니다.

    • dbtable(필수) - JDBC 테이블의 이름입니다. 데이터베이스 내의 스키마를 지원하는 JDBC 데이터 스토어의 경우 schema.table-name에 대해 지정합니다. 스키마가 제공되지 않으면 기본 "퍼블릭" 스키마가 사용됩니다. 다음 예제에서는 test라는 스키마와 test_db 데이터베이스의 test_table 테이블을 가리키는 옵션 파라미터를 보여 줍니다.

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database(필수) - JDBC 데이터베이스의 이름입니다.

    • SparkSQL JDBC 라이터에게 직접 전달되는 추가 옵션. 자세한 내용은 Redshift data source for Spark를 참조하십시오.

  • redshiftTmpDir - 특정 데이터 싱크와 사용되는 임시 준비 디렉터리입니다. 기본적으로 비우도록 설정합니다.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • catalogId - 액세스 중인 Data Catalog의 카탈로그 ID(계정 ID)입니다. null인 경우 호출자의 기본 계정 ID가 사용됩니다.

예제 코드:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

DataSink을 반환합니다.

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

Amazon Simple Storage Service(Amazon S3), JDBC 또는 AWS Glue Data Catalog와 같은 대상에 데이터를 쓰는 DataSink를 생성합니다.

DataSink을 반환합니다.

def getSinkWithFormat

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

Amazon S3, JDBC 또는 Data Catalog 같은 대상에 데이터를 쓰는 DataSink를 생성하고, 데이터 포맷을 대상에 쓰도록 설정합니다.

  • connectionType - 연결 유형입니다. AWS Glue에서 ETL 관련 연결 유형 및 옵션 섹션을 참조하세요.

  • options - 데이터 싱크 연결하려는 추가 정보를 제공하는 JSON 이름-값 페어 문자열입니다. AWS Glue에서 ETL 관련 연결 유형 및 옵션 섹션을 참조하세요.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • format - 대상에 작성되는 데이터의 포맷입니다.

  • formatOptions - 대상 주소로 데이터를 구성하는 추가 옵션을 제공하는 JSON 이름-값 페어 문자열입니다. 데이터 포맷 옵션 섹션을 참조하세요.

DataSink을 반환합니다.

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

Amazon S3, JDBC, AWS Glue Data Catalog 같은 소스에서 데이터를 읽는 DataSource 특성를 생성합니다. Kafka 및 Kinesis 스트리밍 데이터 원본도 지원합니다.

  • connectionType - 데이터 원본의 유형입니다. AWS Glue에서 ETL 관련 연결 유형 및 옵션 섹션을 참조하세요.

  • connectionOptions - 데이터 원본을 연결하려는 추가 정보를 제공하는 JSON 이름-값 페어 문자열입니다. 자세한 정보는 AWS Glue에서 ETL 관련 연결 유형 및 옵션을 참조하십시오.

    Kinesis 스트리밍 소스에는 streamARN, startingPosition, inferSchemaclassification 연결 옵션이 필요합니다.

    Kafka 스트리밍 소스에는 connectionName, topicName, startingOffsets, inferSchemaclassification 연결 옵션이 필요합니다.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • pushDownPredicate - 파티션 열에 대한 조건자입니다.

DataSource을 반환합니다.

Amazon Kinesis 스트리밍 소스의 예:

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Kafka 스트리밍 소스의 예:

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

def getSourceWithFormat

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Amazon S3, JDBC, AWS Glue Data Catalog 같은 소스에서 데이터를 읽는 DataSource 특성를 생성하고, 소스에 저장된 데이터 포맷도 설정합니다.

  • connectionType - 데이터 원본의 유형입니다. AWS Glue에서 ETL 관련 연결 유형 및 옵션 섹션을 참조하세요.

  • options - 데이터 원본을 연결하려는 추가 정보를 제공하는 JSON 이름-값 페어 문자열입니다. AWS Glue에서 ETL 관련 연결 유형 및 옵션 섹션을 참조하세요.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • format - 소스에 저장된 데이터의 포맷입니다. connectionType이 "s3"일 경우에도 format을 지정할 수 있습니다. “avro”, “csv”, “grokLog”, “ion”, “json”, “xml”, “parquet”, “orc” 중 하나가 이에 해당합니다.

  • formatOptions - 소스로 데이터를 파싱하는 추가 옵션을 제공하는 JSON 이름-값 페어 문자열입니다. 데이터 포맷 옵션 섹션을 참조하세요.

DataSource을 반환합니다.

Amazon S3에서 CSV(쉼표로 구분된 값) 파일인 데이터 원본에서 DynamicFrame을 생성합니다.

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

JDBC 연결을 사용하여 PostgreSQL인 데이터 원본에서 DynamicFrame을 생성합니다.

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

JDBC 연결을 사용하여 MySQL인 데이터 원본에서 DynamicFrame을 생성합니다.

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

def getSparkSession

def getSparkSession : SparkSession

이 GlueContext와 관련된 SparkSession 객체를 얻습니다. 이 SparkSession 객체를 사용하여 DynamicFrame에서 생성된 DataFrame을 사용할 테이블 및 UDF를 등록합니다.

SparkSession을 반환합니다.

def startTransaction

def startTransaction(readOnly: Boolean):String

새 트랜잭션을 시작합니다. 내부적으로 Lake Formation startTransaction API를 호출합니다.

  • readOnly - (부울) 이 트랜잭션이 읽기 전용인지 또는 읽기/쓰기인지를 나타냅니다. 읽기 전용 트랜잭션 ID를 사용하여 수행된 쓰기는 거부됩니다. 읽기 전용 트랜잭션은 커밋할 필요가 없습니다.

트랜잭션 ID를 반환합니다.

def commitTransaction

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

지정된 트랜잭션을 커밋하려는 시도입니다. 트랜잭션이 커밋을 완료하기 전에 commitTransaction이 반환될 수 있습니다. 내부적으로 Lake Formation commitTransaction API를 호출합니다.

  • transactionId - (문자열) 커밋할 트랜잭션입니다.

  • waitForCommit - (부울) commitTransaction이 즉시 반환되는지 여부를 결정합니다. 기본값은 true입니다. false인 경우 commitTransaction은 폴링한 후 트랜잭션이 커밋될 때까지 기다립니다. 대기 시간은 최대 재시도 횟수가 6회인 지수 백오프를 사용하여 1분으로 제한됩니다.

커밋이 수행되었는지 여부를 나타내는 부울을 반환합니다.

def cancelTransaction

def cancelTransaction(transactionId: String): Unit

지정된 트랜잭션을 취소하려는 시도입니다. 내부적으로 Lake Formation CancelTransaction API를 호출합니다.

  • transactionId - (문자열) 취소할 트랜잭션입니다.

트랜잭션이 이전에 커밋된 경우 TransactionCommittedException 예외를 반환합니다.

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

지정된 SparkContext, 최소 파티션 및 대상 파티션을 사용하여 GlueContext 객체를 생성합니다.

  • sc - SparkContext

  • minPartitions - 파티션의 최소 수입니다.

  • targetPartitions - 파티션의 목표 수입니다.

GlueContext을 반환합니다.

def this

def this( sc : SparkContext )

제공된 SparkContextGlueContext 객체를 생성합니다. 최소 파티션을 10으로 대상 파티션을 20으로 설정합니다.

  • sc - SparkContext

GlueContext을 반환합니다.

def this

def this( sparkContext : JavaSparkContext )

제공된 JavaSparkContextGlueContext 객체를 생성합니다. 최소 파티션을 10으로 대상 파티션을 20으로 설정합니다.

  • sparkContext - JavaSparkContext

GlueContext을 반환합니다.