メニュー
Amazon API Gateway
開発者ガイド

Amazon Kinesis プロキシとして API Gateway API を作成する

このセクションでは、AWS タイプが統合された API Gateway API を作成および設定して、Kinesis にアクセスする方法について説明します。

注記

API Gateway API を Kinesis と統合するには、API Gateway と Kinesis の両方を利用できるリージョンを選択する必要があります。各リージョンの可用性については、「リージョンとエンドポイント」を参照してください。

この図では、サンプル API を作成して、クライアントが次の操作を行うことができるようにします。

  1. ユーザーが Kinesis で使用できるストリームを表示する

  2. 指定されたストリームを作成、説明、または削除する

  3. 指定されたストリームからデータレコードを読み取る、または書き込む

前述のタスクを完了するため、API はさまざまなリソースでメソッドを公開し、それぞれ次のものを呼び出します。

  1. Kinesis の ListStreams アクション

  2. CreateStreamDescribeStream、または DeleteStream アクション

  3. Kinesis の GetRecords または PutRecords (PutRecord を含む) アクション

具体的には、次のように API を作成します。

  • API の /streams リソースで HTTP GET メソッドを公開し、メソッドを Kinesis の ListStreams アクションと統合して、呼び出し元のアカウントでストリームをリストします。

  • API の /streams/{stream-name} リソースで HTTP POST メソッドを公開し、このメソッドを Kinesis の CreateStream アクションと統合して、呼び出し元のアカウントで名前付きストリームを作成します。

  • API の /streams/{stream-name} リソースで HTTP GET メソッドを公開し、このメソッドを Kinesis の DescribeStream アクションと統合して、呼び出し元のアカウントで名前付きストリームを記述します。

  • API の /streams/{stream-name} リソースで HTTP DELETE メソッドを公開し、このメソッドを Kinesis の DeleteStream アクションと統合して、呼び出し元のアカウントでストリームを削除します。

  • API の /streams/{stream-name}/record リソースで HTTP PUT メソッドを公開し、このメソッドを Kinesis の PutRecord アクションと統合します。これにより、クライアントは名前付きストリームに 1 つのデータレコードを追加できます。

  • API の /streams/{stream-name}/records リソースで HTTP PUT メソッドを公開し、このメソッドを Kinesis の PutRecords アクションと統合します。これにより、クライアントは名前付きストリームにデータレコードのリストを追加できます。

  • API の /streams/{stream-name}/records リソースで HTTP GET メソッドを公開し、このメソッドを Kinesis の GetRecords アクションと統合します。これにより、クライアントは名前付きストリームで指定されたシャードイテレーターとともにデータレコードを一覧表示できます。シャードイテレーターは、データレコードの逐次読み取りを開始する、シャードの位置を指定します。

  • API の /streams/{stream-name}/sharditerator リソースで HTTP GET メソッドを公開し、このメソッドを Kinesis の GetShardIterator アクションと統合します。このヘルパーメソッドは、Kinesis の ListStreams アクションに指定する必要があります。

ここに示す手順を他の Kinesis アクションに適用できます。Kinesis アクションの完全なリストについては、Amazon Kinesis API リファレンス 参照してください。

API Gateway コンソールを使用してサンプル API を作成する代わりに、API Gateway の API のインポート機能または API Gateway Swagger Importer を使用してサンプル API を API Gateway にインポートできます。API のインポート機能の使用方法の詳細については、「API を API Gateway にインポートする」を参照してください。API Gateway Swagger Importer の使用方法の詳細については、「API Gateway Swagger Importer の開始方法」を参照してください。

AWS アカウントをお持ちでない場合は、次に説明する手順に従ってアカウントを作成してください。

AWS にサインアップするには

  1. https://aws.amazon.com/ を開き、[AWS アカウントの作成] を選択します。

  2. オンラインの手順に従います。

API が Kinesis にアクセスするための IAM ロールとポリシーを作成する

API が Kinesis アクションを呼び出せるようにするには、IAM ロールにアタッチされた適切な IAM ポリシーが必要です。次のセクションでは、必要に応じて IAM のロールとポリシーを確認および作成する方法について説明します。

Kinesis への読み取り専用アクセスを有効にするには、Kinesis で呼び出す Get*List*、および Describe* アクションを許可する AmazonKinesisReadOnlyAccess ポリシーを使用できます。

