步驟 1:準備資料 - 適用於 SQL 應用程式的 Amazon Kinesis Data Analytics 開發人員指南

針對新專案,我們建議您優先選擇新的 Managed Service for Apache Flink Studio,而非 Kinesis Data Analytics for SQL 應用程式。Managed Service for Apache Flink Studio 易於使用且具備進階分析功能,可讓您在幾分鐘內建置複雜的串流處理應用程式。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

步驟 1:準備資料

在本節中,建立 Kinesis 資料串流,然後在串流中填入訂單和交易記錄。這是您在下一個步驟中建立之應用程式的串流來源。

步驟 1.1:建立串流來源

您可以使用主控台或 AWS CLI建立 Kinesis 資料串流。此範例假設 OrdersAndTradesStream 為串流名稱。

  • 使用主控台 — 登入 AWS Management Console 並開啟 Kinesis 主控台,網址為 https://console.aws.amazon.com/kinesis。選擇資料串流,然後建立具有一個碎片的串流。如需詳細資訊,請參閱 Amazon Kinesis Data Streams 開發人員指南中的建立串流

  • 使用 AWS CLI — 使用下列 Kinesis create-stream AWS CLI 指令建立串流:

    $ aws kinesis create-stream \ --stream-name OrdersAndTradesStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser

步驟 1.2:填入串流來源

執行下列 Python 指令碼,將範例記錄填入 OrdersAndTradesStream。如果您使用不同的名稱建立串流,請更新相應的 Python 程式碼。

  1. 安裝 Python 與 pip

    如需安裝 Python 的相關資訊,請參閱 Python 網站。

    您可以使用 Pip 安裝相依性。如需安裝 Pip 的詳細資訊,請參閱 pip 網站的安裝

  2. 執行以下 Python 程式碼。程式碼中的 put-record 命令會將 JSON 記錄寫入串流。

    import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { "RecordType": "Order", "Oid": order_id, "Oticker": ticker, "Oprice": random.randint(500, 10000), "Otype": "Sell", } def get_trade(order_id, trade_id, ticker): return { "RecordType": "Trade", "Tid": trade_id, "Toid": order_id, "Tticker": ticker, "Tprice": random.randint(0, 3000), } def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(["AAAA", "BBBB", "CCCC"]) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY ) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY, ) order_id += 1 if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

後續步驟

步驟 2:建立應用程式