Amazon Kinesis Data Streams
開発者ガイド

ストリームの作成

次の手順に従って Kinesis data stream を作成します 。

Kinesis Data Streams クライアントの構築

Kinesis data stream を使用する前に、クライアントオブジェクトを構築する必要があります。次の Java コードは、クライアントビルダーをインスタンス化し、それを使用してリージョン、認証情報、およびクライアント設定を指定します。次に、クライアントオブジェクトを構築します。

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis client = clientBuilder.build();

詳細については、AWS General Reference の「Kinesis Data Streams のリージョンとエンドポイント」を参照してください。

ストリームを作成する

Kinesis Data Streams クライアントを作成したら、使用するストリームを作成できます。この作業は、Kinesis Data Streams コンソールまたはプログラムから実行できます。プログラムでストリームを作成するには、CreateStreamRequest オブジェクトをインスタンス化し、ストリームの名前とストリームが使用するシャードの数を指定します。

CreateStreamRequest createStreamRequest = new CreateStreamRequest(); createStreamRequest.setStreamName( myStreamName ); createStreamRequest.setShardCount( myStreamSize );

ストリーム名はストリームを識別するために使用されます。この名前のスコープは、アプリケーションが使用する AWS アカウントに限定されます。また、リージョンにも限定されます。つまり、2 つの異なる AWS アカウント内の 2 つのストリームを同じ名前にすることができ、同じ AWS アカウントで 2 つの異なるリージョン内の 2 つのストリームを同じ名前にすることができますが、同じアカウントで、同じリージョン内の 2 つのストリームを同じ名前にすることはできません。

ストリームのスループットはシャードの数によって決まります。プロビジョンドスループットを高くするほど、必要になるシャードの数は増えます。シャードが増えると、ストリームに対して請求される AWS のコストも増えます。アプリケーションに適切なシャードの数の計算の詳細については、「Kinesis Data Stream の初期サイズを決定する」を参照してください。

createStreamRequest オブジェクトを設定した後、クライアントの createStream メソッドを呼び出すことで、ストリームを作成します。createStream の呼び出し後、ストリームに対してさらにオペレーションを実行するには、ストリームが ACTIVE 状態になるまで待機します。ストリームの状態を確認するには、describeStream メソッドを呼び出します。ただし、ストリームが存在しない場合、describeStream は例外をスローします。そのために、describeStream の呼び出しは try/catch ブロックで囲みます。

client.createStream( createStreamRequest ); DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); long startTime = System.currentTimeMillis(); long endTime = startTime + ( 10 * 60 * 1000 ); while ( System.currentTimeMillis() < endTime ) { try { Thread.sleep(20 * 1000); } catch ( Exception e ) {} try { DescribeStreamResult describeStreamResponse = client.describeStream( describeStreamRequest ); String streamStatus = describeStreamResponse.getStreamDescription().getStreamStatus(); if ( streamStatus.equals( "ACTIVE" ) ) { break; } // // sleep for one second // try { Thread.sleep( 1000 ); } catch ( Exception e ) {} } catch ( ResourceNotFoundException e ) {} } if ( System.currentTimeMillis() >= endTime ) { throw new RuntimeException( "Stream " + myStreamName + " never went active" ); }