AWS CLI를 사용하여 기본 Kinesis 데이터 스트림 작업 수행 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

AWS CLI를 사용하여 기본 Kinesis 데이터 스트림 작업 수행

이 단원에서는 를 사용하여 명령줄에서 Kinesis Data Streams 기본 방법을 설명합니다.AWS CLI. Amazon Kinesis Data Streams 용어 및 개념에 설명된 개념을 숙지하십시오.

참고

는 Kinesis Data Streams 에 적합하지 않으므로 스트림을 생성한 후에는 사용에 대한 일반 요금이 계정에 부과됩니다.AWS프리 티어에서 제공됩니다. 이 자습서를 마치면 다음을 삭제하십시오.AWS요금 발생을 중지하기 위한 리소스. 자세한 정보는 4단계: 정리을 참조하십시오.

1단계: 스트림 만들기

첫 번째 단계는 스트림을 생성하고 성공적으로 생성되었는지 확인하는 것입니다. 다음 명령을 사용하여 이름이 "Foo"인 스트림을 생성합니다.

aws kinesis create-stream --stream-name Foo

그런 다음, 다음 명령을 사용하여 스트림의 생성 진행 상황을 확인합니다.

aws kinesis describe-stream-summary --stream-name Foo

다음 예와 비슷한 출력 결과를 얻어야 합니다.

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

이 예에서 스트림은 아직 사용할 준비가 되지 않았다는 CREATING 상태입니다. 잠시 후 다시 확인하면 다음 예와 비슷한 출력 결과가 표시되어야 합니다.

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

이 출력에는 이 자습서에 대해 걱정할 필요가 없는 정보가 있습니다. 가장 중요한 것은 스트림이 사용할 준비가 되었음을 알리는 "StreamStatus": "ACTIVE"와 요청한 단일 샤드에 대한 정보입니다. 다음과 같이 list-streams 명령을 사용하여 새 스트림의 존재 여부를 확인할 수도 있습니다.

aws kinesis list-streams

결과:

{ "StreamNames": [ "Foo" ] }

2단계: 레코드 입력

이제 스트림이 활성 상태이므로 일부 데이터를 입력할 준비가 되었습니다. 이 자습서의 경우 가능한 가장 간단한 명령인 put-record를 사용합니다. 이 명령은 "testdata" 텍스트를 포함하는 단일 데이터 레코드를 스트림에 넣습니다.

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

이 명령이 제대로 실행되면 다음 예와 비슷한 출력 결과가 발생합니다.

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

축하합니다. 스트림에 데이터를 추가했습니다. 그런 다음 스트림에서 데이터를 가져오는 방법이 표시됩니다.

3단계: 레코드 가져오기

GetShard반복기

스트림에서 데이터를 가져오기 전에 관심이 있는 샤드에 대한 샤드 반복자를 가져와야 합니다. 샤드 반복자는 소비자(이 경우 get-record 명령)가 읽을 샤드와 스트림의 위치를 나타냅니다. 다음과 같이 get-shard-iterator 명령을 사용합니다.

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

를 상기하십시오.aws kinesis명령의 배경에는 Kinesis Data Streams API가 있으므로 표시된 파라미터에 대해 궁금한 경우GetShardIteratorAPI 참조 주제입니다. 성공적으로 실행되면 다음 예와 비슷한 출력 결과가 발생합니다(전체 출력을 확인하려면 수평으로 스크롤).

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

외견상으로 무작위처럼 보이는 문자의 긴 문자열이 샤드 반복자입니다(사용자의 반복자는 다름). 다음에 표시된 대로 샤드 반복자를 복사하여 가져오기 명령에 붙여 넣어야 합니다. 샤드 반복자에는 300초의 유효한 수명 주기가 있습니다. 이 수명 주기는 샤드 반복자를 복사하여 다음 명령에 붙여 넣는데 충분한 시간이어야 합니다. 다음 명령에 붙여 넣기 전에 샤드 반복자에서 모든 줄 바꿈을 제거해야 합니다. 샤드 반복자가 더 이상 유효하지 않다는 오류 메시지가 발생하는 경우 get-shard-iterator 명령을 다시 실행하십시오.

GetRecords

get-records명령은 스트림에서 데이터를 가져오고 호출로 확인됩니다.GetRecordsKinesis Data Streams API에서. 샤드 반복기는 샤드에서 순차적으로 데이터 레코드 읽기를 시작할 위치를 지정합니다. 반복기가 가리키는 샤드 부분에 사용 가능한 레코드가 없는 경우 GetRecords는 빈 목록을 반환합니다. 레코드를 포함하는 샤드 부분을 가져오기 위해 여러 번의 호출이 수행될 수 있습니다.

다음 get-records 명령의 예제에서(전체 명령을 확인하려면 수평으로 스크롤) 다음을 수행합니다.

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

bash 등 Unix 유형 명령 프로세서에서 이 자습서를 실행하는 경우 다음과 같이 중첩 명령을 사용하여 샤드 반복기의 획득을 자동화할 수 있습니다(전체 명령을 보려면 수평으로 스크롤).

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

이 자습서를 지원하는 시스템에서 실행 중인 경우PowerShell를 사용하여 샤드 반복기의 획득을 자동화할 수 있습니다 (전체 명령을 보려면 수평으로 스크롤).

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

get-records 명령을 성공적으로 실행하면 다음 예제와 같이 스트림에서 샤드 반복기를 가져올 때 지정한 샤드에 대한 레코드를 요청합니다(전체 출력을 보려면 수평으로 스크롤).

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