Copy
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kinesis:Get*", "kinesis:List*", "kinesis:Describe*" ], "Resource": "*" } ] }

このポリシーは、IAM からプロビジョニングされた管理ポリシーとして使用でき、その ARN は arn:aws:iam::aws:policy/AmazonKinesisReadOnlyAccess です。

Kinesis で読み取りおよび書き込みアクションを有効にするには、AmazonKinesisFullAccess ポリシーを使用できます。

Copy
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "kinesis:*", "Resource": "*" } ] }

このポリシーも、IAM でプロビジョニングされた管理ポリシーとして使用できます。その ARN は arn:aws:iam::aws:policy/AmazonKinesisFullAccess です。

使用する IAM ポリシーを決定したら、新しい、または既存の IAM ロールにアタッチします。API Gateway 管理サービス (apigateway.amazonaws.com) がロールの信頼されたエンティティで、実行ロール (sts:AssumeRole) の引き受けが許可されていることを確認します。

Copy
{ "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "Service": "apigateway.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

IAM コンソールで実行ロールを作成し、Amazon API Gateway ロールタイプを選択すると、この信頼ポリシーが自動的にアタッチされます。

実行ロールの ARN をメモしておきます。これは API メソッドを作成し、その統合リクエストを設定するときに必要になります。

Kinesis プロキシとして API の作成を開始する

次のステップを使用して、API Gateway コンソールで API を作成します。

Kinesis 用の AWS サービスプロキシとして API を作成するには

  1. API Gateway コンソールで、[Create API] を選択します。

  2. [New API] を選択します。

  3. [API name] に「KinesisProxy」と入力します。その他のフィールドはデフォルト値のままにしておきます。

  4. 必要に応じて、[Description] に説明を入力します。

  5. [Create API] を選択します。

API が作成されると、API Gateway コンソールには、API のルート (/) リソースのみを含む [Resources] ページが表示されます。

Kinesis でストリームをリスト表示する

Kinesis では、次の REST API 呼び出しで実行される ListStreams アクションをサポートしています。

Copy
POST /?Action=ListStreams HTTP/1.1 Host: kinesis.<region>.<domain> Content-Length: <PayloadSizeBytes> User-Agent: <UserAgentString> Content-Type: application/x-amz-json-1.1 Authorization: <AuthParams> X-Amz-Date: <Date> { ... }

上記の REST API リクエストでは、このアクションは、Action クエリパラメータで指定されます。または、代わりに、X-Amz-Target ヘッダーでこのアクションを指定することもできます。

Copy
POST / HTTP/1.1 Host: kinesis.<region>.<domain> Content-Length: <PayloadSizeBytes> User-Agent: <UserAgentString> Content-Type: application/x-amz-json-1.1 Authorization: <AuthParams> X-Amz-Date: <Date> X-Amz-Target: Kinesis_20131202.ListStreams { ... }

このチュートリアルでは、クエリパラメータを使用してアクションを指定します。

API の Kinesis アクションを公開するには、/streams リソースを API のルートに追加します。次に、GET メソッドをリソースに設定し、そのメソッドを Kinesis のListStreams アクションと統合します。

次の手順は、API Gateway コンソールを使用して Kinesis ストリームをリスト表示する方法を示しています。

API Gateway コンソールを使用して Kinesis ストリームをリスト表示するには

  1. API のルートリソースを選択します。[Actions] で、[Create Resource] を選択します。

    [Resource Name] に「Streams」と入力し、[Resource Path] および他のフィールドをデフォルトのままにして、[Create Resource] を選択します。

  2. /Streams リソースを選択します。[Actions] から [Create Method] を選択し、一覧から [GET] を選択します。次に、チェックマークアイコンを選択してメソッドの作成を終了します。

    注記

    クライアントによって呼び出されるメソッドの HTTP 動詞は、バックエンド統合が必要な場合の HTTP 動詞とは異なる場合があります。直感的に、ストリームのリスト表示は読み取りオペレーションであるため、ここでは GET を選択します。

  3. メソッドの [Setup] ペインで、[AWS Service ] を選択します。

    1. [AWS Region] で、リージョン (例: us-east-1) を選択します。

    2. [AWS Service] で、[Kinesis] を選択します。

    3. [AWS Subdomain] は空白のままにします。

    4. [HTTP method] で、[POST] を選択します。

      注記

      ここで POST を選択したのは、Kinesis では ListStreams アクションはこれを使用して実行する必要があるためです。

    5. [Action Type] で、[Use action name] を選択します。

    6. [Action] に、「ListStreams」と入力します。

    7. [Execution role] に、実行ロールの ARN を入力します。

    8. [Content Handling] の [Passthrough] はデフォルトのままにします。

    9. [Save] を選択して、メソッドの初期設定を終了します。

     Kinesis ListStreams アクションの Streams:GET メソッドを設定します。
  4. 引き続き [Integration Request] ペインで、[HTTP Headers] セクションを展開します。

    1. [Add header] を選択します。

    2. [Name] ボックスに、「Content-Type」と入力します。

    3. [Mapped from] 列に「'application/x-amz-json-1.1'」と入力します。

    4. チェックマークアイコンを選択して、設定を保存します。

    入力が特定のバージョンの JSON であることを Kinesis に知らせるために、Content-Type ヘッダーを 'application/x-amz-json-1.1' の静的な値に設定するリクエストパラメータマッピングを使用しました。

  5. [Body Mapping Templates] セクションを展開します。

    1. [Add mapping template] を選択します。

    2. [Content-Type] に「application/json」と入力します。

    3. チェックマークアイコンを選択して [Content-Type] の設定を保存します。[Change passthrough behavior] の [Yes, secure this integration] を選択します。

    4. Template Editor に「{}」と入力します。

    5. [Save] ボタンを選択して、マッピングテンプレートを保存します。

    [ListStreams] リクエストは、次の JSON 形式のペイロードを受け取ります。

    Copy
    { "ExclusiveStartStreamName": "string", "Limit": number }

    ただし、プロパティはオプションです。デフォルト値を使用するため、ここでは空の JSON ペイロードを選択しました。

     Kinesis の ListStreams アクション用に Streams:GET メソッドのデータマッピングを設定する
  6. ストリームリソースで GET メソッドをテストし、Kinesis で ListStreams アクションを呼び出します。

    API Gateway コンソールで、[Resources] ペインから /streams/GET エントリを選択し、[Test] 呼び出しオプションを選択して、[Test] を選択します。

    すでに Kinesis で "myStream" および "yourStream" という 2 つのストリームを作成している場合、テストに成功すると、次のペイロードを含む 200 OK レスポンスが返されます。

    Copy
    { "HasMoreStreams": false, "StreamNames": [ "myStream", "yourStream" ] }

Kinesis でストリームを作成、表示、削除する

Kinesis でのストリームの作成、表示、および削除には、それぞれ次の Kinesis REST API リクエストの作成が関連します。

Copy
POST /?Action=CreateStream HTTP/1.1 Host: kinesis.region.domain ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ShardCount": number, "StreamName": "string" }
Copy
POST /?Action=DescribeStream HTTP/1.1 Host: kinesis.region.domain ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ExclusiveStartShardId": "string", "Limit": number, "StreamName": "string" }
Copy
POST /?Action=DeleteStream HTTP/1.1 Host: kinesis.region.domain ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "StreamName":"string" }

