Kinesis v1에서 v2로 마이그레이션 - Amazon Monitron

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Kinesis v1에서 v2로 마이그레이션

현재 v1 데이터 스키마를 사용하고 있다면 이미 Amazon S3로 데이터를 보내고 있거나 Lambda로 데이터 스트림 페이로드를 추가로 처리하고 있을 수 있습니다.

데이터 스키마를 v2로 업데이트

v1 스키마로 데이터 스트림을 이미 구성한 경우, 다음을 수행하여 데이터 내보내기 프로세스를 업데이트할 수 있습니다.

  1. Amazon Monitron 콘솔을 엽니다.

  2. 프로젝트로 이동합니다.

  3. 현재 라이브 데이터 내보내기를 중지합니다.

  4. 라이브 데이터 내보내기를 시작하여 새 데이터 스트림을 생성합니다.

  5. 새로 생성한 데이터 스트림을 선택합니다.

  6. 라이브 데이터 내보내기 시작을 선택합니다. 이때 새 스키마가 데이터 스트림을 통해 페이로드를 전송합니다.

  7. (선택 사항) Kinesis 콘솔로 이동하여 이전 데이터 스트림을 삭제합니다.

  8. v2 스키마를 사용하여 새로 생성한 데이터 스트림의 새 전송 방법을 구성하세요.

이제 새 스트림이 v2 스키마를 준수하는 페이로드를 새 버킷으로 전송합니다. 이러한 버킷의 모든 데이터를 처리하려는 경우 두 개의 서로 다른 버킷을 사용하여 일관된 형식을 유지하는 것이 좋습니다. 예를 들어, Athena 및 AWS Glue와 같은 다른 서비스를 사용합니다.

참고

Amazon S3로 데이터를 전송하는 경우, v2 스키마를 사용하여 Amazon S3에 데이터를 전송하는 방법에 대해 자세히 알아보려면 내보낸 데이터를 Amazon S3에 저장하는 방법을 알아보세요.

참고

Lambda 함수를 사용하여 페이로드를 처리했다면 Lambda로 데이터를 처리하는 방법을 알아보세요. Lambda로 업데이트 섹션에서 자세한 내용을 참조할 수도 있습니다.

Lambda를 사용한 데이터 처리 업데이트

Lambda로 데이터 처리를 업데이트하려면 v2 데이터 스트림이 이제 이벤트 기반이라는 점을 고려해야 합니다. 초기 v1 Lambda 코드는 다음과 비슷했을 수 있습니다.

import base64 def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) measurement = payload["measurement"] projectDisplayName = payload["projectDisplayName"] # Process the content of the measurement # ...

v1 데이터 스키마가 지원 중단 경로에 있으므로 이전 Lambda 코드는 모든 새 데이터 스트림에서 작동하지 않습니다.

다음 Python 샘플 코드는 데이터 스키마 v2를 사용하여 Kinesis 스트림의 이벤트를 처리합니다. 이 코드는 새 eventType 파라미터를 사용하여 처리 방향을 적절한 핸들러로 지정합니다.

import base64 handlers = { "measurement": measurementEventHandler, "gatewayConnected": gatewayConnectedEventHandler, "gatewayDisconnected": gatewayDisconnectedEventHandler, "sensorConnected": sensorConnectedEventHandler, "sensorDisconnected": sensorDisconnectedEventHandler, } def main_handler(event): # Kinesis "data" blob is base64 encoded so decode here: for record in event['Records']: payload = base64.b64decode(record["kinesis"]["data"]) eventType = payload["eventType"] if eventType not in handler.keys(): log.info("No event handler found for the event type: {event['eventType']}") return # Invoke the appropriate handler based on the event type. eventPayload = payload["eventPayload"] eventHandler = handlers[eventType] eventHandler(eventPayload) def measurementEventHandler(measurementEventPayload): # Handle measurement event projectName = measurementEventPayload["projectName"] # ... def gatewayConnectedEventHandler(gatewayConnectedEventPayload): # Handle gateway connected event # Other event handler functions