新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
例: 文字列の一部の抽出 (SUBSTRING 関数)
この例では、SUBSTRING
関数を使用して Amazon Kinesis Data Analytics で文字列を変換します。SUBSTRING
関数は、特定の部分から始まるソース文字列の一部を抽出します。詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「SUBSTRING」を参照してください。
この例では、次のレコードを Amazon Kinesis データストリームに書き込みます。
{ "REFERRER" : "http://www.amazon.com" } { "REFERRER" : "http://www.amazon.com"} { "REFERRER" : "http://www.amazon.com"} ...
次に、Kinesis データストリームをストリーミングソースとして使用して、コンソールで Kinesis Data Analytics アプリケーションを作成します。検出プロセスでストリーミングソースのサンプルレコードが読み込まれ、次のように、アプリケーション内スキーマの列が 1 つ (REFERRER
) であると推察します。
次に、SUBSTRING
関数を持つアプリケーションコードを使用して、URL 文字列を解析して会社名を取得します。その後、次に示すように生成されたデータを別のアプリケーション内ストリームに挿入します。
ステップ 1: Kinesis データストリームを作成する
次のように、Amazon Kinesis データストリームを作成して、ログレコードを追加します。
AWS Management Console にサインインし、Kinesis コンソール (https://console.aws.amazon.com/kinesis
) を開きます。 -
ナビゲーションペインで、[データストリーム] を選択します。
-
[Kinesis ストリームの作成] を選択し、1 つのシャードがあるストリームを作成します。詳細については、「Amazon Kinesis Data Streams デベロッパーガイド」の「ストリームを作成する」を参照してください。
-
サンプルログレコードを入力するには、以下の Python コードを実行します。このシンプルなコードは、同じログレコードを連続してストリームに書き込みます。
import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"REFERRER": "http://www.amazon.com"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
ステップ 2: Kinesis Data Analytics アプリケーションを作成する
続いて、次のように Kinesis Data Analytics アプリケーションを作成します。
https://console.aws.amazon.com/kinesisanalytics
にある Managed Service for Apache Flink コンソールを開きます。 -
[アプリケーションの作成] を選択し、アプリケーション名を入力して、[アプリケーションの作成] を選択します。
-
アプリケーション詳細ページで、[Connect streaming data (ストリーミングデータの接続)] を選択します。
-
[Connect to source (ソースに接続)] ページで、以下の操作を実行します。
-
前のセクションで作成したストリームを選択します。
-
IAM ロールを作成するオプションを選択します。
-
[Discover schema] (スキーマの検出) を選択します。作成されたアプリケーション内ストリーム用の推測スキーマと、推測に使用されたサンプルレコードがコンソールに表示されるまで待ちます。推測スキーマの列は 1 つのみです。
-
[Save and continue] を選択します。
-
-
アプリケーション詳細ページで、[Go to SQL editor (SQL エディタに移動)] を選択します。アプリケーションを起動するには、表示されたダイアログボックスで [Yes, start application (はい、アプリケーションを起動します)] を選択します。
-
SQL エディタで、次のように、アプリケーションコードを作成してその結果を確認します。
-
次のアプリケーションコードをコピーしてエディタに貼り付けます。
-- CREATE OR REPLACE STREAM for cleaned up referrer CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( "ingest_time" TIMESTAMP, "referrer" VARCHAR(32)); CREATE OR REPLACE PUMP "myPUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", SUBSTRING("referrer", 12, (POSITION('.com' IN "referrer") - POSITION('www.' IN "referrer") - 4)) FROM "SOURCE_SQL_STREAM_001";
-
[Save and run SQL] を選択します。[Real-time analytics (リアルタイム分析)] タブに、アプリケーションで作成されたすべてのアプリケーション内ストリームが表示され、データを検証できます。
-