API を作成して、必要な入力をメソッドリクエストの JSON ペイロードとして受け取り、ペイロードを統合リクエストに渡すことができます。ただし、メソッドリクエストと統合リクエスト、およびメソッドレスポンスと統合レスポンスの間のデータマッピングの例を詳細に示すため、API は少し異なる方法で作成します。

これから名前を付ける Stream リソースの HTTP メソッド (GETPOSTDelete) を公開します。{stream-name} パス変数をストリームリソースのプレースホルダーとして使用して、これから名前を付けるリソースを保持し、これらの API メソッドを、Kinesis の DescribeStreamCreateStream、および DeleteStream アクションとそれぞれ統合します。クライアントが他の入力データをメソッドリクエストのヘッダー、クエリパラメータ、またはペイロードとして渡す必要があり、データを必要な統合リクエストペイロードに変換するためのマッピングテンプレートを提供します。

ストリームリソースで GET メソッドを設定し、テストするには

  1. 以前に作成した /streams リソースの下の {stream-name} パス変数を使用して、子リソースを作成します。

    {stream-name} リソースを作成する
  2. HTTP 動詞 (POSTGETDELETE) をこのリソースに追加します。

    リソースでメソッドを作成すると、API の構造は次のようになります。

     ストリームリソースで POST、GET、および DELETE メソッドを作成する
  3. GET /streams/{stream-name} メソッドを設定するには、以下に示すように、Kinesis の POST /?Action=DescribeStream アクションを呼び出します。

     GET-on-Stream メソッドを設定して Kinesis で DescribeStream アクションを呼び出す
  4. 次のヘッダーマッピング (Content-Type) を統合リクエストに追加します。

    Copy
    Content-Type: 'x-amz-json-1.1'

    このタスクでは、同じ手順を使用して GET /streams メソッドのリクエストパラメータマッピングを設定します。

  5. 次の本文マッピングテンプレートを追加して、GET /streams/{stream-name} メソッドリクエストから POST /?Action=DescribeStream 統合リクエストにデータをマッピングします。

    Copy
    { "StreamName": "$input.params('stream-name')" }

    このマッピングテンプレートでは、メソッドリクエストの stream-name パスのパラメータ値から Kinesis の DescribeStream アクションに必要な統合リクエストのペイロードを生成します。

  6. GET /stream/{stream-name} メソッドをテストして Kinesis で DescribeStream アクションを呼び出す

    API Gateway コンソールから、[Resources] ペインで /streams/{stream-name}/GET を選択し、[Test] を選択してテストを開始します。次に、stream-name の [Path] フィールドに既存の Kinesis ストリームの名前を入力し、[Test] を選択します。テストに成功すると、200 OK レスポンスが、次のようなペイロードとともに返されます。

    Copy
    { "StreamDescription": { "HasMoreShards": false, "RetentionPeriodHours": 24, "Shards": [ { "HashKeyRange": { "EndingHashKey": "68056473384187692692674921486353642290", "StartingHashKey": "0" }, "SequenceNumberRange": { "StartingSequenceNumber": "49559266461454070523309915164834022007924120923395850242" }, "ShardId": "shardId-000000000000" }, ... { "HashKeyRange": { "EndingHashKey": "340282366920938463463374607431768211455", "StartingHashKey": "272225893536750770770699685945414569164" }, "SequenceNumberRange": { "StartingSequenceNumber": "49559266461543273504104037657400164881014714369419771970" }, "ShardId": "shardId-000000000004" } ], "StreamARN": "arn:aws:kinesis:us-east-1:12345678901:stream/myStream", "StreamName": "myStream", "StreamStatus": "ACTIVE" } }

    API をデプロイした後は、この API メソッドに対して REST リクエストを行うことができます。

    Copy
    GET https://your-api-id.execute-api.region.amazonaws.com/stage/streams/myStream HTTP/1.1 Host: your-api-id.execute-api.region.amazonaws.com Content-Type: application/json Authorization: ... X-Amz-Date: 20160323T194451Z

