教程:使用 Kinesis Data Streams 执行基本操作 AWS CLI - Amazon Kinesis Data Streams

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

教程:使用 Kinesis Data Streams 执行基本操作 AWS CLI

本节介绍如何通过 AWS CLI从命令行对 Kinesis 数据流执行基本操作。确保您熟悉Amazon Kinesis Data Streams 术语和概念中讨论的概念。

注意

创建直播后,您的账户会因使用 Kinesis Data Streams 而产生象征性的费用,因为 Kinesis Data Streams 不符合 AWS 免费套餐的资格。完成本教程后,请删除您的 AWS 资源以停止产生费用。有关更多信息,请参阅 步骤 4:清除

步骤 1:创建直播

您的第一步是创建一个流并验证它是否已创建成功。使用以下命令创建一个名为“Foo”的流:

aws kinesis create-stream --stream-name Foo

接下来,发出以下命令以检查流的创建进度:

aws kinesis describe-stream-summary --stream-name Foo

您应获得类似于以下示例的输出:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

在此示例中,直播具有状态CREATING,这意味着它尚未准备好使用。在几分钟后再次检查,您应看到类似于以下示例的输出:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

此输出中有本教程不需要的信息。现在的重要信息是"StreamStatus": "ACTIVE",它告诉你流已准备就绪,可以使用,以及你请求的单个分片的信息。您还可以通过使用 list-streams 命令验证您的新流是否存在,如下所示:

aws kinesis list-streams

输出:

{ "StreamNames": [ "Foo" ] }

第 2 步:记录下来

既然您已经拥有活动的流,您便已做好放置一些数据的准备。在本教程中,您将使用最简单的命令 put-record,该命令会将一个包含文本“testdata”的数据记录放入流中:

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

如果成功,此命令将生成类似于以下示例的输出:

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

恭喜,您刚刚已将数据添加到流!接下来您将了解如何从流中获取数据。

第 3 步:获取记录

GetShardIterator

必须先获取感兴趣的分片的分片迭代器,然后才能从流中获取数据。分片迭代器表示消费端(在本例中为 get-record 命令)要从中读取数据的流和分片的位置。您将按如下方式使用该get-shard-iterator命令:

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

回想一下,这些aws kinesis命令背后有一个 Kinesis Data API Streams,因此,如果您对显示的任何参数感到好奇,可以在参考主题中GetShardIteratorAPI阅读有关这些参数的信息。成功执行将产生类似于以下示例的输出:

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

看起来像随机字符的长字符串就是分片迭代器(您的字符串将与此不同)。您必须将分片迭代器复制/粘贴到 get 命令中,如下所示。分片迭代器的有效生命周期为 300 秒,应该足以让您将分片迭代器复制/粘贴到下一个命令中。在粘贴到下一个命令之前,必须从分片迭代器中删除所有换行符。如果您收到一条错误消息,提示分片迭代器不再有效,请再次运行该get-shard-iterator命令。

GetRecords

get-records命令从流中获取数据,然后在 Kinesis Data Streams GetRecords中解析为对的调用。API分片迭代器指定了分片中的一个位置,您希望从该位置开始按顺序读取数据记录。如果迭代器指向的分片中的部分没有可用的记录,GetRecords 将返回空白列表。可能需要多次调用才能访问分片中包含记录的部分。

在以下get-records命令示例中:

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

如果您从 Unix 类型的命令处理器(例如 bash)运行本教程,则可以使用嵌套命令自动获取分片迭代器,如下所示:

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

如果您在支持的系统上运行本教程 PowerShell,则可以使用如下命令自动获取分片迭代器:

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

get-records命令的成功结果将从您的流中请求您在获取分片迭代器时指定的分片的记录,如以下示例所示:

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

请注意,上面将其描述get-records请求,这意味着即使您的直播中有记录,您也可能收到零条或多条记录。返回的任何记录可能并不代表当前直播中的所有记录。这是正常现象,生产代码将以适当的时间间隔轮询流中的记录。此轮询速度将根据您的特定应用程序设计要求而有所不同。

在本教程这一部分的记录中,你会注意到数据似乎是垃圾,而且不是testdata我们发送的明文。这归因于 put-record 使用 Base64 编码支持您发送二进制数据的方式。但是,中的 AWS CLI Kinesis Data Streams支持不提供 Base64 解码,因为Base64解码到打印到标准输出的原始二进制内容可能会导致某些平台和终端出现不良行为和潜在的安全问题。如果您使用 Base64 解码程序(例如,https://www.base64decode.org/)对 dGVzdGRhdGE= 进行手动解码,您将看到它实际上是 testdata。就本教程而言,这已经足够了,因为在实践中, AWS CLI 很少使用来消耗数据。更常见的是,它用于监视流的状态并获取信息,如前所示(describe-streamlist-streams)。有关更多信息KCL,请参阅使用共享吞吐量开发自定义使用者KCL

get-records并不总是返回指定流/分片中的所有记录。当出现这种情况时,请使用最后一个结果中的 NextShardIterator 获取下一组记录。如果有更多数据被放入流中(这是生产应用程序中的正常情况),则get-records每次都可以继续轮询数据。但是,如果您在 300 秒的分片迭代器生命周期内没有get-records使用下一个分片迭代器进行调用,则会收到一条错误消息,并且必须使用该get-shard-iterator命令来获取新的分片迭代器。

此输出中还提供了MillisBehindLatest,即GetRecords操作的响应来自流尖端的毫秒数,表示消费者比当前时间落后了多远。零值指示正进行记录处理,此时没有新的记录要处理。在本教程中,如果您一边阅读教程一边操作,则可能会看到这个数值非常大。默认情况下,数据记录在流中保留 24 小时,等待您检索。此时间范围称为保留期,可以配置为最多 365 天。

NextShardIterator即使当前流中没有其他记录,成功的get-records结果也将始终是如此。这是一个假定创建器在任何给定时间内正在将更多记录放入流中的轮询模型。尽管您可以编写自己的轮询例程,但如果您使用前面提到KCL的程序来开发消费者应用程序,则此轮询将为您处理。

如果您调用get-records直到从中提取的流和分片中没有其他记录,则会看到与以下示例类似的空记录的输出:

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

步骤 4:清除

删除直播以腾出资源并避免向您的账户收取意外费用。每当你创建了直播并且不会使用它时,都要这样做,因为无论你是否使用直播放入和获取数据,每个流都会产生费用。清理命令如下所示:

aws kinesis delete-stream --stream-name Foo

成功不会产生任何输出。describe-stream用于检查删除进度:

aws kinesis describe-stream-summary --stream-name Foo

如果在 delete 命令之后立即执行此命令,则会看到类似于以下示例的输出:

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

在流完全删除后,describe-stream 将生成“未找到”错误:

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.