StreamManagerClient를 사용하여 스트림 작업 - AWS IoT Greengrass

StreamManagerClient를 사용하여 스트림 작업

Greengrass 코어 디바이스에서 실행되는 사용자 정의 Greengrass 구성 요소는 스트림 관리자 SDK의 StreamManagerClient 객체를 사용하여 스트림 관리자에서 스트림을 생성한 다음 스트림과 상호 작용할 수 있습니다. 구성 요소가 스트림을 생성하는 경우 스트림에 대한 AWS 클라우드 대상, 우선 순위, 기타 내보내기 및 데이터 보존 정책을 정의합니다. 스트림 관리자로 데이터를 전송하려면 구성 요소가 데이터를 스트림에 추가합니다. 스트림에 대해 내보내기 대상이 정의된 경우 스트림 관리자는 스트림을 자동으로 내보냅니다.

참고

일반적으로 스트림 관리자의 클라이언트는 사용자 정의 Greengrass 구성 요소입니다. 비즈니스 사례에 필요한 경우 Greengrass 코어에서 실행되는 구성 요소가 아닌 프로세스(예: Docker 컨테이너)가 스트림 관리자와 상호 작용하도록 허용할 수 있습니다. 자세한 내용은 클라이언트 인증 단원을 참조하십시오.

이 주제의 코드 조각은 클라이언트가 스트림 작업을 위해 StreamManagerClient 메서드를 직접적으로 호출하여 사용하는 방법을 보여줍니다. 메서드 및 해당 인수에 대한 구현 세부 정보를 보려면 각 코드 조각 다음에 나열된 SDK 참조에 대한 링크를 사용합니다.

Lambda 함수에서 스트림 관리자를 사용하는 경우 Lambda 함수가 함수 핸들러 StreamManagerClient 외부에서 인스턴스화되어야 합니다. 핸들러에서 인스턴스화된 함수는 호출될 때마다 스트림 관리자에 대한 client 및 연결을 만듭니다.

참고

핸들러에서 StreamManagerClient을(를) 인스턴스한 경우, client이(가) 작업을 완료하면 사용자가 close() 메서드를 명시적으로 호출해야 합니다. 그렇지 않으면 client은(는) 연결을 열어두고 스크립트가 종료될 때까지 다른 스레드를 실행합니다.

StreamManagerClient에서는 다음 작업을 지원합니다.

메시지 스트림 생성

스트림을 생성하기 위해 사용자 정의 Greengrass 구성 요소가 생성 메서드를 직접적으로 호출하고 MessageStreamDefinition 객체를 전달합니다. 이 객체는 스트림의 고유한 이름을 지정하고 최대 스트림 크기에 도달했을 때 스트림 관리자가 새 데이터를 처리하는 방법을 정의합니다. MessageStreamDefinition 및 해당 데이터 유형(예: ExportDefinition, StrategyOnFullPersistence)을 사용하여 다른 스트림 속성을 정의할 수 있습니다. 다음이 포함됩니다.

  • 자동 내보내기를 위한 대상 AWS IoT Analytics, Kinesis Data Streams, AWS IoT SiteWise, 및 Amazon S3 대상. 자세한 내용은 지원되는 AWS 클라우드 대상의 구성 내보내기 단원을 참조하십시오.

  • 내보내기 우선 순위. 스트림 관리자는 우선 순위가 낮은 스트림보다 우선 순위가 높은 스트림을 먼저 내보냅니다.

  • AWS IoT Analytics, Kinesis Data Streams 및 AWS IoT SiteWise 대상에 대한 최대 배치 크기 및 배치 간격 스트림 관리자는 두 조건 중 하나가 충족되면 메시지를 내보냅니다.

  • TTL(Time-to-Live). 스트림 데이터를 처리에 사용할 수 있도록 보장하는 시간입니다. 이 기간 내에 데이터를 사용할 수 있는지 확인해야 합니다. 이는 삭제 정책이 아닙니다. TTL 기간 직후에는 데이터가 삭제되지 않을 수 있습니다.

  • 스트림 지속성. 스트림을 파일 시스템에 저장하여 코어 재시작 시 데이터를 유지하거나 메모리에 스트림을 저장하도록 선택합니다.

  • 시작 시퀀스 번호 내보내기에서 시작 메시지로 사용할 메시지의 시퀀스 번호를 지정합니다.