get-records는 위에 요청으로 설명되어 있습니다. 즉, 스트림에 레코드가 있는 경우에도 0개 이상의 레코드를 받을 수 있으며, 반환된 레코드는 현재 스트림의 모든 레코드를 나타내지 않을 수 있습니다. 이러한 상황은 완벽하게 일반적이며, 프로덕션 코드는 적절한 간격으로 레코드에 대한 스트림을 폴링합니다(이 폴링 속도는 특정 애플리케이션 디자인 요구 사항에 따라 다름).

자습서의 이 부분에서 레코드에 대해 알 수 있는 첫 번째 사항은 데이터가 가비지로 보인다는 것입니다. 일반 텍스트 가 아닙니다.testdata를 보냈습니다. 이는 바이너리 데이터를 전송할 수 있도록 put-record가 Base64 방식의 인코딩을 사용하기 때문입니다. 그러나 Kinesis Data Streams 스트림은AWS CLIBase64를 제공하지 않음해독하다왜냐하면 stdout에 인쇄된 원시 바이너리 콘텐츠에 대한 Base64 방식으로 디코딩하면 특정 플랫폼 및 터미널에 대해 불필요한 동작과 잠재적인 보안 문제가 발생할 수 있기 때문입니다. Base64 방식의 디코더(예: https://www.base64decode.org/)를 사용하여 수동으로 dGVzdGRhdGE=를 디코딩하면 실제로 이 데이터가 testdata임을 알 수 있습니다. 실제로 AWS CLI는 데이터를 소비하는 데 거의 사용되지 않으며, 이전에(describe-streamlist-streams)에 살펴본 것처럼 스트림의 상태를 모니터링하고 정보를 얻는 데 더 많이 사용되므로 이 자습서에서는 이 데이터로 충분합니다. 향후 자습서에서는 Base64 방식이 자동으로 처리되는 KCL (KCL) 을 사용하여 프로덕션 품질의 소비자 애플리케이션을 빌드하는 방법을 보여줍니다. KCL에 대한 자세한 내용은 단원을 참조하십시오.KCL을 사용하여 공유 처리량으로 사용자 지정 소비자 개발.

경우에 따라 get-records가 지정된 스트림/샤드에서 모든 레코드를 반환하지 않는 경우도 있습니다. 이러한 경우, 마지막 결과에서 NextShardIterator를 사용하여 다음 레코드 세트를 가져옵니다. 따라서 더 많은 데이터를 스트림에 넣은 경우(프로덕션 애플리케이션의 일반적인 상황) 매번 get-records를 사용하여 데이터에 대한 폴링을 유지할 수 있습니다. 그러나 300초 샤드 반복기 수명 주기 이내에 다음 샤드 반복기를 사용하여 get-records를 호출하지 않으면 오류 메시지가 발생하며, get-shard-iterator 명령을 사용하여 새로운 샤드 반복기를 가져와야 합니다.

이 출력에는 MillisBehindLatest도 제공됩니다. 이 값은 스트림의 끝에서 GetRecords 작업의 응답이 나오는 시간(밀리초)이며, 소비자가 있는 현재 시간에서 경과된 시간을 나타냅니다. 값이 0이면 레코드 처리를 따라잡았으며 이 시점에서 처리할 새 레코드가 없음을 나타냅니다. 이 자습서의 경우 시간을 들여 꾸준히 읽을 경우 상당히 큰 숫자가 표시될 수 있습니다. 기본적으로 데이터 레코드가 검색을 위해 대기하도록 24시간 동안 스트림에 유지되므로 문제가 되지 않습니다. 이 시간을 보존 기간이라고 하며, 최대 365일로 구성할 수 있습니다.

get-records를 성공적으로 실행하면 현재 스트림에 더 이상 레코드가 없는 경우에도 항상 NextShardIterator가 발생합니다. 이는 생산자가 특정 시점에서 스트림에 더 많을 레코드를 잠재적으로 넣을 수 있다고 가정하는 폴링 모델입니다. 자체의 폴링 루틴을 작성할 수 있는 경우에도 소비자 애플리케이션을 개발하기 위해 이전에 언급한 KCL을 사용할 경우 이 폴링이 무리없이 수행됩니다.

가져올 스트림과 샤드에 더 이상 레코드가 없을 때까지 get-records를 호출할 경우 다음 예와 비슷하게 빈 레코드가 포함된 출력이 표시됩니다(전체 출력을 보려면 수평으로 스크롤).

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

4단계: 정리

마지막으로 이전에 명시된 대로 스트림을 삭제하여 리소스를 비우고 계정에 의도하지 않은 요금 청구를 방지합니다. 스트림을 사용하여 데이터를 넣고 가져오든 사용하지 않든 관계없이 스트림당 요금이 누적되므로 스트림을 생성하고 사용하지 않는 경우 언제든지 이를 실제로 수행합니다. 정리 명령은 다음과 같이 간단합니다.

aws kinesis delete-stream --stream-name Foo

성공하면 출력이 발생하지 않으므로 describe-stream을 사용하여 삭제 진행 상황을 확인할 수 있습니다.

aws kinesis describe-stream-summary --stream-name Foo

삭제 명령 직후 이 명령을 실행하면 다음 예와 비슷한 출력 부분이 표시될 수 있습니다.

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

스트림이 완전히 삭제된 후 describe-stream을 실행하면 "찾을 수 없음" 오류가 발생합니다.

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.