쿠키 기본 설정 선택

당사는 사이트와 서비스를 제공하는 데 필요한 필수 쿠키 및 유사한 도구를 사용합니다. 고객이 사이트를 어떻게 사용하는지 파악하고 개선할 수 있도록 성능 쿠키를 사용해 익명의 통계를 수집합니다. 필수 쿠키는 비활성화할 수 없지만 '사용자 지정' 또는 ‘거부’를 클릭하여 성능 쿠키를 거부할 수 있습니다.

사용자가 동의하는 경우 AWS와 승인된 제3자도 쿠키를 사용하여 유용한 사이트 기능을 제공하고, 사용자의 기본 설정을 기억하고, 관련 광고를 비롯한 관련 콘텐츠를 표시합니다. 필수가 아닌 모든 쿠키를 수락하거나 거부하려면 ‘수락’ 또는 ‘거부’를 클릭하세요. 더 자세한 내용을 선택하려면 ‘사용자 정의’를 클릭하세요.

AWS IoT Greengrass를 사용하여 IoT 데이터를 Amazon S3에 직접 비용 효율적으로 수집할 수 있습니다 - 권장 가이드

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

AWS IoT Greengrass를 사용하여 IoT 데이터를 Amazon S3에 직접 비용 효율적으로 수집할 수 있습니다

작성자: Sebastian Viviani(AWS), Rizwan Syed(AWS)

요약

이 패턴은 AWS IoT Greengrass 버전 2 디바이스를 사용하여 Amazon Simple Storage Service(S3) 버킷으로 직접 사물 인터넷(IoT) 데이터를 비용 효율적으로 수집하는 방법을 보여줍니다. 기기는 IoT 데이터를 읽고 영구 리포지토리(즉, 로컬 디스크 또는 볼륨)에 데이터를 저장하는 사용자 지정 구성 요소를 실행합니다. 그런 다음 디바이스는 IoT 데이터를 Apache Parquet 파일로 압축하고 주기적으로 데이터를 S3 버킷에 업로드합니다.

수집하는 IoT 데이터의 양과 속도는 엣지 하드웨어 기능과 네트워크 대역폭에 의해서만 제한됩니다. Amazon Athena를 사용하면 수집된 데이터를 비용 효율적으로 분석할 수 있습니다. Athena는 Amazon Managed Grafana를 사용하여 압축된 Apache Parquet 파일 및 데이터 시각화를 지원합니다.

사전 조건 및 제한 사항

사전 조건 

제한 사항

  • 이 패턴의 데이터는 S3 버킷에 실시간으로 업로드되지 않습니다. 지연 기간이 있으며 지연 기간을 구성할 수 있습니다. 데이터는 에지 디바이스에서 일시적으로 버퍼링된 후 기간이 만료되면 업로드됩니다.

  • SDK는 Java, Node.js 및 Python에서만 이용 가능합니다.

아키텍처

대상 기술 스택 

  • Amazon S3

  • IoT Greengrass

  • MQTT 브로커

  • 스트림 매니저 컴포넌트

대상 아키텍처

다음 다이어그램은 IoT 센서 데이터를 수집하고 해당 데이터를 S3 버킷에 저장하도록 설계된 아키텍처를 보여줍니다.

아키텍처 다이어그램

이 다이어그램은 다음 워크플로를 보여줍니다.

  1. 여러 센서(예: 온도 및 밸브) 업데이트가 로컬 MQTT 브로커에 게시됩니다.

  2. 이러한 센서를 구독하는 Parquet 파일 압축기는 주제를 업데이트하고 업데이트를 수신합니다.

  3. Parquet 파일 압축기는 업데이트를 로컬에 저장합니다.

  4. 기간이 경과하면 저장된 파일이 Parquet 파일로 압축되고 스트림 관리자로 전달되어 지정된 S3 버킷에 업로드됩니다.

  5. 스트림 관리자는 ParQuet 파일을 S3 버킷에 업로드합니다.

참고

스트림 관리자(StreamManager)는 관리형 구성 요소입니다. Amazon S3로 데이터를 내보내는 방법에 대한 예는 AWS IoT Greengrass 설명서의 스트림 관리자를 참조하세요. 로컬 MQTT 브로커를 구성 요소로 사용하거나 Eclipse Mosquitto와 같은 다른 브로커로 사용할 수 있습니다.

도구

