1단계: 데이터 준비 - Amazon Kinesis Data Analytics for SQL 애플리케이션 개발자 안내서

새 프로젝트의 경우 SQL 애플리케이션용 Kinesis Data Analytics보다 Apache Flink Studio용 새로운 관리형 서비스를 사용하는 것이 좋습니다. Apache Flink Studio용 관리형 서비스는 사용 편의성과 고급 분석 기능을 결합하여 정교한 스트림 처리 애플리케이션을 몇 분 만에 구축할 수 있도록 합니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

1단계: 데이터 준비

이 섹션에서는 Kinesis 데이터 스트림을 생성한 다음 주문 및 거래 레코드를 스트림에 채워 넣습니다. 이는 다음 단계에서 생성할 애플리케이션의 스트리밍 소스입니다.

1.1단계: 스트리밍 소스 생성

콘솔 또는 AWS CLI을 사용하여 Kinesis 데이터 스트림을 생성할 수 있습니다. 이 예에서는 OrdersAndTradesStream을 스트림 명칭으로 가정합니다.

1.2단계: 스트리밍 소스 채우기

다음의 Python 스크립트를 실행하여 샘플 레코드를 OrdersAndTradesStream에 채웁니다. 다른 명칭의 스트림을 생성한 경우 Python 코드를 적절히 업데이트합니다.

  1. Python 및 pip를 설치합니다.

    Python 설치에 관한 정보는 Python 웹사이트를 참조하십시오.

    pip를 사용하여 종속 프로그램을 설치할 수 있습니다. pip 설치에 관한 정보는 pip 웹 사이트에 있는 Installation을 참조하십시오.

  2. 다음 Python 코드를 실행합니다. 코드에 있는 put-record 명령이 JSON 레코드를 스트림에 작성합니다.

    import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { "RecordType": "Order", "Oid": order_id, "Oticker": ticker, "Oprice": random.randint(500, 10000), "Otype": "Sell", } def get_trade(order_id, trade_id, ticker): return { "RecordType": "Trade", "Tid": trade_id, "Toid": order_id, "Tticker": ticker, "Tprice": random.randint(0, 3000), } def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(["AAAA", "BBBB", "CCCC"]) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY ) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY, ) order_id += 1 if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

다음 단계

2단계: 애플리케이션 만들기