예: 값 변환 DateTime - SQL애플리케이션용 Amazon Kinesis Data Analytics 개발자 가이드

새 프로젝트의 경우 애플리케이션용 Kinesis Data Analytics보다 Apache Flink Studio용 새로운 관리형 서비스를 사용하는 것이 좋습니다. SQL Managed Service for Apache Flink Studio는 사용 편의성과 고급 분석 기능을 결합하여 정교한 스트림 처리 애플리케이션을 몇 분 만에 구축할 수 있도록 합니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

예: 값 변환 DateTime

Amazon Kinesis Data Analytics는 열을 타임스탬프로 변환하는 것을 지원합니다. 예를 들어 GROUP BY 절의 일부인 자체 타임스탬프를 ROWTIME 열에 더하여 또 다른 시간 기반 윈도우로 사용할 수 있습니다. Kinesis Data Analytics는 날짜 및 시간 필드 작업을 위한 작업 및 SQL 함수를 제공합니다.

  • 날짜 및 시간 연산자 – 날짜, 시간 및 간격 데이터 유형에 대한 산술 연산을 수행할 수 있습니다. 자세한 설명은 Amazon Managed Service for Apache Flink SQL 참조날짜, 타임스탬프 및 간격 연산자를 참조하십시오.

     

  • SQL 함수 – 여기에는 다음이 포함됩니다. 자세한 설명은 Amazon Managed Service for Apache Flink SQL 참조날짜 및 시간 함수를 참조하십시오.

    • EXTRACT() – 날짜, 시간, 타임스탬프 또는 간격 표현식에서 필드 하나를 추출합니다.

    • CURRENT_TIME – 쿼리가 실행될 때의 시간을 반환합니다(UTC).

    • CURRENT_DATE – 쿼리가 실행될 때의 날짜를 반환합니다(UTC).

    • CURRENT_TIMESTAMP – 쿼리가 실행될 때의 타임스탬프를 반환합니다(UTC).

    • LOCALTIME – Kinesis Data Analytics가 실행 중인 환경에서 정의된 대로 쿼리가 실행될 때의 현재 시간(UTC)을 반환합니다.

    • LOCALTIMESTAMP – Kinesis Data Analytics가 실행 중인 환경에 의해 정의되는 대로 현재 타임스탬프를 반환합니다(UTC).

       

  • SQL 확장 – 여기에는 다음이 포함됩니다. 자세한 설명은 Amazon Managed Service for Apache Flink SQL 참조에서 날짜 및 시간 함수날짜 시간 변환 함수를 참조하십시오.

    • CURRENT_ROW_TIMESTAMP – 스트림 상의 각 행에 대해 새 타임스탬프를 반환합니다.

    • TSDIFF – 두 타임스탬프 간의 차이를 밀리초 단위로 반환합니다.

    • CHAR_TO_DATE – 문자열을 날짜로 변환합니다.

    • CHAR_TO_TIME – 문자열을 시간으로 변환합니다.

    • CHAR_TO_TIMESTAMP – 문자열을 타임스탬프로 변환합니다.

    • DATE_TO_CHAR – 날짜를 문자열로 변환합니다.

    • TIME_TO_CHAR – 시간을 문자열로 변환합니다.

    • TIMESTAMP_TO_CHAR – 타임스탬프를 문자열로 변환합니다.

위의 SQL 함수 중 대부분은 형식을 사용하여 열을 변환합니다. 형식은 유연합니다. 예를 들어, 형식 yyyy-MM-dd hh:mm:ss를 지정하여 입력 문자열 2009-09-16 03:15:24를 타임스탬프로 변환할 수 있습니다. 자세한 설명은 Amazon Managed Service for Apache Flink SQL 참조Char To Timestamp(Sys)를 참조하십시오.

예: 날짜 변환

이 예에서는 다음 레코드를 Amazon Kinesis 데이터 스트림에 기록합니다.