AWS 도구

  • Amazon Athena는 표준 SQL을 사용하여 Amazon S3에 있는 데이터를 직접 분석할 수 있는 대화형 쿼리 서비스입니다.

  • Amazon Simple Storage Service(S3)는 원하는 양의 데이터를 저장, 보호 및 검색할 수 있는 클라우드 기반 객체 스토리지 서비스입니다.

  • AWS IoT Greengrass는 디바이스에서 IoT 애플리케이션을 구축, 배포 및 관리하는 데 도움이 되는 오픈 소스 IoT 엣지 런타임 및 클라우드 서비스입니다.

기타 도구

  • Apache Parquet는 스토리지 및 검색을 위해 설계된 오픈 소스 열 지향 데이터 파일 형식입니다.

  • MQTT(메시지 큐잉 텔레메트리 전송)는 제약이 있는 디바이스용으로 설계된 경량 메시징 프로토콜입니다.

모범 사례

업로드된 데이터에 적합한 파티션 형식 사용

S3 버킷의 루트 접두사 이름(예: "myAwesomeDataSet/" 또는"dataFromSource")에 대한 특정 요구 사항은 없지만 데이터 세트의 용도를 쉽게 이해할 수 있도록 의미 있는 파티션과 접두사를 사용하는 것이 좋습니다.

또한 쿼리가 데이터 세트에서 최적으로 실행되도록 Amazon S3에서 올바른 파티셔닝을 사용하는 것이 좋습니다. 다음 예제에서는 각 Athena 쿼리에서 스캔되는 데이터의 양이 최적화되도록 데이터를 HIVE 형식으로 분할합니다. 이렇게 하면 성능을 개선하고 비용을 절감할 수 있습니다.

s3://<ingestionBucket>/<rootPrefix>/year=YY/month=MM/day=DD/HHMM_<suffix>.parquet

에픽

작업설명필요한 기술

S3 버킷을 생성합니다.

  1. S3 버킷을 생성하거나 기존 버킷을 사용합니다.

  2. IoT 데이터를 수집하려는 S3 버킷에 대해 의미 있는 접두사를 생성합니다(예: s3:\\<bucket>\<prefix>).

  3. 나중에 사용할 수 있도록 접두사를 기록합니다.

앱 개발자

S3 버킷에 IAM 권한을 부여합니다.