ストリームリソースで POST メソッドを設定し、テストするには

  1. Kinesis の POST /?Action=CreateStream アクションを呼び出すよう POST /streams/{stream-name} メソッドを設定します。このタスクでは、DescribeStream アクションを CreateStream に置き換えていることを前提として、同じ手順を使用して GET /streams/{stream-name} メソッドを設定します。

  2. 次のヘッダーマッピング (Content-Type) を統合リクエストに追加します。

    Copy
    Content-Type: 'x-amz-json-1.1'

    このタスクでは、同じ手順を使用して GET /streams メソッドのリクエストパラメータマッピングを設定します。

  3. 次の本文マッピングテンプレートを追加して、POST /streams/{stream-name} メソッドリクエストから POST /?Action=CreateStream 統合リクエストにデータをマッピングします。

    Copy
    { "ShardCount": #if($input.path('$.ShardCount') == '') 5 #else $input.path('$.ShardCount') #end, "StreamName": "$input.params('stream-name')" }

    前述のマッピングテンプレートで、クライアントがメソッドリクエストペイロードで値を指定しなかった場合は、ShardCount を固定値 5 に設定します。

  4. POST /streams/{stream-name} メソッドをテストして Kinesis で名前付きストリームを作成する

    API Gateway コンソールから、[Resources] ペインで /streams/{stream-name}/POST を選択し、[Test] を選択してテストを開始します。次に、stream-name の [Path] に既存の Kinesis ストリームの名前を入力し、[Test] を選択します。テストが完了すると、データなしで 200 OK レスポンスが返されます。

    API をデプロイした後で、Stream リソースで POST メソッドに対して REST API リクエストを行い、Kinesis で CreateStream アクションを呼び出すこともできます。

    Copy
    POST https://your-api-id.execute-api.region.amazonaws.com/stage/streams/yourStream HTTP/1.1 Host: your-api-id.execute-api.region.amazonaws.com Content-Type: application/json Authorization: ... X-Amz-Date: 20160323T194451Z { "ShardCount": 5 }

ストリームリソースで DELETE メソッドを設定し、テストする

  1. Kinesis の POST /?Action=DeleteStream アクションと統合できるように DELETE /streams/{stream-name} メソッドを設定します。このタスクでは、DescribeStream アクションを DeleteStream に置き換えていることを前提として、同じ手順を使用して GET /streams/{stream-name} メソッドを設定します。

  2. 次のヘッダーマッピング (Content-Type) を統合リクエストに追加します。

    Copy
    Content-Type: 'x-amz-json-1.1'

    このタスクでは、同じ手順を使用して GET /streams メソッドのリクエストパラメータマッピングを設定します。

  3. 次の本文マッピングテンプレートを追加して、DELETE /streams/{stream-name} メソッドリクエストから POST /?Action=DeleteStream の該当する統合リクエストにデータをマッピングします。

    Copy
    { "StreamName": "$input.params('stream-name')" }

    このマッピングテンプレートでは、stream-name のクライアントが指定した URL パス名値から DELETE /streams/{stream-name} アクションに必要な入力が生成されます。

  4. DELETE メソッドをテストして、Kinesis で名前付きストリームを削除します

    API Gateway コンソールから、[Resources] ペインで /streams/{stream-name}/DELETE メソッドノードを選択し、[Test] を選択してテストを開始します。次に、stream-name の [Path] に既存の Kinesis ストリームの名前を入力し、[Test] を選択します。テストが完了すると、データなしで 200 OK レスポンスが返されます。

    API をデプロイした後で、Stream リソースで DELETE メソッドに対して次の REST API リクエストを行って、Kinesis で DeleteStream アクションを呼び出すことができます。

    Copy
    DELETE https://your-api-id.execute-api.region.amazonaws.com/stage/streams/yourStream HTTP/1.1 Host: your-api-id.execute-api.region.amazonaws.com Content-Type: application/json Authorization: ... X-Amz-Date: 20160323T194451Z {}

Kinesis のストリームからレコードを取得、追加する

Kinesis でストリームを作成すると、ストリームにデータレコードを追加したり、ストリームからデータを読み取ったりできます。データレコードの追加には、Kinesis での PutRecords または PutRecord アクションの呼び出しが関連します。前者は複数のレコードを追加し、後者は 1 つのレコードをストリームに追加します。

Copy
POST /?Action=PutRecords HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "Records": [ { "Data": blob, "ExplicitHashKey": "string", "PartitionKey": "string" } ], "StreamName": "string" }

