1단계: 입력 및 출력 스트림 생성 - Amazon Kinesis Data Analytics for SQL Applications 개발자 안내서

신중한 고려 끝에 두 단계로 Amazon Kinesis Data Analytics for SQL applications를 중단하기로 결정했습니다.

1. 2025년 10월 15일부터 SQL 애플리케이션을 위한 새 Kinesis Data Analytics를 생성할 수 없습니다.

2. 2026년 1월 27일부터 애플리케이션이 삭제됩니다. SQL 애플리케이션용 Amazon Kinesis Data Analytics를 시작하거나 작동할 수 없습니다. 해당 시점부터 에 대한 Amazon Kinesis Data AnalyticsSQL에 대한 지원을 더 이상 사용할 수 없습니다. 자세한 내용은 Amazon Kinesis Data Analytics for SQL Applications 중단 단원을 참조하십시오.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

1단계: 입력 및 출력 스트림 생성

핫스팟 예를 위해 Amazon Kinesis Data Analytics 애플리케이션을 생성하기 전에 먼저 Kinesis 데이터 스트림 2개를 생성합니다. 스트림 중 하나를 애플리케이션의 스트리밍 소스로 구성하고 또 다른 스트림을 Kinesis Data Analytics가 애플리케이션 출력을 유지하는 목적지로 구성합니다.

1.1단계: Kinesis 데이터 스트림 생성

이 섹션에서는 다음 2개의 Kinesis 데이터 스트림을 생성합니다: ExampleInputStreamExampleOutputStream.

콘솔 또는 AWS CLI을(를) 사용하여 데이터 스트림을 생성합니다.

  • 콘솔을 사용하여 데이터 스트림을 생성:

    1. AWS Management Console에 로그인하여 https://console.aws.amazon.com/kinesis에서 Kinesis 콘솔을 엽니다.

    2. 탐색 창에서 Data Streams(데이터 스트림)를 선택합니다.

    3. Kinesis 스트림 생성을 선택한 다음 샤드가 하나인 스트림(ExampleInputStream이라고 함)을 생성합니다.

    4. 이전 단계를 반복하여 샤드가 하나인 스트림(ExampleOutputStream이라고 함)을 생성합니다.

  • AWS CLI을(를) 사용하여 데이터 스트림 생성:

    • 다음의 Kinesis create-stream AWS CLI 명령을 사용하여 스트림(ExampleInputStreamExampleOutputStream)을 생성합니다. 애플리케이션이 출력을 작성하기 위해 사용할 두 번째 스트림을 생성하려면 동일한 명령을 실행하여 스트림 명칭을 ExampleOutputStream으로 변경합니다.

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

1.2단계: 샘플 레코드를 입력 스트림에 작성

이 단계에서는 Python 코드를 실행하여 샘플 레코드를 연속적으로 생성하고 ExampleInputStream 스트림에 작성합니다.

{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
  1. Python 및 pip를 설치합니다.

    Python 설치에 관한 정보는 Python 웹사이트를 참조하십시오.

    pip를 사용하여 종속 프로그램을 설치할 수 있습니다. pip 설치에 관한 정보는 pip 웹 사이트에 있는 Installation을 참조하십시오.

  2. 다음 Python 코드를 실행합니다. 이 코드는 다음을 수행합니다.

    • (X, Y) 평면 어딘가에 잠재적 핫스팟을 생성합니다.

    • 각 핫스팟마다 1,000포인트 세트를 생성합니다. 이 포인트에서 20%가 핫스팟 주변에 클러스터링됩니다. 나머지는 전체 공간 내에 무작위로 생성됩니다.

    • put-record 명령은 JSON 레코드를 스트림에 작성합니다.

    중요

    이 파일에는 귀하의 AWS 자격 증명이 포함되어 있으므로 이 파일을 웹 서버에 업로드하지 마십시오.

    import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )

다음 단계

2단계: Kinesis Data Analytics 애플리케이션 생성