{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...

그런 다음 콘솔에서 Kinesis Data Analytics 애플리케이션을 생성하고 Kinesis 스트림을 스트리밍 소스로 취합니다. 검색 프로세스는 스트리밍 소스 상의 샘플 레코드를 읽고 다음과 같이 두 개의 열(EVENT_TIMETICKER)로 애플리케이션 내 스키마를 유추합니다.

이벤트 시간 및 티커 열이 포함된 애플리케이션 내 스키마를 보여주는 콘솔 스크린샷

그런 다음 SQL 함수를 지닌 애플리케이션 코드를 사용하여 EVENT_TIME 타임스탬프 필드를 다양한 방법으로 변환합니다. 그러면 다음 스크린샷과 같이 다른 애플리케이션 내 스트림에 결과 데이터를 삽입합니다.

애플리케이션 내 스트림의 결과 데이터를 보여주는 콘솔 스크린샷

1단계: Kinesis 데이터 스트림 생성

Amazon Kinesis 데이터 스트림을 생성하고 다음과 같이 이벤트 시간 및 티커 레코드를 채웁니다:

  1. 에 AWS Management Console 로그인하고 https://console.aws.amazon.com/kinesis 에서 Kinesis 콘솔을 엽니다.

  2. 탐색 창에서 Data Streams(데이터 스트림)를 선택합니다.

  3. Kinesis 스트림 생성을 선택한 다음 샤드가 하나인 스트림을 생성합니다.

  4. 다음의 Python 코드를 실행하여 스트림을 샘플 데이터로 채웁니다. 이 간단한 코드는 지속적으로 임의 티커 기호 및 현재 타임스탬프가 포함된 레코드를 스트림에 씁니다.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

2단계: Amazon Kinesis Data Analytics 애플리케이션 생성

다음과 같이 애플리케이션을 생성합니다:

  1. https://console.aws.amazon.com/kinesisanalytics에서 Managed Service for Apache Flink 콘솔을 엽니다.

  2. 애플리케이션 생성을 선택하고 애플리케이션 명칭을 입력한 다음 애플리케이션 생성을 선택합니다.

  3. 애플리케이션 세부 정보 페이지에서 Connect streaming data(스트리밍 데이터 연결)를 선택하여 소스에 연결합니다.

  4. Connect to source(소스에 연결) 페이지에서 다음을 수행합니다.

    1. 이전 섹션에서 생성한 스트림을 선택합니다.

    2. IAM 역할 생성을 선택합니다.

    3. Discover schema(스키마 발견)를 선택합니다. 유추된 스키마와, 생성된 애플리케이션 내 스트림에 대한 스키마를 유추하는 데 사용된 샘플 레코드를 콘솔이 표시할 때까지 기다립니다. 유추된 스키마에는 두 개의 열이 있습니다.

    4. Edit Schema(스키마 편집)를 선택합니다. EVENT_TIME 열의 열 유형TIMESTAMP으로 변경합니다.

    5. [Save schema and update stream samples]를 선택합니다. 콘솔에서 스키마를 저장한 이후 종료를 선택합니다.

    6. [Save and continue]를 선택합니다.

  5. 애플리케이션 세부 정보 페이지에서 Go to SQL editor(SQL 편집기로 이동)를 선택합니다. 애플리케이션을 시작하려면 나타나는 대화 상자에서 Yes, start application(예, 애플리케이션 시작)을 선택합니다.

  6. SQL 편집기에서 애플리케이션 코드를 작성하고 다음과 같이 결과를 확인합니다.

    1. 다음 애플리케이션 코드를 복사하여 편집기에 붙여넣습니다.

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
    2. [Save and run SQL]을 선택합니다. Real-time analytics(실시간 분석) 탭에서 애플리케이션이 생성한 모든 애플리케이션 내 스트림을 확인하고 데이터를 검증할 수 있습니다.