MessageStreamDefinition에 대한 자세한 내용은 대상 언어에 대한 SDK 참조를 확인하세요.

참고

StreamManagerClient는 스트림을 HTTP 서버로 내보내는 데 사용할 수 있는 대상을 제공합니다. 이 대상은 테스트 목적으로만 사용됩니다. 이 대상은 안정적이지 않으며 프로덕션 환경에서 사용할 수 없습니다.

스트림이 생성되면 Greengrass 구성 요소가 스트림에 메시지를 추가하여 내보내기용 데이터를 전송하고 로컬 처리를 위해 스트림에서 메시지를 읽을 수 있습니다. 생성하는 스트림 수는 하드웨어 기능 및 비즈니스 사례에 따라 다릅니다. 한 가지 전략은 AWS IoT Analytics 또는 Kinesis 데이터 스트림의 각 대상 채널에 대해 스트림을 생성하는 것입니다(하나의 스트림에 대해 여러 개의 대상을 정의할 수 있음). 스트림은 안정적인 수명을 가지고 있습니다.

요구 사항

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소 스트림 관리자 SDK 버전: Python: 1.1.0  |  Java: 1.1.0  |  Node.js: 1.1.0

예시

다음 코드 조각은 StreamName(이)라는 스트림을 생성합니다. 이는 MessageStreamDefinition 및 하위 데이터 유형의 스트림 속성을 정의합니다.

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS 클라우드. kinesis=None, iot_analytics=None, iot_sitewise=None, s3_task_executor=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 참조: create_message_stream | MessageStreamDefinition

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 클라우드. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSitewise(null) .withS3(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 참조: createMessageStream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 클라우드. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조: createMessageStream | MessageStreamDefinition

내보내기 대상 구성에 대한 자세한 내용은 지원되는 AWS 클라우드 대상의 구성 내보내기 단원을 참조하세요.

메시지 추가

내보내기를 위해 스트림 관리자로 데이터를 전송하려면 Greengrass 구성 요소가 대상 스트림에 데이터를 추가합니다. 내보내기 대상은 이 메소드에 전달할 데이터 유형을 결정합니다.

요구 사항

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소 스트림 관리자 SDK 버전: Python: 1.1.0  |  Java: 1.1.0  |  Node.js: 1.1.0

예시

AWS IoT Analytics 또는 Kinesis Data Streams 내보내기 대상

다음 코드 조각은 StreamName이라는 스트림에 메시지를 추가합니다. AWS IoT Analytics 또는 Kinesis Data Streams 대상의 경우 Greengrass 구성 요소가 데이터 블롭을 추가합니다.

이 코드 조각에는 다음과 같은 요구 사항이 있습니다.

  • 최소 스트림 관리자 SDK 버전: Python: 1.1.0  |  Java: 1.1.0  |  Node.js: 1.1.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 참조: append_message

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 참조: appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조: appendMessage

AWS IoT SiteWise 내보내기 목적지

다음 코드 조각은 StreamName이라는 스트림에 메시지를 추가합니다. AWS IoT SiteWise 대상의 경우 Greengrass 구성 요소가 직렬화된 PutAssetPropertyValueEntry 객체를 추가합니다. 자세한 내용은 AWS IoT SiteWise로 내보내기 단원을 참조하십시오.

참고

AWS IoT SiteWise(으)로 데이터를 보낼 때 데이터는 BatchPutAssetPropertyValue 작업의 요구 사항을 충족해야 합니다. 자세한 내용은 AWS IoT SiteWise API 참조BatchPutAssetPropertyValue를 참조하세요.

이 코드 조각에는 다음과 같은 요구 사항이 있습니다.

  • 최소 스트림 관리자 SDK 버전: Python: 1.1.0  |  Java: 1.1.0  |  Node.js: 1.1.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier # than 10 minutes in the past. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 참조: append_message | PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier // than 10 minutes in the past. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 참조: appendMessage | PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조: appendMessage | PutAssetPropertyValueEntry

Amazon S3 대상

다음 코드 조각은 StreamName이라는 스트림에 내보내기 작업을 추가합니다. Amazon S3 대상의 경우 Greengrass 구성 요소가 소스 입력 파일 및 대상 Amazon S3 객체에 대한 정보를 포함하는 직렬화된 S3ExportTaskDefinition 객체를 추가합니다. 지정된 객체가 없는 경우 Stream Manager가 자동으로 생성합니다. 자세한 내용은 Amazon S3로 내보내기 단원을 참조하십시오.

이 코드 조각에는 다음과 같은 요구 사항이 있습니다.

  • 최소 스트림 관리자 SDK 버전: Python: 1.1.0  |  Java: 1.1.0  |  Node.js: 1.1.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 참조: append_message | S3ExportTaskDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 참조: appendMessage | S3ExportTaskDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조: appendMessage | S3ExportTaskDefinition

메시지 읽기

스트림의 메시지 읽기

요구 사항

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소 스트림 관리자 SDK 버전: Python: 1.1.0  |  Java: 1.1.0  |  Node.js: 1.1.0

예시

다음 코드 조각은 StreamName이라는 스트림에서 메시지를 읽습니다. 읽기 메서드는 읽기를 시작할 시퀀스 번호, 읽기를 수행할 최소 및 최대 숫자, 메시지 읽기 시간 제한을 지정하는 선택적 ReadMessagesOptions 객체를 가져옵니다.

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 참조: read_messages | ReadMessagesOptions

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 참조: readMessages | ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조: readMessages | ReadMessagesOptions

스트림 나열

스트림 매니저에서 스트림 목록을 가져옵니다.

요구 사항

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소 스트림 관리자 SDK 버전: Python: 1.1.0  |  Java: 1.1.0  |  Node.js: 1.1.0

예시

다음 코드 조각은 스트림 관리자에서 이름별로 스트림 목록을 가져옵니다.

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 참조: list_streams

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 참조: listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조: listStreams

메시지 스트림 설명

스트림 정의, 크기, 내보내기 상태 등 스트림에 대한 메타데이터를 가져옵니다.

요구 사항

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소 스트림 관리자 SDK 버전: Python: 1.1.0  |  Java: 1.1.0  |  Node.js: 1.1.0

예시

다음 코드 조각은 스트림의 정의, 크기 및 내보내기 상태를 포함하여 StreamName라는 스트림에 대한 메타데이터를 가져옵니다.

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 참조: describe_message_stream

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 참조: describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조: describeMessageStream

메시지 스트림 업데이트

기존 스트림의 속성을 업데이트합니다. 스트림이 생성된 후 요구 사항이 변경되면 스트림을 업데이트하는 것이 좋습니다. 예:

  • AWS 클라우드 대상에 새 내보내기 구성을 추가합니다.

  • 스트림의 최대 크기를 늘려 데이터를 내보내거나 유지하는 방법을 변경합니다. 예를 들어 스트림 크기와 전체 설정 전략을 함께 사용하면 스트림 관리자가 데이터를 처리하기 전에 데이터가 삭제되거나 거부될 수 있습니다.

  • 예를 들어, 내보내기 작업이 오래 걸리고 업로드 데이터를 할당하려는 경우 내보내기를 일시 중지했다가 다시 시작합니다.

Greengrass 구성 요소는 다음과 같은 간략한 프로세스를 따라 스트림을 업데이트합니다.

  1. 스트림에 대한 설명을 가져옵니다.

  2. 해당 MessageStreamDefinition 및 하위 객체의 대상 속성을 업데이트합니다.

  3. 업데이트된 MessageStreamDefinition을(를) 전달합니다. 업데이트된 스트림에 대한 전체 객체 정의를 포함해야 합니다. 정의되지 않은 속성은 기본값으로 되돌아갑니다.

    내보내기에서 시작 메시지로 사용할 메시지의 시퀀스 번호를 지정할 수 있습니다.

요구 사항

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소 스트림 관리자 SDK 버전: Python: 1.1.0  |  Java: 1.1.0  |  Node.js: 1.1.0

예시

다음 코드 조각은 StreamName(이)라는 스트림을 업데이트합니다. Kinesis Data Streams로 내보내는 스트림의 여러 속성을 업데이트합니다.

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 참조: updateMessageStream | MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 클라우드. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 참조: update_message_stream | MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 클라우드. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조: updateMessageStream | MessageStreamDefinition

스트림 업데이트에 대한 제약조건

스트림을 업데이트할 때는 다음 제약조건이 적용됩니다. 다음 목록에 명시되어 있지 않는 한 업데이트는 즉시 적용됩니다.

  • 스트림의 지속성을 업데이트할 수 없습니다. 이 동작을 변경하려면 스트림을 삭제하고 새 지속성 정책을 정의하는 스트림을 생성합니다.

  • 다음과 같은 조건에서만 스트림의 최대 크기를 업데이트할 수 있습니다.

    • 최대 크기는 스트림의 현재 크기 이상이어야 합니다. 이 정보를 찾으려면 스트림을 설명하고 반환된 MessageStreamInfo 객체의 스토리지 상태를 확인합니다.

    • 최대 크기는 스트림의 세그먼트 크기 이상이어야 합니다.

  • 스트림 세그먼트 크기를 스트림의 최대 크기보다 작은 값으로 업데이트할 수 있습니다. 업데이트된 설정은 새 세그먼트에 적용됩니다.

  • Time to Live(TTL) 속성 업데이트는 새 추가 작업에 적용됩니다. 이 값을 줄이면 스트림 관리자가 TTL을 초과하는 기존 세그먼트도 삭제할 수 있습니다.

  • 전체 속성에 대한 전략 업데이트는 새 추가 작업에 적용됩니다. 가장 오래된 데이터를 덮어쓰도록 전략을 설정하는 경우 스트림 관리자가 새 설정에 따라 기존 세그먼트를 덮어쓸 수도 있습니다.

  • flush on write 속성 업데이트는 새 메시지에 적용됩니다.

  • 내보내기 구성 업데이트는 새 내보내기에 적용됩니다. 업데이트 요청에는 지원할 모든 내보내기 구성이 포함되어야 합니다. 그렇지 않으면 스트림 관리자가 해당 파일을 삭제합니다.

    • 내보내기 구성을 업데이트할 때 대상 내보내기 구성의 식별자를 지정합니다.

    • 내보내기 구성을 추가하려면 새 내보내기 구성의 고유 식별자를 지정합니다.

    • 내보내기 구성을 삭제하려면 내보내기 구성을 생략합니다.

  • 스트림에서 내보내기 구성의 시작 시퀀스 번호를 업데이트하려면 최신 시퀀스 번호보다 작은 값을 지정해야 합니다. 이 정보를 찾으려면 스트림을 설명하고 반환된 MessageStreamInfo 객체의 스토리지 상태를 확인합니다.

메시지 스트림 삭제

스트림을 삭제합니다. 스트림을 삭제하면 스트림에 저장된 모든 데이터가 디스크에서 삭제됩니다.

요구 사항

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소 스트림 관리자 SDK 버전: Python: 1.1.0  |  Java: 1.1.0  |  Node.js: 1.1.0

예시

다음 코드 조각은 StreamName라는 스트림을 삭제합니다.

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python SDK 참조: deleteMessageStream

Java
try (final StreamManagerClient client = StreamManagerClientFactory.standard().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

Java SDK 참조: delete_message_stream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조: deleteMessageStream

다음 사항도 참조하세요.