AWS Glue스칼라 API GlueContext - AWS Glue

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

AWS Glue스칼라 API GlueContext

패키지:   com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext는 Amazon Simple Storage Service(Amazon S3), AWS Glue Data Catalog, JDBC 등에서 DynamicFrame을 읽고 쓰기 위한 진입점입니다. 이 클래스는 DataSource 특성DataSink 객체를 생성하는 유틸리티 함수를 제공하며, 이 객체는 DynamicFrame을 읽고 쓸 때 사용할 수 있습니다.

GlueContext를 사용하여 DynamicFrame에 목표한 수의 파티션(기본값 20)을 설정할 수도 있습니다. 단, 소스에서 생성된 파티션 수가 파티션 최소 임계값(기본값 10) 미만이어야 합니다.

데프 컬럼 addIngestionTime

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

입력 DataFrameingest_year, ingest_month, ingest_day, ingest_hour, ingest_minute와 같은 수집 시간 열을 추가합니다. 이 함수는 Amazon S3을 대상으로 하는 데이터 카탈로그 테이블을 지정할 때 AWS Glue가 생성하는 스크립트에서 자동으로 생성됩니다. 이 함수는 출력 테이블의 수집 시간 열로 파티션을 자동으로 업데이트합니다. 이를 통해 입력 데이터에 명시적인 수집 시간 열을 요구하지 않고도 출력 데이터를 수집 시간에 자동으로 분할할 수 있습니다.

  • dataFrame - 수집 시간 열을 추가할 dataFrame입니다.

  • timeGranularity - 시간 열의 세분성입니다. 유효 값은 "day", "hour" 및 "minute"입니다. 예를 들어 "hour"가 함수에 전달되면 원래 dataFrame에 "ingest_year", "ingest_month", "ingest_day" 및 "ingest_hour" 시간 열이 추가됩니다.

시간 세분성 열을 추가한 후 데이터 프레임을 반환합니다.

예제

glueContext.addIngestionTimeColumns(dataFrame, "hour")

데프 createDataFrame FromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

지정된 연결 및 포맷으로 생성된 DataFrame을 반환합니다. 이 함수는 AWS Glue 스트리밍 소스에서만 사용하십시오.

  • connectionType - 스트리밍 연결 유형입니다. 유효한 값에는 kinesiskafka(이)가 있습니다.

  • connectionOptions - 연결 옵션이며, Kinesis 및 Kafka에 대해 서로 다릅니다. AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션에서 각 스트리밍 데이터 원본에 대한 모든 연결 옵션 목록을 찾을 수 있습니다. 스트리밍 연결 옵션에는 다음과 같은 차이점이 있습니다.

    • Kinesis 스트리밍 소스에는 streamARN, startingPosition, inferSchemaclassification이 필요합니다.

    • Kafka 스트리밍 소스에는 connectionName, topicName, startingOffsets, inferSchemaclassification이 필요합니다.

  • transformationContext - 사용할 변환 컨텍스트입니다(선택 사항).

  • format - 형식 사양입니다(선택 사항). 여러 포맷을 지원하는 Amazon S3 또는 AWS Glue 연결에 사용됩니다. 지원되는 형식에 관한 내용은 AWS Glue for Spark에서 입력 및 출력의 데이터 형식 옵션 섹션을 참조하세요.

  • formatOptions - 지정된 형식에 대한 형식 옵션입니다. 지원되는 포맷 옵션에 대한 자세한 내용은 데이터 포맷 옵션 섹션을 참조하세요.

Amazon Kinesis 스트리밍 소스의 예:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Kafka 스트리밍 소스의 예:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

스트리밍 소스에서 읽는 모든 마이크로 배치에 전달된 batch_function을 적용합니다.

  • frame— 현재 마이크로 배치를 DataFrame 포함하고 있습니다.

  • batch_function - 모든 마이크로 배치에 적용될 함수입니다.

  • options - 마이크로 배치를 처리하는 방법에 대한 정보가 들어 있는 키-값 페어 컬렉션입니다. 다음 옵션이 필요합니다.

    • windowSize - 각 배치를 처리하는 데 소요되는 시간입니다.

    • checkpointLocation - 스트리밍 ETL 작업에 대해 체크포인트가 저장되는 위치입니다.

    • batchMaxRetries – 실패한 경우 배치를 다시 시도할 수 있는 최대 횟수입니다. 기본값은 3입니다. 이 옵션은 Glue 버전 2.0 이상에서만 구성할 수 있습니다.

예:

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

def getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

Data Catalog에 정의된 테이블에 지정된 위치에 쓰는 DataSink를 생성합니다.

  • database - Data Catalog에 있는 데이터베이스 이름입니다.

  • tableName - Data Catalog에 있는 테이블 이름입니다.

  • redshiftTmpDir - 특정 데이터 싱크와 사용되는 임시 준비 디렉터리입니다. 기본적으로 비우도록 설정합니다.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • additionalOptions - AWS Glue에 제공되는 추가 옵션입니다.

  • catalogId - 액세스 중인 Data Catalog의 카탈로그 ID(계정 ID)입니다. null인 경우 호출자의 기본 계정 ID가 사용됩니다.