または

Copy
POST /?Action=PutRecord HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "Data": blob, "ExplicitHashKey": "string", "PartitionKey": "string", "SequenceNumberForOrdering": "string", "StreamName": "string" }

ここで、StreamName はレコードを追加するターゲットストリームを識別します。StreamNameData、および PartitionKey は必須の入力データです。この例では、すべてのオプション入力データにデフォルト値を使用し、メソッドリクエストへの入力ではそれらの値を明示的に指定しません。

Kinesis でのデータの読み取りは、GetRecords アクションの呼び出しと同じです。

Copy
POST /?Action=GetRecords HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ShardIterator": "string", "Limit": number }

ここで、レコードの取得元のソースストリームは、シャードイテレーターを取得するために次の Kinesis アクションに示すように、必須の ShardIterator 値で指定されます。

Copy
POST /?Action=GetShardIterator HTTP/1.1 Host: kinesis.region.domain Authorization: AWS4-HMAC-SHA256 Credential=..., ... ... Content-Type: application/x-amz-json-1.1 Content-Length: PayloadSizeBytes { "ShardId": "string", "ShardIteratorType": "string", "StartingSequenceNumber": "string", "StreamName": "string" }

GetRecords および PutRecords アクションでは、それぞれ GET および PUT メソッドを、名前付きストリームリソース (/{stream-name}) に追加される /records リソースで公開します。同様に、PutRecord アクションは /record リソースで PUT メソッドとして公開します。

GetRecords アクションは、GetShardIterator ヘルパーアクションを呼び出して取得される ShardIterator 値を入力として受け取るため、ShardIterator リソース (/sharditerator) で GET ヘルパーメソッドを公開します。

