StreamManagerClient를 사용하여 스트림 작업 - AWS IoT Greengrass

다음에 대한 설명서를 보고 있습니다.AWS IoT Greengrass Version 1.AWS IoT Greengrass Version 2의 최신 주요 버전입니다AWS IoT Greengrass. 사용에 관한 자세한 내용은 단원을 참조하십시오.AWS IoT Greengrass V2에 대한 자세한 내용은AWS IoT Greengrass Version 2개발자 안내서.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

StreamManagerClient를 사용하여 스트림 작업

에서 실행 중인 사용자 정의 Lambda 함수AWS IoT Greengrass코어를 사용할 수 있습니다StreamManagerClientAWS IoT Greengrass코어 SDK에서 스트림을 생성하기 위해스트림 관리자를 호출 한 다음 스트림과 상호 작용합니다. Lambda 함수가 스트림을 생성 할 때, 그것은 정의AWS 클라우드대상, 우선 순위, 기타 내보내기 및 데이터 보존 정책을 준수합니다. 스트림 관리자에 데이터를 전송하기 위해, Lambda 함수는 스트림에 데이터를 추가합니다. 스트림에 내보내기 대상이 정의된 경우 스트림 관리자는 스트림을 자동으로 내보냅니다.

참고

일반적으로 스트림 관리자의 클라이언트는 사용자 정의 Lambda 함수입니다. 비즈니스 사례에서 필요한 경우 Greengrass 코어 (예: Docker 컨테이너) 에서 실행되는 비 Lambda 프로세스가 스트림 관리자와 상호 작용하도록 허용할 수도 있습니다. 자세한 정보는 클라이언트 인증을 참조하십시오.

이 항목의 코드 조각은 클라이언트가StreamManagerClient메서드를 사용하여 스트림 작업을 수행합니다. 메서드 및 해당 인수에 대한 구현 세부 정보를 보려면 각 스 니펫 뒤에 나열된 SDK 참조에 대한 링크를 사용하십시오. 완전한 Python Lambda 함수를 포함하는 자습서는의 데이터 스트림 내보내기AWS 클라우드(콘솔)또는의 데이터 스트림 내보내기AWS 클라우드(CLI).

Lambda 함수는 인스턴스화해야합니다.StreamManagerClient함수 핸들러 외부에서. 핸들러에서 인스턴스화된 함수는 호출될 때마다 스트림 관리자에 대한 client 및 연결을 만듭니다.

참고

핸들러에서 StreamManagerClient를 인스턴스한 경우, client가 작업을 완료하면 사용자가 close() 메서드를 명시적으로 호출해야 합니다. 그렇지 않으면 client는 연결을 열어두고 스크립트가 종료될 때까지 다른 스레드를 실행합니다.

StreamManagerClient에서는 다음 작업을 지원합니다.

메시지 스트림 생성

스트림을 만들기 위해 사용자 정의 Lambda 함수는 create 메서드를 호출하고MessageStreamDefinition객체입니다. 이 객체는 스트림의 고유한 이름을 지정하고 최대 스트림 크기에 도달했을 때 스트림 관리자가 새 데이터를 처리하는 방법을 정의합니다. MessageStreamDefinition 및 해당 데이터 유형(예: ExportDefinition, StrategyOnFullPersistence)을 사용하여 다른 스트림 속성을 정의할 수 있습니다. 다음이 포함됩니다.

  • 대상AWS IoT Analytics, Kinesis Data Streams,AWS IoT SiteWise및 Amazon S3 대상에서 자동 내보내기를 수행할 수 있습니다. 자세한 정보는 지원되는 에 대한 구성 내보내기AWS 클라우드대상을 참조하십시오.

  • 내보내기 우선 순위. 스트림 관리자는 우선 순위가 낮은 스트림보다 우선 순위가 높은 스트림을 먼저 내보냅니다.

  • 최대 배치 크기 및 배치 간격AWS IoT Analytics, Kinesis Data Streams 및AWS IoT SiteWise대상. 스트림 관리자는 두 조건 중 하나가 충족되면 메시지를 내보냅니다.

  • TTL(Time-to-Live). 스트림 데이터를 처리에 사용할 수 있도록 보장하는 시간입니다. 이 기간 내에 데이터를 사용할 수 있는지 확인해야 합니다. 이는 삭제 정책이 아닙니다. TTL 기간 직후에는 데이터가 삭제되지 않을 수 있습니다.

  • 스트림 지속성. 스트림을 파일 시스템에 저장하여 코어 재시작 시 데이터를 유지하거나 메모리에 스트림을 저장하도록 선택합니다.

  • 시작 시퀀스 번호입니다. 내보내기에서 시작 메시지로 사용할 메시지의 시퀀스 번호를 지정합니다.