DataSink을 반환합니다.

데프 getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

Data Catalog의 테이블 정의에서 데이터를 읽는 DataSource 특성를 생성합니다.

  • database - Data Catalog에 있는 데이터베이스 이름입니다.

  • tableName - Data Catalog에 있는 테이블 이름입니다.

  • redshiftTmpDir - 특정 데이터 싱크와 사용되는 임시 준비 디렉터리입니다. 기본적으로 비우도록 설정합니다.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • pushDownPredicate - 데이터 집합 내 모든 파일을 나열하거나 읽지 않아도 파티션에 필터링할 수 있습니다. 자세한 정보는 푸시다운 조건자를 사용하여 예비 필터링을 참조하세요.

  • additionalOptions - 선택적 이름-값 페어의 모음입니다. 가능한 옵션에는 AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션에 나열된 옵션이 포함됩니다(endpointUrl, streamName, bootstrap.servers, security.protocol, topicName, classificationdelimiter 제외). 지원되는 또 다른 옵션은 catalogPartitionPredicate입니다.

    catalogPartitionPredicate - 카탈로그 표현식을 전달하여 인덱스 열을 기준으로 필터링할 수 있습니다. 이렇게 하면 필터링이 서버 측으로 푸시됩니다. 자세한 내용은 AWS Glue 파티션 인덱스를 참조하세요. push_down_predicatecatalogPartitionPredicate는 다른 구문을 사용합니다. 전자는 Spark SQL 표준 구문을 사용하고 후자는 JSQL 구문 분석기를 사용합니다.

  • catalogId - 액세스 중인 Data Catalog의 카탈로그 ID(계정 ID)입니다. null인 경우 호출자의 기본 계정 ID가 사용됩니다.

DataSource을 반환합니다.

스트리밍 소스의 예

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

Data Catalog의 Connection 객체에 지정된 JDBC 데이터베이스에 쓰는 DataSink를 생성합니다. Connection 객체는 URL, 사용자 이름, 암호, VPC, 서브넷 및 보안 그룹을 포함한 JDBC 싱크로 연결하는 정보를 가지고 있습니다.

  • catalogConnection - 작성할 JDBC URL을 포함하는 Data Catalog의 연결 이름입니다.

  • options - JDBC 데이터 스토어로 작성할 때 필요한 추가 정보를 제공하는 JSON 이름-값 페어의 문자열입니다. 여기에는 다음이 포함됩니다.

    • dbtable(필수) - JDBC 테이블의 이름입니다. 데이터베이스 내의 스키마를 지원하는 JDBC 데이터 스토어의 경우 schema.table-name에 대해 지정합니다. 스키마가 제공되지 않으면 기본 "퍼블릭" 스키마가 사용됩니다. 다음 예제에서는 test라는 스키마와 test_db 데이터베이스의 test_table 테이블을 가리키는 옵션 파라미터를 보여 줍니다.

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database(필수) - JDBC 데이터베이스의 이름입니다.

    • SparkSQL JDBC 라이터에게 직접 전달되는 추가 옵션. 자세한 내용은 Redshift data source for Spark를 참조하십시오.

  • redshiftTmpDir - 특정 데이터 싱크와 사용되는 임시 준비 디렉터리입니다. 기본적으로 비우도록 설정합니다.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • catalogId - 액세스 중인 Data Catalog의 카탈로그 ID(계정 ID)입니다. null인 경우 호출자의 기본 계정 ID가 사용됩니다.

예제 코드:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

DataSink을 반환합니다.

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

Amazon Simple Storage Service (Amazon S3), JDBC, AWS Glue 데이터 카탈로그, 아파치 카프카 또는 Amazon Kinesis 데이터 스트림과 같은 대상에 데이터를 쓰는 데이터를 생성합니다. DataSink

DataSink을 반환합니다.

getSinkWithdef 포맷

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

Amazon S3, JDBC, 데이터 카탈로그나 Apache Kafka 또는 Amazon Kinesis 데이터 스트림과 같은 대상에 데이터를 쓰는 DataSink를 생성합니다. 또한 대상에 기록할 데이터의 형식을 설정합니다.

  • connectionType - 연결 유형입니다. AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션 섹션을 참조하십시오.

  • options - 데이터 싱크 연결하려는 추가 정보를 제공하는 JSON 이름-값 페어 문자열입니다. AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션 섹션을 참조하십시오.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • format - 대상에 작성되는 데이터의 포맷입니다.

  • formatOptions - 대상 주소로 데이터를 구성하는 추가 옵션을 제공하는 JSON 이름-값 페어 문자열입니다. 데이터 포맷 옵션 섹션을 참조하십시오.

DataSink을 반환합니다.

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

