または AWS SDK PutRecordで使用する CLI - AWS SDK コード例

AWS Doc SDK Examples GitHub リポジトリには他にも AWS SDK例があります。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

または AWS SDK PutRecordで使用する CLI

以下のコード例は、PutRecord の使用方法を示しています。

アクション例は、より大きなプログラムからのコードの抜粋であり、コンテキスト内で実行する必要があります。次のコード例で、このアクションのコンテキストを確認できます。

CLI
AWS CLI

データストリームにデータレコードを書き込むには

次の put-record の例は、指定されたパーティションキーを使用して、指定されたデータストリームに単一のデータレコードを書き込みます。

aws kinesis put-record \ --stream-name samplestream \ --data sampledatarecord \ --partition-key samplepartitionkey

出力:

{ "ShardId": "shardId-000000000009", "SequenceNumber": "49600902273357540915989931256901506243878407835297513618", "EncryptionType": "KMS" }

詳細については、Amazon Kinesis Data Streams デベロッパーガイド」の「 for Java APIでの AWS SDK Amazon Kinesis Data Streams を使用したプロデューサーの開発」を参照してください。 Amazon Kinesis

  • API 詳細については、AWS CLI 「 コマンドリファレンスPutRecord」の「」を参照してください。

Java
SDK for Java 2.x
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.regions.Region; import software.amazon.awssdk.services.kinesis.KinesisClient; import software.amazon.awssdk.services.kinesis.model.PutRecordRequest; import software.amazon.awssdk.services.kinesis.model.KinesisException; import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest; import software.amazon.awssdk.services.kinesis.model.DescribeStreamResponse; /** * Before running this Java V2 code example, set up your development * environment, including your credentials. * * For more information, see the following documentation topic: * * https://docs.aws.amazon.com/sdk-for-java/latest/developer-guide/get-started.html */ public class StockTradesWriter { public static void main(String[] args) { final String usage = """ Usage: <streamName> Where: streamName - The Amazon Kinesis data stream to which records are written (for example, StockTradeStream) """; if (args.length != 1) { System.out.println(usage); System.exit(1); } String streamName = args[0]; Region region = Region.US_EAST_1; KinesisClient kinesisClient = KinesisClient.builder() .region(region) .build(); // Ensure that the Kinesis Stream is valid. validateStream(kinesisClient, streamName); setStockData(kinesisClient, streamName); kinesisClient.close(); } public static void setStockData(KinesisClient kinesisClient, String streamName) { try { // Repeatedly send stock trades with a 100 milliseconds wait in between. StockTradeGenerator stockTradeGenerator = new StockTradeGenerator(); // Put in 50 Records for this example. int index = 50; for (int x = 0; x < index; x++) { StockTrade trade = stockTradeGenerator.getRandomTrade(); sendStockTrade(trade, kinesisClient, streamName); Thread.sleep(100); } } catch (KinesisException | InterruptedException e) { System.err.println(e.getMessage()); System.exit(1); } System.out.println("Done"); } private static void sendStockTrade(StockTrade trade, KinesisClient kinesisClient, String streamName) { byte[] bytes = trade.toJsonAsBytes(); // The bytes could be null if there is an issue with the JSON serialization by // the Jackson JSON library. if (bytes == null) { System.out.println("Could not get JSON bytes for stock trade"); return; } System.out.println("Putting trade: " + trade); PutRecordRequest request = PutRecordRequest.builder() .partitionKey(trade.getTickerSymbol()) // We use the ticker symbol as the partition key, explained in // the Supplemental Information section below. .streamName(streamName) .data(SdkBytes.fromByteArray(bytes)) .build(); try { kinesisClient.putRecord(request); } catch (KinesisException e) { System.err.println(e.getMessage()); } } private static void validateStream(KinesisClient kinesisClient, String streamName) { try { DescribeStreamRequest describeStreamRequest = DescribeStreamRequest.builder() .streamName(streamName) .build(); DescribeStreamResponse describeStreamResponse = kinesisClient.describeStream(describeStreamRequest); if (!describeStreamResponse.streamDescription().streamStatus().toString().equals("ACTIVE")) { System.err.println("Stream " + streamName + " is not active. Please wait a few moments and try again."); System.exit(1); } } catch (KinesisException e) { System.err.println("Error found while describing the stream " + streamName); System.err.println(e); System.exit(1); } } }
  • API 詳細については、 リファレンスPutRecordの「」を参照してください。 AWS SDK for Java 2.x API

PowerShell
のツール PowerShell

例 1: -Text パラメータに指定された文字列を含むレコードを書き込みます。

Write-KINRecord -Text "test data from string" -StreamName "mystream" -PartitionKey "Key1"

例 2: 指定されたファイルに含まれるデータを含むレコードを書き込みます。ファイルはバイトのシーケンスとして扱われるため、テキストが含まれている場合は、このコマンドレットで使用する前に必要なエンコードで記述する必要があります。

Write-KINRecord -FilePath "C:\TestData.txt" -StreamName "mystream" -PartitionKey "Key2"
  • API 詳細については、「 コマンドレットリファレンスPutRecord」の「」を参照してください。 AWS Tools for PowerShell

Python
SDK Python 用 (Boto3)
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

class KinesisStream: """Encapsulates a Kinesis stream.""" def __init__(self, kinesis_client): """ :param kinesis_client: A Boto3 Kinesis client. """ self.kinesis_client = kinesis_client self.name = None self.details = None self.stream_exists_waiter = kinesis_client.get_waiter("stream_exists") def put_record(self, data, partition_key): """ Puts data into the stream. The data is formatted as JSON before it is passed to the stream. :param data: The data to put in the stream. :param partition_key: The partition key to use for the data. :return: Metadata about the record, including its shard ID and sequence number. """ try: response = self.kinesis_client.put_record( StreamName=self.name, Data=json.dumps(data), PartitionKey=partition_key ) logger.info("Put record in stream %s.", self.name) except ClientError: logger.exception("Couldn't put record in stream %s.", self.name) raise else: return response
  • API 詳細については、「 PutRecordPython (Boto3) リファレンス」の「」を参照してください。 AWS SDK API

Rust
SDK Rust の場合
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

async fn add_record(client: &Client, stream: &str, key: &str, data: &str) -> Result<(), Error> { let blob = Blob::new(data); client .put_record() .data(blob) .partition_key(key) .stream_name(stream) .send() .await?; println!("Put data into stream."); Ok(()) }
  • API 詳細については、AWS SDK「Rust APIリファレンス」のPutRecord「」を参照してください。

SAP ABAP
SDK の SAP ABAP
注記

の詳細については、「」を参照してください GitHub。用例一覧を検索し、AWS コード例リポジトリでの設定と実行の方法を確認してください。

TRY. oo_result = lo_kns->putrecord( " oo_result is returned for testing purposes. " iv_streamname = iv_stream_name iv_data = iv_data iv_partitionkey = iv_partition_key ). MESSAGE 'Record created.' TYPE 'I'. CATCH /aws1/cx_knsinvalidargumentex . MESSAGE 'The specified argument was not valid.' TYPE 'E'. CATCH /aws1/cx_knskmsaccessdeniedex . MESSAGE 'You do not have permission to perform this AWS KMS action.' TYPE 'E'. CATCH /aws1/cx_knskmsdisabledex . MESSAGE 'KMS key used is disabled.' TYPE 'E'. CATCH /aws1/cx_knskmsinvalidstateex . MESSAGE 'KMS key used is in an invalid state. ' TYPE 'E'. CATCH /aws1/cx_knskmsnotfoundex . MESSAGE 'KMS key used is not found.' TYPE 'E'. CATCH /aws1/cx_knskmsoptinrequired . MESSAGE 'KMS key option is required.' TYPE 'E'. CATCH /aws1/cx_knskmsthrottlingex . MESSAGE 'The rate of requests to AWS KMS is exceeding the request quotas.' TYPE 'E'. CATCH /aws1/cx_knsprovthruputexcdex . MESSAGE 'The request rate for the stream is too high, or the requested data is too large for the available throughput.' TYPE 'E'. CATCH /aws1/cx_knsresourcenotfoundex . MESSAGE 'Resource being accessed is not found.' TYPE 'E'. ENDTRY.
  • API 詳細については、PutRecord「」のAWS SDKSAPABAPAPI「」を参照してください。