에 대한 자세한 내용MessageStreamDefinition자세한 내용은 대상 언어에 대한 SDK 참조를 참조하십시오.

참고

StreamManagerClient은 스트림을 HTTP 서버로 내보내는 데 사용할 수 있는 대상 대상을 제공합니다. 이 대상은 테스트 목적으로만 사용됩니다. 프로덕션 환경에서 사용하기 위해 안정적이지 않거나 지원되지 않습니다.

스트림이 생성되고 나면 Lambda 함수는메시지 추가를 스트림에 추가하여 내보낼 데이터를 보내고메시지 읽기로컬 처리를 위해 스트림에서. 생성하는 스트림 수는 하드웨어 기능 및 비즈니스 사례에 따라 다릅니다. 한 가지 전략은 각 대상 채널에 대한 스트림을AWS IoT Analytics또는 Kinesis 데이터 스트림의 경우 스트림에 대해 여러 개의 대상을 정의할 수 있습니다. 스트림은 안정적인 수명을 가지고 있습니다.

Requirements

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전 1.10.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.5.0 Node.js 1.4.0 | 1.6.0

참고

와 스트림 만들기AWS IoT SiteWise또는 Amazon S3 내보내기 대상에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전 1.11.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.6.0 Node.js 1.5.0 | 1.7.0

Examples

다음 코드 조각은 StreamName이라는 스트림을 생성합니다. 그것은에서 스트림 속성을 정의MessageStreamDefinition및 종속 데이터 유형.

