1단계: 준비 - Amazon Kinesis Data Analytics for SQL 애플리케이션 개발자 안내서

새 프로젝트의 경우 SQL 애플리케이션용 Kinesis Data Analytics보다 Apache Flink Studio용 새로운 관리형 서비스를 사용하는 것이 좋습니다. Apache Flink Studio용 관리형 서비스는 사용 편의성과 고급 분석 기능을 결합하여 정교한 스트림 처리 애플리케이션을 몇 분 만에 구축할 수 있도록 합니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

1단계: 준비

이 연습에 사용할 Amazon Kinesis Data Analytics 애플리케이션을 생성하기 전에 Kinesis 데이터 스트림 두 개를 생성해야 합니다. 스트림 중 하나를 애플리케이션의 스트리밍 소스로 구성하고 또 다른 스트림을 Kinesis Data Analytics가 애플리케이션 출력을 유지하는 목적지로 구성합니다.

1.1단계: 입력 및 출력 데이터 스트림 생성

이 섹션에서는 2개의 Kinesis 스트림을 생성합니다: ExampleInputStreamExampleOutputStream. AWS Management Console 또는 AWS CLI을(를) 사용하여 이러한 스트림을 만들 수 있습니다.

  • 콘솔을 사용하려면
    1. AWS Management Console에 로그인하여 https://console.aws.amazon.com/kinesis에서 Kinesis 콘솔을 엽니다.

    2. 데이터 스트리밍 생성을 선택합니다. 샤드가 하나인 스트림(ExampleInputStream이라고 함)을 생성합니다. 자세한 설명은 Amazon Kinesis Data Streams 개발자 가이드스트림 생성을 참조하세요.

    3. 이전 단계를 반복하여 샤드가 하나인 스트림(ExampleOutputStream이라고 함)을 생성합니다.

  • AWS CLI를 사용하려면,
    1. 다음의 Kinesis create-streamAWS CLI 명령을 사용하여 첫 번째 스트림(ExampleInputStream)을 생성합니다.

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser
    2. 동일한 명령을 실행하여 스트림 명칭을 ExampleOutputStream으로 변경합니다. 이 명령은 두 번째 스트림을 생성하고 애플리케이션은 이를 사용하여 출력을 작성합니다.

1.2단계: 샘플 레코드를 입력 스트림에 작성

이 단계에서는 Python 코드를 실행하여 샘플 레코드를 연속적으로 생성하고 이러한 레코드를 ExampleInputStream 스트림에 작성합니다.

{"heartRate": 60, "rateType":"NORMAL"} ... {"heartRate": 180, "rateType":"HIGH"}
  1. Python 및 pip를 설치합니다.

    Python 설치에 관한 정보는 Python 웹사이트를 참조하십시오.

    pip를 사용하여 종속 프로그램을 설치할 수 있습니다. pip 설치에 관한 정보는 pip 웹 사이트에 있는 Installation을 참조하십시오.

  2. 다음 Python 코드를 실행합니다. 코드에 있는 put-record 명령이 JSON 레코드를 스트림에 작성합니다.

    from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class RateType(Enum): normal = "NORMAL" high = "HIGH" def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {"heartRate": rate, "rateType": rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

다음 단계

단계 2: 애플리케이션 만들기