次の図は、メソッド作成後のリソースの API 構造を示しています。

 レコードの作成: API の GET|PUT|PUT|GET メソッド

次の 4 つの手順では、各メソッドの設定方法、メソッドリクエストから統合リクエストにデータをマッピングする方法、およびメソッドをテストする方法について説明します。

PUT /streams/{stream-name}/record メソッドをセットアップしてテストし、Kinesis の PutRecord を呼び出すには

  1. 次に示すように PUT メソッドを設定します。

     Kinesis で PutRecord アクションを呼び出よう PUT メソッドを設定します。
  2. 次のリクエストパラメータマッピングを追加して、Content-Type ヘッダーを、統合リクエストの AWS 準拠の JSON に設定します。

    Copy
    Content-Type: 'x-amz-json-1.1'

    このタスクでは、同じ手順を使用して GET /streams メソッドのリクエストパラメータマッピングを設定します。

  3. 次の本文マッピングテンプレートを追加して、PUT /streams/{stream-name}/record メソッドリクエストから POST /?Action=PutRecord の該当する統合リクエストにデータをマッピングします。

    Copy
    { "StreamName": "$input.params('stream-name')", "Data": "$util.base64Encode($input.json('$.Data'))", "PartitionKey": "$input.path('$.PartitionKey')" }

    このマッピングテンプレートでは、メソッドリクエストのペイロードが次の形式であることを想定しています。

    Copy
    { "Data": "some data", "PartitionKey": "some key" }

    このデータは、次の JSON スキーマでモデル化することができます。

    Copy
    { "$schema": "http://json-schema.org/draft-04/schema#", "title": "PutRecord proxy single-record payload", "type": "object", "properties": { "Data": { "type": "string" }, "PartitionKey": { "type": "string" } } }

    モデルを作成してこのスキーマを含め、このモデルを使用してマッピングテンプレートを生成を容易にすることができます。ただし、モデルを使用せずにマッピングテンプレートを生成することができます。

  4. PUT /streams/{stream-name}/record メソッドをテストするには、stream-name パス変数を既存のストリームの名前に設定し、必要な形式のペイロードを指定して、メソッドリクエストを送信します。成功した場合の結果は 200 OK レスポンスと、次の形式のペイロードとなります。

    Copy
    { "SequenceNumber": "49559409944537880850133345460169886593573102115167928386", "ShardId": "shardId-000000000004" }