Amazon S3, JDBC 또는 AWS Glue 데이터 카탈로그와 같은 소스에서 데이터를 읽는 데이터를 생성합니다. DataSource 특성 Kafka 및 Kinesis 스트리밍 데이터 원본도 지원합니다.

  • connectionType - 데이터 원본의 유형입니다. AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션 섹션을 참조하십시오.

  • connectionOptions - 데이터 원본을 연결하려는 추가 정보를 제공하는 JSON 이름-값 페어 문자열입니다. 자세한 정보는 AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션을 참조하세요.

    Kinesis 스트리밍 소스에는 streamARN, startingPosition, inferSchemaclassification 연결 옵션이 필요합니다.

    Kafka 스트리밍 소스에는 connectionName, topicName, startingOffsets, inferSchemaclassification 연결 옵션이 필요합니다.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • pushDownPredicate - 파티션 열에 대한 조건자입니다.

DataSource을 반환합니다.

Amazon Kinesis 스트리밍 소스의 예:

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Kafka 스트리밍 소스의 예:

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

def 포맷 getSourceWith

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Amazon S3, JDBC 또는 AWS Glue Data Catalog와 같은 원본에서 데이터를 읽고 원본에 저장된 데이터의 형식도 설정하는 코드를 생성합니다. DataSource 특성

  • connectionType - 데이터 원본의 유형입니다. AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션 섹션을 참조하십시오.

  • options - 데이터 원본을 연결하려는 추가 정보를 제공하는 JSON 이름-값 페어 문자열입니다. AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션 섹션을 참조하십시오.

  • transformationContext - 작업 북마크에서 사용되는 싱크와 관련된 변환 컨텍스트입니다. 기본적으로 비우도록 설정합니다.

  • format - 소스에 저장된 데이터의 포맷입니다. connectionType이 "s3"일 경우에도 format을 지정할 수 있습니다. “avro”, “csv”, “grokLog”, “ion”, “json”, “xml”, “parquet”, “orc” 중 하나가 이에 해당합니다.

  • formatOptions - 소스로 데이터를 파싱하는 추가 옵션을 제공하는 JSON 이름-값 페어 문자열입니다. 데이터 포맷 옵션 섹션을 참조하십시오.

DataSource을 반환합니다.

예제

Amazon DynamicFrame S3의 쉼표로 구분된 값 (CSV) 파일인 데이터 소스에서 다음을 생성합니다.

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

DynamicFrame JDBC 연결을 사용하여 PostgreSQL인 데이터 소스에서 다음을 생성합니다.

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

DynamicFrame JDBC 연결을 사용하여 MySQL인 데이터 소스에서 를 생성합니다.

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

def getSparkSession

def getSparkSession : SparkSession

이 GlueContext와 관련된 SparkSession 객체를 얻습니다. 이 SparkSession 객체를 사용하여 DataFrame 생성된 원본에 사용할 테이블과 UDF를 등록할 수 있습니다. DynamicFrames

를 반환합니다. SparkSession

def startTransaction

def startTransaction(readOnly: Boolean):String

새 트랜잭션을 시작합니다. 내부적으로 Lake Formation startTransaction API를 호출합니다.

  • readOnly - (부울) 이 트랜잭션이 읽기 전용인지 또는 읽기/쓰기인지를 나타냅니다. 읽기 전용 트랜잭션 ID를 사용하여 수행된 쓰기는 거부됩니다. 읽기 전용 트랜잭션은 커밋할 필요가 없습니다.

트랜잭션 ID를 반환합니다.

def commitTransaction

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

지정된 트랜잭션을 커밋하려는 시도입니다. 트랜잭션이 커밋을 완료하기 전에 commitTransaction이 반환될 수 있습니다. 내부적으로 Lake Formation commitTransaction API를 호출합니다.

  • transactionId - (문자열) 커밋할 트랜잭션입니다.

  • waitForCommit - (부울) commitTransaction이 즉시 반환되는지 여부를 결정합니다. 기본값은 true입니다. false인 경우 commitTransaction은 폴링한 후 트랜잭션이 커밋될 때까지 기다립니다. 대기 시간은 최대 재시도 횟수가 6회인 지수 백오프를 사용하여 1분으로 제한됩니다.

커밋이 수행되었는지 여부를 나타내는 부울을 반환합니다.

def cancelTransaction

def cancelTransaction(transactionId: String): Unit

지정된 트랜잭션을 취소하려는 시도입니다. 내부적으로 레이크 포메이션 CancelTransactionAPI를 호출합니다.

  • transactionId - (문자열) 취소할 트랜잭션입니다.

트랜잭션이 이전에 커밋된 경우 TransactionCommittedException 예외를 반환합니다.

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

지정된 SparkContext, 최소 파티션 및 대상 파티션을 사용하여 GlueContext 객체를 생성합니다.

  • sc - SparkContext

  • minPartitions - 파티션의 최소 수입니다.

  • targetPartitions - 파티션의 목표 수입니다.

GlueContext을 반환합니다.

def this

def this( sc : SparkContext )

제공된 SparkContextGlueContext 객체를 생성합니다. 최소 파티션을 10으로 대상 파티션을 20으로 설정합니다.

  • sc - SparkContext

GlueContext을 반환합니다.

def this

def this( sparkContext : JavaSparkContext )

제공된 JavaSparkContextGlueContext 객체를 생성합니다. 최소 파티션을 10으로 대상 파티션을 20으로 설정합니다.

  • sparkContext - JavaSparkContext

GlueContext을 반환합니다.