Python
client = StreamManagerClient() try: client.create_message_stream(MessageStreamDefinition( name="StreamName", # Required. max_size=268435456, # Default is 256 MB. stream_segment_size=16777216, # Default is 16 MB. time_to_live_millis=None, # By default, no TTL is enabled. strategy_on_full=StrategyOnFull.OverwriteOldestData, # Required. persistence=Persistence.File, # Default is File. flush_on_write=False, # Default is false. export_definition=ExportDefinition( # Optional. Choose where/how the stream is exported to the AWS 클라우드. kinesis=None, iot_analytics=None, iot_sitewise=None, s3=None ) )) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python용 SDKcreate_message_stream 생성|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456L) // Default is 256 MB. .withStreamSegmentSize(16777216L) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 클라우드. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (StreamManagerException e) { // Properly handle exception. }

SDK 참조:createMessageStream|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.createMessageStream( new MessageStreamDefinition() .withName("StreamName") // Required. .withMaxSize(268435456) // Default is 256 MB. .withStreamSegmentSize(16777216) // Default is 16 MB. .withTimeToLiveMillis(null) // By default, no TTL is enabled. .withStrategyOnFull(StrategyOnFull.OverwriteOldestData) // Required. .withPersistence(Persistence.File) // Default is File. .withFlushOnWrite(false) // Default is false. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 클라우드. new ExportDefinition() .withKinesis(null) .withIotAnalytics(null) .withIotSiteWise(null) .withS3(null) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조:createMessageStream|MessageStreamDefinition

내보내기 대상 구성에 대한 자세한 내용은 단원을 참조하십시오.지원되는 에 대한 구성 내보내기AWS 클라우드대상.

 

메시지 추가

스트림 관리자로 내보내기를 위해 Lambda 함수가 데이터를 대상 스트림에 추가합니다. 내보내기 대상은이 메서드에 전달할 데이터 형식을 결정합니다.

Requirements

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전 1.10.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.5.0 Node.js 1.4.0 | 1.6.0

참고

와 함께 메시지 추가AWS IoT SiteWise또는 Amazon S3 내보내기 대상에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전 1.11.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.6.0 Node.js 1.5.0 | 1.7.0

Examples

AWS IoT Analytics또는 Kinesis Data Streams 내보내기 대상

다음 코드 조각은 StreamName이라는 스트림에 메시지를 추가합니다. 용AWS IoT Analytics또는 Kinesis Data Streams 대상에서 Lambda 함수는 데이터 Blob을 추가합니다.

이 스 니펫에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전 1.10.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.5.0 Node.js 1.4.0 | 1.6.0

Python
client = StreamManagerClient() try: sequence_number = client.append_message(stream_name="StreamName", data=b'Arbitrary bytes data') except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python용 SDKappend_message

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { long sequenceNumber = client.appendMessage("StreamName", "Arbitrary byte array".getBytes()); } catch (StreamManagerException e) { // Properly handle exception. }

SDK 참조:appendMessage

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const sequenceNumber = await client.appendMessage("StreamName", Buffer.from("Arbitrary byte array")); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조:appendMessage

AWS IoT SiteWise대상 내보내기

다음 코드 조각은 StreamName이라는 스트림에 메시지를 추가합니다. 용AWS IoT SiteWise대상에서 Lambda 함수는 직렬화 된PutAssetPropertyValueEntry객체입니다. 자세한 정보는 AWS IoT SiteWise로 내보내기을 참조하십시오.

참고

로 데이터를 전송하면AWS IoT SiteWise에 대한 요구 사항을 충족해야 합니다.BatchPutAssetPropertyValueaction. 자세한 내용은 단원을 참조하십시오.BatchPutAssetPropertyValueAWS IoT SiteWiseAPI 참조.

이 스 니펫에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전 1.11.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.6.0 Node.js 1.5.0 | 1.7.0

Python
client = StreamManagerClient() try: # SiteWise requires unique timestamps in all messages and also needs timestamps not earlier # than 10 minutes in the past. Add some randomness to time and offset. # Note: To create a new asset property data, you should use the classes defined in the # greengrasssdk.stream_manager module. time_in_nanos = TimeInNanos( time_in_seconds=calendar.timegm(time.gmtime()) - random.randint(0, 60), offset_in_nanos=random.randint(0, 10000) ) variant = Variant(double_value=random.random()) asset = [AssetPropertyValue(value=variant, quality=Quality.GOOD, timestamp=time_in_nanos)] putAssetPropertyValueEntry = PutAssetPropertyValueEntry(entry_id=str(uuid.uuid4()), property_alias="PropertyAlias", property_values=asset) sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(putAssetPropertyValueEntry)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python용 SDKappend_message|PutAssetPropertyValueEntry

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { Random rand = new Random(); // Note: To create a new asset property data, you should use the classes defined in the // com.amazonaws.greengrass.streammanager.model.sitewise package. List<AssetPropertyValue> entries = new ArrayList<>() ; // IoTSiteWise requires unique timestamps in all messages and also needs timestamps not earlier // than 10 minutes in the past. Add some randomness to time and offset. final int maxTimeRandomness = 60; final int maxOffsetRandomness = 10000; double randomValue = rand.nextDouble(); TimeInNanos timestamp = new TimeInNanos() .withTimeInSeconds(Instant.now().getEpochSecond() - rand.nextInt(maxTimeRandomness)) .withOffsetInNanos((long) (rand.nextInt(maxOffsetRandomness))); AssetPropertyValue entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); entries.add(entry); PutAssetPropertyValueEntry putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(UUID.randomUUID().toString()) .withPropertyAlias("PropertyAlias") .withPropertyValues(entries); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (StreamManagerException e) { // Properly handle exception. }

SDK 참조:appendMessage|PutAssetPropertyValueEntry

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const maxTimeRandomness = 60; const maxOffsetRandomness = 10000; const randomValue = Math.random(); // Note: To create a new asset property data, you should use the classes defined in the // aws-greengrass-core-sdk StreamManager module. const timestamp = new TimeInNanos() .withTimeInSeconds(Math.round(Date.now() / 1000) - Math.floor(Math.random() - maxTimeRandomness)) .withOffsetInNanos(Math.floor(Math.random() * maxOffsetRandomness)); const entry = new AssetPropertyValue() .withValue(new Variant().withDoubleValue(randomValue)) .withQuality(Quality.GOOD) .withTimestamp(timestamp); const putAssetPropertyValueEntry = new PutAssetPropertyValueEntry() .withEntryId(`${ENTRY_ID_PREFIX}${i}`) .withPropertyAlias("PropertyAlias") .withPropertyValues([entry]); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(putAssetPropertyValueEntry)); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조:appendMessage|PutAssetPropertyValueEntry

Amazon S3 수출 대상

다음 코드 조각은 이라는 스트림에 내보내기 작업을 추가합니다.StreamName. Amazon S3 대상의 경우 Lambda 함수는 직렬화된S3ExportTaskDefinition원본 입력 파일 및 대상 Amazon S3 객체에 대한 정보가 있는 객체입니다. 지정된 객체가 없으면 스트림 관리자가 자동으로 객체를 만듭니다. 자세한 정보는 Amazon S3 로 내보내기을 참조하십시오.

이 스 니펫에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전 1.11.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.6.0 Node.js 1.5.0 | 1.7.0

Python
client = StreamManagerClient() try: # Append an Amazon S3 Task definition and print the sequence number. s3_export_task_definition = S3ExportTaskDefinition(input_url="URLToFile", bucket="BucketName", key="KeyName") sequence_number = client.append_message(stream_name="StreamName", data=Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python용 SDKappend_message|S3내보내기추가 정의

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { // Append an Amazon S3 export task definition and print the sequence number. S3ExportTaskDefinition s3ExportTaskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); long sequenceNumber = client.appendMessage("StreamName", ValidateAndSerialize.validateAndSerializeToJsonBytes(s3ExportTaskDefinition)); } catch (StreamManagerException e) { // Properly handle exception. }

SDK 참조:appendMessage|S3내보내기추가 정의

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { // Append an Amazon S3 export task definition and print the sequence number. const taskDefinition = new S3ExportTaskDefinition() .withBucket("BucketName") .withKey("KeyName") .withInputUrl("URLToFile"); const sequenceNumber = await client.appendMessage("StreamName", util.validateAndSerializeToJsonBytes(taskDefinition))); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조:appendMessage|S3내보내기추가 정의

 

메시지 읽기

스트림에서 메시지를 읽습니다.

Requirements

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전 1.10.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.5.0 Node.js 1.4.0 | 1.6.0

Examples

다음 코드 조각은 StreamName이라는 스트림에서 메시지를 읽습니다. 읽기 메서드는 읽기를 시작할 시퀀스 번호, 읽기를 수행할 최소 및 최대 숫자, 메시지 읽기 시간 제한을 지정하는 선택적 ReadMessagesOptions 객체를 가져옵니다.

Python
client = StreamManagerClient() try: message_list = client.read_messages( stream_name="StreamName", # By default, if no options are specified, it tries to read one message from the beginning of the stream. options=ReadMessagesOptions( desired_start_sequence_number=100, # Try to read from sequence number 100 or greater. By default, this is 0. min_message_count=10, # Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. max_message_count=100, # Accept up to 100 messages. By default this is 1. read_timeout_millis=5000 # Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. ) ) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python용 SDK읽기_메시지|ReadMessagesOptions

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<Message> messages = client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100L) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is raised. By default, this is 1. .withMinMessageCount(10L) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100L) // Try to wait at most 5 seconds for the min_messsage_count to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(Duration.ofSeconds(5L).toMillis()) ); } catch (StreamManagerException e) { // Properly handle exception. }

SDK 참조:readMessages|ReadMessagesOptions

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messages = await client.readMessages("StreamName", // By default, if no options are specified, it tries to read one message from the beginning of the stream. new ReadMessagesOptions() // Try to read from sequence number 100 or greater. By default this is 0. .withDesiredStartSequenceNumber(100) // Try to read 10 messages. If 10 messages are not available, then NotEnoughMessagesException is thrown. By default, this is 1. .withMinMessageCount(10) // Accept up to 100 messages. By default this is 1. .withMaxMessageCount(100) // Try to wait at most 5 seconds for the minMessageCount to be fulfilled. By default, this is 0, which immediately returns the messages or an exception. .withReadTimeoutMillis(5 * 1000) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조:readMessages|ReadMessagesOptions

 

스트림 나열

스트림 관리자에서 스트림 목록을 가져옵니다.

Requirements

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전: 1.10.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.5.0 Node.js 1.4.0 | 1.6.0

Examples

다음 코드 조각은 스트림 관리자에서 이름별로 스트림 목록을 가져옵니다.

Python
client = StreamManagerClient() try: stream_names = client.list_streams() except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python용 SDKList_stream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { List<String> streamNames = client.listStreams(); } catch (StreamManagerException e) { // Properly handle exception. }

SDK 참조:listStreams

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const streams = await client.listStreams(); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조:listStreams

 

메시지 스트림 설명

스트림 정의, 크기 및 내보내기 상태를 포함하여 스트림에 대한 메타데이터를 가져옵니다.

Requirements

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전 1.10.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.5.0 Node.js 1.4.0 | 1.6.0

Examples

다음 코드 조각은 스트림의 정의, 크기 및 내보내기 상태를 포함하여 StreamName라는 스트림에 대한 메타데이터를 가져옵니다.

Python
client = StreamManagerClient() try: stream_description = client.describe_message_stream(stream_name="StreamName") if stream_description.export_statuses[0].error_message: # The last export of export destination 0 failed with some error # Here is the last sequence number that was successfully exported stream_description.export_statuses[0].last_exported_sequence_number if (stream_description.storage_status.newest_sequence_number > stream_description.export_statuses[0].last_exported_sequence_number): pass # The end of the stream is ahead of the last exported sequence number except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python용 SDK스트림 설명_message_stream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo description = client.describeMessageStream("StreamName"); String lastErrorMessage = description.getExportStatuses().get(0).getErrorMessage(); if (lastErrorMessage != null && !lastErrorMessage.equals("")) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.getExportStatuses().get(0).getLastExportedSequenceNumber(); } if (description.getStorageStatus().getNewestSequenceNumber() > description.getExportStatuses().get(0).getLastExportedSequenceNumber()) { // The end of the stream is ahead of the last exported sequence number. } } catch (StreamManagerException e) { // Properly handle exception. }

SDK 참조:describeMessageStream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const description = await client.describeMessageStream("StreamName"); const lastErrorMessage = description.exportStatuses[0].errorMessage; if (lastErrorMessage) { // The last export of export destination 0 failed with some error. // Here is the last sequence number that was successfully exported. description.exportStatuses[0].lastExportedSequenceNumber; } if (description.storageStatus.newestSequenceNumber > description.exportStatuses[0].lastExportedSequenceNumber) { // The end of the stream is ahead of the last exported sequence number. } } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조:describeMessageStream

 

메시지 스트림 업데이트

기존 스트림의 속성을 업데이트합니다. 스트림이 생성된 후 요구 사항이 변경되면 스트림을 업데이트할 수 있습니다. 예:

  • 새 추가구성 내보내기에 대한AWS 클라우드대상.

  • 스트림의 최대 크기를 늘려 데이터를 내보내거나 보존하는 방법을 변경합니다. 예를 들어 스트림 크기를 전체 설정에 대한 전략과 함께 사용하면 스트림 관리자가 데이터를 처리하기 전에 데이터가 삭제되거나 거부될 수 있습니다.

  • 내보내기를 일시 중지했다가 다시 시작합니다. 예를 들어 내보내기 작업이 오래 실행되고 업로드 데이터를 배급하려는 경우

Lambda 함수는 다음과 같은 고급 프로세스를 따라 스트림을 업데이트합니다.

  1. 스트림에 대한 설명을 가져옵니다.

  2. 해당 대상 속성을 업데이트하십시오.MessageStreamDefinition및 종속 개체.

  3. 업데이트 된MessageStreamDefinition. 업데이트된 스트림에 대한 전체 객체 정의를 포함해야 합니다. 정의되지 않은 속성은 기본값으로 되돌리려면

    내보내기에서 시작 메시지로 사용할 메시지의 시퀀스 번호를 지정할 수 있습니다.

Requirements

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전: 1.11.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.6.0 Node.js 1.5.0 | 1.7.0

Examples

다음 코드 조각은 라는 스트림을 업데이트합니다.StreamName. Kinesis Data Streams 스트림으로 내보내는 스트림의 여러 속성을 업데이트합니다.

Python
client = StreamManagerClient() try: message_stream_info = client.describe_message_stream(STREAM_NAME) message_stream_info.definition.max_size=536870912 message_stream_info.definition.stream_segment_size=33554432 message_stream_info.definition.time_to_live_millis=3600000 message_stream_info.definition.strategy_on_full=StrategyOnFull.RejectNewData message_stream_info.definition.persistence=Persistence.Memory message_stream_info.definition.flush_on_write=False message_stream_info.definition.export_definition.kinesis= [KinesisConfig( # Updating Export definition to add a Kinesis Stream configuration. identifier=str(uuid.uuid4()), kinesis_stream_name=str(uuid.uuid4()))] client.update_message_stream(message_stream_info.definition) except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python용 SDK업데이트메시지 스트림|MessageStreamDefinition

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { MessageStreamInfo messageStreamInfo = client.describeMessageStream(STREAM_NAME); // Update the message stream with new values. client.updateMessageStream( messageStreamInfo.getDefinition() .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912L) // Update Max Size to 512 MB. .withStreamSegmentSize(33554432L) // Update Segment Size to 32 MB. .withFlushOnWrite(true) // Update flush on write to true. .withPersistence(Persistence.Memory) // Update the persistence to Memory. .withTimeToLiveMillis(3600000L) // Update TTL to 1 hour. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 클라우드. messageStreamInfo.getDefinition().getExportDefinition(). // Updating Export definition to add a Kinesis Stream configuration. .withKinesis(new ArrayList<KinesisConfig>() {{ add(new KinesisConfig() .withIdentifier(EXPORT_IDENTIFIER) .withKinesisStreamName("test")); }}) ); } catch (StreamManagerException e) { // Properly handle exception. }

SDK 참조:업데이트 메시지 스트림|MessageStreamDefinition

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { const messageStreamInfo = await c.describeMessageStream(STREAM_NAME); await client.updateMessageStream( messageStreamInfo.definition // Max Size update should be greater than initial Max Size defined in Create Message Stream request .withMaxSize(536870912) // Default is 256 MB. Updating Max Size to 512 MB. .withStreamSegmentSize(33554432) // Default is 16 MB. Updating Segment Size to 32 MB. .withTimeToLiveMillis(3600000) // By default, no TTL is enabled. Update TTL to 1 hour. .withStrategyOnFull(StrategyOnFull.RejectNewData) // Required. Updating Strategy on full to reject new data. .withPersistence(Persistence.Memory) // Default is File. Update the persistence to Memory .withFlushOnWrite(true) // Default is false. Updating to true. .withExportDefinition( // Optional. Choose where/how the stream is exported to the AWS 클라우드. messageStreamInfo.definition.exportDefinition // Updating Export definition to add a Kinesis Stream configuration. .withKinesis([new KinesisConfig().withIdentifier(uuidv4()).withKinesisStreamName(uuidv4())]) ) ); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조:업데이트메시지 스트림|MessageStreamDefinition

스트림 업데이트를 위한 제약 조건

스트림을 업데이트할 때 다음과 같은 제약이 적용됩니다. 다음 목록에 명시되지 않는 한 업데이트가 즉시 적용됩니다.

  • 스트림의 지속성을 업데이트할 수 없습니다. 이 동작을 변경하려면스트림 삭제스트림 생성새로운 지속성 정책을 정의합니다.

  • 스트림의 최대 크기는 다음 조건에서만 업데이트할 수 있습니다.

    • 최대 크기는 스트림의 현재 크기 이상이어야 합니다. 이 정보를 찾으려면스트림을 묘사한다의 저장 상태를 확인 한 다음 반환 된MessageStreamInfo객체입니다.

    • 최대 크기는 스트림의 세그먼트 크기 이상이어야 합니다.

  • 스트림 세그먼트 크기를 스트림의 최대 크기보다 작은 값으로 업데이트할 수 있습니다. 업데이트된 설정은 새 세그먼트에 적용됩니다.

  • TTL (Time To Live) 속성에 대한 업데이트는 새 추가 작업에 적용됩니다. 이 값을 줄이면 스트림 관리자가 TTL을 초과하는 기존 세그먼트도 삭제할 수 있습니다.

  • 전체 속성에 대한 전략에 대한 업데이트는 새 추가 작업에 적용됩니다. 가장 오래된 데이터를 덮어쓰도록 전략을 설정하면 스트림 관리자가 새 설정에 따라 기존 세그먼트를 덮어쓸 수도 있습니다.

  • 플러시 시 쓰기 속성에 대한 업데이트는 새 메시지에 적용됩니다.

  • 내보내기 구성에 대한 업데이트는 새 내보내기에 적용됩니다. 업데이트 요청에는 지원하려는 모든 내보내기 구성이 포함되어야 합니다. 그렇지 않으면 스트림 관리자가 삭제합니다.

    • 내보내기 구성을 업데이트할 때 대상 내보내기 구성의 식별자를 지정합니다.

    • 내보내기 구성을 추가하려면 새 내보내기 구성에 대한 고유 식별자를 지정합니다.

    • 내보내기 구성을 삭제하려면 내보내기 구성을 생략합니다.

  • 아래 번호로 일정하게 높입니다.update스트림에 있는 내보내기 구성의 시작 시퀀스 번호를 사용하려면 최신 시퀀스 번호보다 작은 값을 지정해야 합니다. 이 정보를 찾으려면스트림을 묘사한다의 저장 상태를 확인 한 다음 반환 된MessageStreamInfo객체입니다.

 

메시지 스트림 삭제

스트림을 삭제합니다. 스트림을 삭제하면 스트림에 저장된 모든 데이터가 디스크에서 삭제됩니다.

Requirements

이 작업에는 다음과 같은 요구 사항이 있습니다.

  • 최소AWS IoT Greengrass코어 버전: 1.10.0

  • 최소AWS IoT Greengrass핵심 SDK 버전: Python: Java 1.5.0 Node.js 1.4.0 | 1.6.0

Examples

다음 코드 조각은 StreamName라는 스트림을 삭제합니다.

Python
client = StreamManagerClient() try: client.delete_message_stream(stream_name="StreamName") except StreamManagerException: pass # Properly handle errors. except ConnectionError or asyncio.TimeoutError: pass # Properly handle errors.

Python용 SDKdeleteMessageStream

Java
try (final StreamManagerClient client = GreengrassClientBuilder.streamManagerClient().build()) { client.deleteMessageStream("StreamName"); } catch (StreamManagerException e) { // Properly handle exception. }

SDK 참조:delete_message_stream

Node.js
const client = new StreamManagerClient(); client.onConnected(async () => { try { await client.deleteMessageStream("StreamName"); } catch (e) { // Properly handle errors. } }); client.onError((err) => { // Properly handle connection errors. // This is called only when the connection to the StreamManager server fails. });

Node.js SDK 참조:deleteMessageStream

다음 사항도 참조하세요.