PUT /streams/{stream-name}/records メソッドをセットアップしてテストするには、Kinesis で PutRecords を呼び出します。

  1. 次に示すように PUT /streams/{stream-name}/records メソッドを設定します。

     Kinesis の PutRecords アクションの PUT /streams/{stream-name}/records メソッドを設定します
  2. 次のリクエストパラメータマッピングを追加して、Content-Type ヘッダーを、統合リクエストの AWS 準拠の JSON に設定します。

    Copy
    Content-Type: 'x-amz-json-1.1'

    このタスクでは、同じ手順を使用して GET /streams メソッドのリクエストパラメータマッピングを設定します。

  3. 次の本文マッピングテンプレートを追加して、PUT /streams/{stream-name}/records メソッドリクエストから POST /?Action=PutRecords の該当する統合リクエストにデータをマッピングします。

    Copy
    { "StreamName": "$input.params('stream-name')", "Records": [ #foreach($elem in $input.path('$.records')) { "Data": "$util.base64Encode($elem.data)", "PartitionKey": "$elem.partition-key" }#if($foreach.hasNext),#end #end ] }

    このマッピングテンプレートでは、メソッドリクエストのペイロードが次の JSON スキーマでモデル化できることを想定しています。

    Copy
    { "$schema": "http://json-schema.org/draft-04/schema#", "title": "PutRecords proxy payload data", "type": "object", "properties": { "records": { "type": "array", "items": { "type": "object", "properties": { "data": { "type": "string" }, "partition-key": { "type": "string" } } } } } }

    モデルを作成してこのスキーマを含め、このモデルを使用してマッピングテンプレートを生成を容易にすることができます。ただし、モデルを使用せずにマッピングテンプレートを生成することができます。

    このチュートリアルでは、API 開発者がバックエンドデータ形式をクライアントに開示するか、クライアントから見えないようにするかを選択できることを示すため、わずかに異なる 2 つのペイロード形式を使用しました。1 つは PUT /streams/{stream-name}/records メソッド (上記) 用の形式、もう 1 つは PUT /streams/{stream-name}/record メソッド (前の手順) 用の形式です。実稼働環境では、両方の形式の一貫性を保ってください。

  4. PUT /streams/{stream-name}/records メソッドをテストするには、stream-name パス変数を既存のストリームに設定し、前に示したように次のペイロードを指定して、メソッドリクエストを送信します。

    Copy
    { "records": [ { "data": "some data", "partition-key": "some key" }, { "data": "some other data", "partition-key": "some key" } ] }

    成功した場合の結果は 200 OK レスポンスと、以下の出力例のようになります。

    Copy
    { "FailedRecordCount": 0, "Records": [ { "SequenceNumber": "49559409944537880850133345460167468741933742152373764162", "ShardId": "shardId-000000000004" }, { "SequenceNumber": "49559409944537880850133345460168677667753356781548470338", "ShardId": "shardId-000000000004" } ] }

GET /streams/{stream-name}/sharditerator メソッドをセットアップしてテストするには、Kinesis で GetShardIterator を呼び出します。

GET /streams/{stream-name}/sharditerator メソッドは、GET /streams/{stream-name}/records メソッドを呼び出す前に必須のシャードイテレーターを取得するためのヘルパーメソッドです。

  1. 次に示すように GET /streams/{stream-name}/sharditerator メソッドを設定します。

     GET /streams/{stream-name}/sharditerator メソッドを設定します。
  2. GetShardIterator アクションでは、ShardId 値を入力する必要があります。クライアントで指定された ShardId 値を渡すため、次に示すように、shard-id クエリパラメーターをメソッドリクエストに追加します。

     shard-id クエリパラメーターを GET-on-ShardIterator メソッドリクエストに追加します。

    次の本文マッピングテンプレートでは、shard-id クエリパラメータ値を、Kinesis の GetShardIterator アクションの入力として、JSON ペイロードの ShardId プロパティ値に設定します。

  3. 本文マッピングテンプレートを設定して、必要な入力 (ShardId および StreamName) を、メソッドリクエストの shard-id および stream-name パラメータの GetShardIterator アクションに生成します。また、マッピングテンプレートでも、ShardIteratorType は、デフォルトとして TRIM_HORIZON に設定されます。

    Copy
    { "ShardId": "$input.params('shard-id')", "ShardIteratorType": "TRIM_HORIZON", "StreamName": "$input.params('stream-name')" }
  4. API Gateway コンソールの [Test] オプションを使用して、既存のストリーム名を stream-name Path 変数値として入力します。次に、shard-id の [Query string] を既存の ShardId 値 (例: shard-000000000004) に設定し、[Test] を選択します。

    成功のレスポンスペイロードは以下の出力例のようになります。

    Copy
    { "ShardIterator": "AAAAAAAAAAFYVN3VlFy..." }

    ShardIterator の値をメモしておきます。これは、ストリームからレコードを取得するために必要です。

GET /streams/{stream-name}/records メソッドを設定およびテストして、Kinesis の GetRecords アクションを呼び出すには

  1. 次に示すように GET /streams/{stream-name}/records メソッドを設定します。

     GET /streams/{stream-name}/records メソッドを設定します。
  2. GetRecords オペレーションでは、ShardIterator の値の入力が必要です。クライアントで指定された ShardIterator 値を渡すため、次に示すように、Shard-Iterator ヘッダーパラメーターをメソッドリクエストに追加します。

     GET-on-Records メソッドリクエストに Shard-Iterator ヘッダーパラメーターを追加します。
  3. 以下のマッピングテンプレートを設定して、Shard-Iterator ヘッダーパラメータ値を Kinesis の GetRecords アクションの JSON ペイロードの ShardIterator プロパティ値にマッピングします。

    Copy
    { "ShardIterator": "$input.params('Shard-Iterator')" }
  4. API Gateway コンソールの [Test] オプションを使用して、既存のストリーム名を stream-namePath 変数値として入力します。次に、Shard-IteratorHeader を、GET /streams/{stream-name}/sharditerator メソッド (上記) のテスト実行から取得した ShardIterator 値に設定して、[Test] を選択します。

    成功のレスポンスペイロードは以下の出力例のようになります。

    Copy
    { "MillisBehindLatest": 0, "NextShardIterator": "AAAAAAAAAAF...", "Records": [ ... ] }