이전에 생성한 S3 버킷 및 접두사에 대한 쓰기 액세스 권한을 사용자에게 부여하려면 AWS IoT Greengrass 역할에 다음 IAM 정책을 추가하세요.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3DataUpload", "Effect": "Allow", "Action": [ "s3:List*", "s3:Put*" ], "Resource": [ "arn:aws:s3:::<ingestionBucket>", "arn:aws:s3:::<ingestionBucket>/<prefix>/*" ] } ] }

자세한 내용은 Aurora 설명서의 Amazon S3 리소스에 액세스할 수 있는 IAM 정책 생성을 참조하세요.

다음으로 올바른 AWS 보안 주체로 쓰기 액세스를 허용하도록 S3 버킷의 리소스 정책(필요한 경우)을 업데이트하세요.

앱 개발자

환경을 설정합니다

작업설명필요한 기술

S3 버킷을 생성합니다.

  1. S3 버킷을 생성하거나 기존 버킷을 사용합니다.

  2. IoT 데이터를 수집하려는 S3 버킷에 대해 의미 있는 접두사를 생성합니다(예: s3:\\<bucket>\<prefix>).

  3. 나중에 사용할 수 있도록 접두사를 기록합니다.

앱 개발자

S3 버킷에 IAM 권한을 부여합니다.

이전에 생성한 S3 버킷 및 접두사에 대한 쓰기 액세스 권한을 사용자에게 부여하려면 AWS IoT Greengrass 역할에 다음 IAM 정책을 추가하세요.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3DataUpload", "Effect": "Allow", "Action": [ "s3:List*", "s3:Put*" ], "Resource": [ "arn:aws:s3:::<ingestionBucket>", "arn:aws:s3:::<ingestionBucket>/<prefix>/*" ] } ] }

자세한 내용은 Aurora 설명서의 Amazon S3 리소스에 액세스할 수 있는 IAM 정책 생성을 참조하세요.

다음으로 올바른 AWS 보안 주체로 쓰기 액세스를 허용하도록 S3 버킷의 리소스 정책(필요한 경우)을 업데이트하세요.

앱 개발자
작업설명필요한 기술

구성 요소의 레시피를 업데이트합니다.

다음 예제를 기반으로 배포를 생성할 때 구성 요소 구성을 업데이트합니다.

{ "region": "<region>", "parquet_period": <period>, "s3_bucket": "<s3Bucket>", "s3_key_prefix": "<s3prefix>" }

AWS 리전, 주기적 간격의 <region>, S3 버킷이 있는 <period>, 접두사가 있는 <s3Bucket>로 바꿉니다 <s3prefix>.

앱 개발자

구성 요소를 생성합니다.

다음 중 하나를 수행합니다.

  • 구성 요소를 생성합니다.

  • CI/CD 파이프라인(있는 경우)에 구성 요소를 추가합니다. 아티팩트 리포지토리의 아티팩트를 AWS IoT Greengrass 아티팩트 버킷으로 복사해야 합니다. 그런 다음 AWS IoT Greengrass 구성 요소를 생성하거나 업데이트합니다.

  • 참고

    MQTT 브로커를 구성 요소로 추가하거나 나중에 수동으로 추가하세요. :이 결정은 브로커와 함께 사용할 수 있는 인증 체계에 영향을 미칩니다. 브로커를 수동으로 추가하면 브로커를 AWS IoT Greengrass에서 분리하고 브로커의 지원되는 인증 체계를 사용할 수 있습니다. AWS에서 제공하는 브로커 구성 요소에는 사전 정의된 인증 체계가 있습니다. 자세한 내용은 MQTT 3.1.1 브로커(Moquette)MQTT 5 브로커(EMQX)를 참조하세요.

앱 개발자

MQTT 클라이언트를 업데이트합니다.

구성 요소가 브로커에 로컬로 연결되므로 샘플 코드는 인증을 사용하지 않습니다. 시나리오가 다를 경우 필요에 따라 MQTT 클라이언트 섹션을 업데이트하세요. 또한 다음 사항을 수행하세요.

  1. 구독에서 MQTT 주제를 업데이트합니다.

  2. 각 소스의 메시지가 다를 수 있으므로 필요에 따라 MQTT 메시지 파서를 업데이트합니다.

앱 개발자

AWS IoT Greengrass 구성 요소 구축 및 배포

작업설명필요한 기술

구성 요소의 레시피를 업데이트합니다.

다음 예제를 기반으로 배포를 생성할 때 구성 요소 구성을 업데이트합니다.

{ "region": "<region>", "parquet_period": <period>, "s3_bucket": "<s3Bucket>", "s3_key_prefix": "<s3prefix>" }

AWS 리전, 주기적 간격의 <region>, S3 버킷이 있는 <period>, 접두사가 있는 <s3Bucket>로 바꿉니다 <s3prefix>.

앱 개발자

구성 요소를 생성합니다.

다음 중 하나를 수행합니다.

  • 구성 요소를 생성합니다.

  • CI/CD 파이프라인(있는 경우)에 구성 요소를 추가합니다. 아티팩트 리포지토리의 아티팩트를 AWS IoT Greengrass 아티팩트 버킷으로 복사해야 합니다. 그런 다음 AWS IoT Greengrass 구성 요소를 생성하거나 업데이트합니다.

  • 참고

    MQTT 브로커를 구성 요소로 추가하거나 나중에 수동으로 추가하세요. :이 결정은 브로커와 함께 사용할 수 있는 인증 체계에 영향을 미칩니다. 브로커를 수동으로 추가하면 브로커를 AWS IoT Greengrass에서 분리하고 브로커의 지원되는 인증 체계를 사용할 수 있습니다. AWS에서 제공하는 브로커 구성 요소에는 사전 정의된 인증 체계가 있습니다. 자세한 내용은 MQTT 3.1.1 브로커(Moquette)MQTT 5 브로커(EMQX)를 참조하세요.

앱 개발자

MQTT 클라이언트를 업데이트합니다.

구성 요소가 브로커에 로컬로 연결되므로 샘플 코드는 인증을 사용하지 않습니다. 시나리오가 다를 경우 필요에 따라 MQTT 클라이언트 섹션을 업데이트하세요. 또한 다음 사항을 수행하세요.

  1. 구독에서 MQTT 주제를 업데이트합니다.

  2. 각 소스의 메시지가 다를 수 있으므로 필요에 따라 MQTT 메시지 파서를 업데이트합니다.

앱 개발자
작업설명필요한 기술

코어 디바이스 배포를 업데이트합니다.

AWS IoT Greengrass 버전 2 코어 디바이스의 배포가 이미 있는 경우 배포를 수정하세요. 배포가 존재하지 않는 경우 새 배포를 생성하세요.

구성 요소에 올바른 이름을 지정하려면 다음을 기반으로 새 구성 요소(필요한 경우)에 대한 로그 관리자 구성을 업데이트하세요.

{ "logsUploaderConfiguration": { "systemLogsConfiguration": { ... }, "componentLogsConfigurationMap": { "<com.iot.ingest.parquet>": { "minimumLogLevel": "INFO", "diskSpaceLimit": "20", "diskSpaceLimitUnit": "MB", "deleteLogFileAfterCloudUpload": "false" } ... } }, "periodicUploadIntervalSec": "300" }

마지막으로 AWS IoT Greengrass 코어 디바이스에 대한 배포 개정을 완료하세요.

앱 개발자

AWS IoT Greengrass 버전 2 코어 디바이스에 구성 요소 추가

작업설명필요한 기술

코어 디바이스 배포를 업데이트합니다.

AWS IoT Greengrass 버전 2 코어 디바이스의 배포가 이미 있는 경우 배포를 수정하세요. 배포가 존재하지 않는 경우 새 배포를 생성하세요.

구성 요소에 올바른 이름을 지정하려면 다음을 기반으로 새 구성 요소(필요한 경우)에 대한 로그 관리자 구성을 업데이트하세요.

{ "logsUploaderConfiguration": { "systemLogsConfiguration": { ... }, "componentLogsConfigurationMap": { "<com.iot.ingest.parquet>": { "minimumLogLevel": "INFO", "diskSpaceLimit": "20", "diskSpaceLimitUnit": "MB", "deleteLogFileAfterCloudUpload": "false" } ... } }, "periodicUploadIntervalSec": "300" }

마지막으로 AWS IoT Greengrass 코어 디바이스에 대한 배포 개정을 완료하세요.

앱 개발자
작업설명필요한 기술

AWS IoT Greengrass 볼륨에 대한 로그를 확인합니다.

다음 사항을 확인합니다.

  • MQTT 클라이언트가 로컬 MQTT 브로커에 성공적으로 연결되었습니다.

  • MQTT 클라이언트가 올바른 주제를 구독하고 있습니다.

  • MQTT 주제에 대한 센서 업데이트 메시지가 브로커에 전달됩니다.

  • 파케이 압축은 매 주기마다 발생합니다.

앱 개발자

S3 버킷을 선택합니다.

데이터가 S3 버킷에 업로드되고 있는지 검증합니다. 매 기간마다 업로드되는 파일을 확인할 수 있습니다.

다음 섹션에서 데이터를 쿼리하여 데이터가 S3 버킷에 업로드되었는지 확인할 수도 있습니다.

앱 개발자

S3 버킷으로의 데이터 모으기 검증

작업설명필요한 기술

AWS IoT Greengrass 볼륨에 대한 로그를 확인합니다.

다음 사항을 확인합니다.

  • MQTT 클라이언트가 로컬 MQTT 브로커에 성공적으로 연결되었습니다.

  • MQTT 클라이언트가 올바른 주제를 구독하고 있습니다.

  • MQTT 주제에 대한 센서 업데이트 메시지가 브로커에 전달됩니다.

  • 파케이 압축은 매 주기마다 발생합니다.

앱 개발자

S3 버킷을 선택합니다.

데이터가 S3 버킷에 업로드되고 있는지 검증합니다. 매 기간마다 업로드되는 파일을 확인할 수 있습니다.

다음 섹션에서 데이터를 쿼리하여 데이터가 S3 버킷에 업로드되었는지 확인할 수도 있습니다.

앱 개발자
작업설명필요한 기술

데이터베이스 및 테이블을 생성합니다.

  1. AWS Glue 데이터베이스를 생성합니다(필요한 경우).

  2. AWS Glue에서 수동으로 실행하거나 AWS Glue에서 크롤러를 실행하여 테이블을 생성합니다.

앱 개발자

Athena에게 데이터에 대한 액세스 권한을 부여합니다.

  1. Athena가 S3 버킷에 액세스할 수 있도록 권한을 업데이트합니다. 자세한 내용은 Athena 설명서에서 AWS Glue 데이터 카탈로그의 데이터베이스와 테이블에 대한 세분화된 액세스를 참조하세요.

  2. 데이터베이스의 테이블을 쿼리합니다.

앱 개발자

Athena에서 쿼리 설정

작업설명필요한 기술

데이터베이스 및 테이블을 생성합니다.

  1. AWS Glue 데이터베이스를 생성합니다(필요한 경우).

  2. AWS Glue에서 수동으로 실행하거나 AWS Glue에서 크롤러를 실행하여 테이블을 생성합니다.

앱 개발자

Athena에게 데이터에 대한 액세스 권한을 부여합니다.

  1. Athena가 S3 버킷에 액세스할 수 있도록 권한을 업데이트합니다. 자세한 내용은 Athena 설명서에서 AWS Glue 데이터 카탈로그의 데이터베이스와 테이블에 대한 세분화된 액세스를 참조하세요.

  2. 데이터베이스의 테이블을 쿼리합니다.

앱 개발자

문제 해결

문제Solution

MQTT 클라이언트 연결 실패

MQTT 클라이언트 구독 실패

MQTT 브로커의 권한을 검증합니다. AWS의 MQTT 브로커가 있는 경우 MQTT 3.1.1 브로커(Moquette)MQTT 5 브로커(EMQX)를 참조하세요.

파케이 파일은 생성되지 않습니다

  • MQTT 주제가 올바른지 검증합니다.

  • 센서의 MQTT 메시지 형식이 올바른지 검증합니다.

객체가 S3 버킷에 업로드되지 않았습니다.

  • 인터넷 연결 및 엔드포인트 연결이 있는지 검증합니다.

  • S3 버킷의 리소스 정책이 올바른지 검증합니다.

  • AWS IoT Greengrass 버전 2 코어 디바이스 역할에 대한 권한을 검증합니다.

관련 리소스

추가 정보

비용 분석

다음 비용 분석 시나리오는 이 패턴에서 다루는 데이터 모으기 접근 방식이 AWS 클라우드의 데이터 모으기 비용에 어떤 영향을 미칠 수 있는지를 보여줍니다. 이 시나리오의 요금 예시는 발행 시점의 가격을 기준으로 합니다. 요금은 변경될 수 있습니다. 또한 비용은 AWS 리전, AWS service quotas 및 클라우드 환경과 관련된 기타 요인에 따라 달라질 수 있습니다.

입력 신호 세트

이 분석에서는 다음과 같은 입력 신호 세트를 기반으로 IoT 수집 비용을 사용 가능한 다른 대안과 비교합니다.

신호 수

Frequency(주파수)

신호당 데이터

125

25Hz

8 bytes

이 시나리오에서 시스템은 125개의 신호를 수신합니다. 각 신호는 8바이트이며 40밀리초(25Hz)마다 발생합니다. 이러한 신호는 개별적으로 제공되거나 공통 페이로드에 그룹화될 수 있습니다. 필요에 따라 이러한 신호를 분리하고 패킹할 수 있습니다. 지연 시간도 확인할 수 있습니다. 지연 시간은 데이터 수신, 누적 및 수집 기간으로 구성됩니다.

비교를 위해 이 시나리오의 수집 작업은 us-east-1 AWS 리전을 기반으로 합니다. 비용 비교는 AWS 서비스에만 적용됩니다. 하드웨어 또는 연결과 같은 기타 비용은 분석에 포함되지 않습니다.

비용 비교

다음 표는 각 섭취 방법에 대한 월별 비용을 미국 달러(USD)로 보여줍니다.

방법

월별 비용

AWS IoT SiteWise*

331.77 USD

데이터 처리 팩이 포함된 AWS IoT SiteWise Edge(모든 데이터를 엣지에 보관)

200 USD

원시 데이터 액세스를 위한 AWS IoT Core 및 Amazon S3 규칙

84.54 USD

엣지에서의 파케이 파일 압축 및 Amazon S3에 업로드

0.5 USD

*Service Quotas를 준수하려면 데이터를 다운샘플링해야 합니다. 즉, 이 방법을 사용하면 데이터가 약간 손실될 수 있습니다.

대체 방법

이 섹션에서는 다음과 같은 대체 방법에 대한 해당 비용을 보여줍니다.

  • AWS IoT SiteWise - 각 신호는 개별 메시지로 업로드해야 합니다. 따라서 월별 총 메시지 수는 125×25×3600×24×30, 즉 월별 81억 개의 메시지입니다. 그러나 AWS IoT SiteWise는 속성당 초당 10개의 데이터 포인트만 처리할 수 있습니다. 데이터를 10Hz로 다운샘플링한다고 가정하면 월별 메시지 수는 125×10×3600×24×30, 즉 32억 4천만 개로 줄어듭니다. 측정값을 10개씩 그룹으로 묶는 퍼블리셔 구성 요소를 사용하면(메시지 백만 개당 1 USD) 월별 비용은 324 USD입니다. 각 메시지가 8바이트(1Kb/125)라고 가정하면 25.92Gb의 데이터 스토리지가 됩니다. 이로 인해 매월 7.77 USD의 월별 비용이 추가됩니다. 첫 달의 총 비용은 331.77 USD이며 매달 7.77 USD씩 증가합니다.

  • 엣지에서 완전히 처리된 모든 모델 및 신호(즉, 클라우드 통합 없음)를 포함하는 데이터 처리 팩이 포함된 AWS IoT SiteWise Edge - 데이터 처리 팩을 대안으로 사용하여 비용을 절감하고 엣지에서 계산되는 모든 모델을 구성할 수 있습니다. 이는 실제 계산이 수행되지 않더라도 스토리지 및 시각화에만 사용할 수 있습니다. 이 경우 엣지 게이트웨이에 강력한 하드웨어를 사용해야 합니다. 매월 200 USD의 고정 비용이 부과됩니다.

  • MQTT를 통한 AWS IoT Core로의 직접 수집 및 Amazon S3에 원시 데이터를 저장하는 IoT 규칙 - 모든 신호가 공통 페이로드에 게시된다고 가정하면 AWS IoT Core에 게시되는 총 메시지 수는 25×3600×24×30, 즉 매월 6,480만 개입니다. 메시지 백만 개당 1 USD로 계산하면 월별 비용은 64.8 USD입니다. 규칙 활성화 횟수 백만 회당 0.15 USD이고 메시지당 규칙 1개를 사용할 경우 월별 비용은 19.44 USD입니다. Amazon S3의 스토리지 GB당 0.023 USD의 비용이 발생하므로 매월 1.5 USD가 추가됩니다(새 데이터를 반영하기 위해 매월 증가). 첫 달의 총 비용은 84.54 USD이며 매달 1.5 USD씩 증가합니다.

  • Parquet 파일의 엣지에서 데이터를 압축하여 Amazon S3에 업로드(제안된 방법) - 압축률은 데이터 유형에 따라 다릅니다. MQTT에 대해 동일한 산업 데이터를 테스트한 결과, 한 달 동안의 총 출력 데이터는 1.2Gb입니다. 이 비용은 월 0.03 USD입니다. 다른 벤치마크에서 설명한 압축률(무작위 데이터 사용)은 66% 정도입니다(최악의 시나리오에 가까움). 총 데이터는 21Gb이고 비용은 월 0.5 USD입니다.

파케이 파일 생성기

다음 코드 예제는 Python으로 작성된 Parquet 파일 생성기의 구조를 보여줍니다. 코드 예제는 설명을 위한 용도로만 사용되며 사용자 환경에 붙여넣으면 작동하지 않습니다.

import queue import paho.mqtt.client as mqtt import pandas as pd #queue for decoupling the MQTT thread messageQueue = queue.Queue() client = mqtt.Client() streammanager = StreamManagerClient() def feederListener(topic, message): payload = { "topic" : topic, "payload" : message, } messageQueue.put_nowait(payload) def on_connect(client_instance, userdata, flags, rc): client.subscribe("#",qos=0) def on_message(client, userdata, message): feederListener(topic=str(message.topic), message=str(message.payload.decode("utf-8"))) filename = "tempfile.parquet" streamname = "mystream" destination_bucket= "amzn-s3-demo-bucket" keyname="mykey" period= 60 client.on_connect = on_connect client.on_message = on_message streammanager.create_message_stream( MessageStreamDefinition(name=streamname, strategy_on_full=StrategyOnFull.OverwriteOldestData) ) while True: try: message = messageQueue.get(timeout=myArgs.mqtt_timeout) except (queue.Empty): logger.warning("MQTT message reception timed out") currentTimestamp = getCurrentTime() if currentTimestamp >= nextUploadTimestamp: df = pd.DataFrame.from_dict(accumulator) df.to_parquet(filename) s3_export_task_definition = S3ExportTaskDefinition(input_url=filename, bucket=destination_bucket, key=key_name) streammanager.append_message(streamname, Util.validate_and_serialize_to_json_bytes(s3_export_task_definition)) accumulator = {} nextUploadTimestamp += period else: accumulator.append(message)
프라이버시사이트 이용 약관쿠키 기본 설정
© 2025, Amazon Web Services, Inc. 또는 계열사. All rights reserved.