데이터베이스 활동 스트림 모니터링 - Amazon Aurora

데이터베이스 활동 스트림 모니터링

데이터베이스 활동 스트림은 활동을 모니터링하고 보고합니다. 활동 스트림이 수집되어 Amazon Kinesis에 전송됩니다. Kinesis에서 활동 스트림을 모니터링하거나 다른 서비스 및 애플리케이션이 추가 분석을 위해 활동 스트림을 사용할 수 있습니다. AWS CLI 명령 describe-db-clusters 또는 RDS API DescribeDBClusters 작업을 사용하여 기본 Kinesis 스트림 이름을 찾을 수 있습니다.

Aurora는 다음과 같이 Kinesis 스트림을 관리합니다.

  • Aurora는 24시간 보존 기간으로 Kinesis 스트림을 자동으로 생성합니다.

  • Aurora는 필요한 경우 Kinesis 스트림 크기를 조정합니다.

  • 데이터베이스 활동 스트림을 중지하거나 DB 클러스터를 삭제하면 Aurora에서 Kinesis 스트림을 삭제합니다.

다음 활동 범주가 모니터링되고 활동 스트림 감사 로그에 기록됩니다.

  • SQL 명령 – 모든 SQL 명령이 감사되고 PL/SQL에서 준비된 문, 내장 함수 및 함수도 제공됩니다. 저장 프로시저에 대한 호출이 감사됩니다. 저장 프로시저 또는 함수 내에서 발급된 모든 SQL 문도 감사됩니다.

  • 다른 데이터베이스 정보 – 모니터링되는 활동에는 전체 SQL 문, DML 명령의 영향을 받은 행의 행 수, 액세스된 객체 및 고유한 데이터베이스 이름이 포함됩니다. Aurora PostgreSQL의 경우 데이터베이스 활동 스트림은 바인딩 변수 및 저장 프로시저 파라미터도 모니터링합니다.

    중요

    각 문의 전체 SQL 텍스트는 중요한 데이터를 포함하여 활동 스트림 감사 로그에 표시됩니다. 그러나 데이터베이스 사용자 암호는 다음 SQL 문에서와 같이 Aurora이 컨텍스트에서 판별할 수 있는 경우 수정됩니다.

    ALTER ROLE role-name WITH password
  • 연결 정보 – 모니터링되는 활동에는 세션 및 네트워크 정보, 서버 프로세스 ID 및 종료 코드가 포함됩니다.

DB 인스턴스를 모니터링하는 동안 활동 스트림에 오류가 발생하면 RDS 이벤트를 통해 사용자에게 알립니다.

Kinesis에서 활동 스트림에 액세스

DB 클러스터에 대해 활동 스트림을 활성화하면 Kinesis 스트림이 생성됩니다. Kinesis에서 데이터베이스 활동을 실시간으로 모니터링할 수 있습니다. 데이터베이스 활동을 추가 분석하려면 Kinesis 스트림을 소비자 애플리케이션에 연결하면 됩니다. 또한 IBM의 Security Guardium 또는 Imperva의 SecureSphere Database Audit and Protection과 같은 규정 준수 관리 애플리케이션에 스트림을 연결할 수 있습니다.

RDS 콘솔 또는 Kinesis 콘솔에서 Kinesis 스트림에 액세스할 수 있습니다.

RDS 콘솔을 사용하여 Kinesis에서 활동 스트림에 액세스하는 방법
  1. https://console.aws.amazon.com/rds/에서 Amazon RDS 콘솔을 엽니다.

  2. 탐색 창에서 데이터베이스를 선택합니다.

  3. 활동 스트림을 시작한 DB 클러스터를 선택합니다.

  4. Configuration(구성)을 선택합니다.

  5. 데이터베이스 활동 스트림에서 Kinesis 스트림 아래의 링크를 선택합니다.

  6. Kinesis 콘솔에서 모니터링을 선택하여 데이터베이스 활동 관찰을 시작합니다.

Kinesis 콘솔을 사용하여 Kinesis에서 활동 스트림에 액세스하는 방법
  1. https://console.aws.amazon.com/kinesis에서 Kinesis 콘솔을 엽니다.

  2. Kinesis 스트림 목록에서 활동 스트림을 선택합니다.

    활동 스트림의 이름에는 접두사 aws-rds-das-cluster-와 그 뒤의 DB 클러스터의 리소스 ID가 포함됩니다. 다음은 예제입니다.

    aws-rds-das-cluster-NHVOV4PCLWHGF52NP

    Amazon RDS 콘솔을 사용하여 DB 클러스터의 리소스 ID를 찾으려면 데이터베이스 목록에서 DB 클러스터를 선택한 다음 구성(Configuration) 탭을 선택합니다.

    AWS CLI를 사용하여 활동 스트림의 전체 Kinesis 스트림 이름을 찾으려면 describe-db-clusters 요청을 사용하고 응답에서 ActivityStreamKinesisStreamName 값을 기록합니다.

  3. 데이터베이스 활동을 관찰하려면 모니터링을 선택하십시오.

Amazon Kinesis 사용에 대한 자세한 내용은 Amazon Kinesis Data Streams이란 무엇입니까?를 참조하십시오.

감사 로그 내용 및 예

모니터링되는 이벤트는 데이터베이스 활동 스트림에 JSON 문자열로 표시됩니다. 구조는 DatabaseActivityMonitoringRecord를 포함하는 JSON 객체로 구성되며, 여기에는 databaseActivityEventList 활동 이벤트 배열이 포함됩니다.

활동 스트림 감사 로그 예제

다음은 활동 이벤트 레코드의 해독된 JSON 감사 로그 샘플입니다.

Aurora PostgreSQL CONNECT SQL 문 의 활동 이벤트 레코드

다음 활동 이벤트 레코드는 의 psql 클라이언트(clientApplication)에서 CONNECT SQL 문(command)을 사용하여 로그인한 것을 보여줍니다.

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-10-30 00:39:49.940668+00", "logTime": "2019-10-30 00:39:49.990579+00", "statementId": 1, "substatementId": 1, "objectType": null, "command": "CONNECT", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "49804", "sessionId": "5ce5f7f0.474b", "rowCount": null, "commandText": null, "paramList": [], "pid": 18251, "clientApplication": "psql", "exitCode": null, "class": "MISC", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
예 Aurora MySQL CONNECT SQL 문의 활동 이벤트 레코드

다음 활동 이벤트 레코드는 mysql 클라이언트(clientApplication)가 CONNECT SQL 문(command)을 사용하여 로그인한 것을 보여줍니다.

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:13.267214+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"rdsadmin", "databaseName":"", "remoteHost":"localhost", "remotePort":"11053", "command":"CONNECT", "commandText":"", "paramList":null, "objectType":"TABLE", "objectName":"", "statementId":0, "substatementId":1, "exitCode":"0", "sessionId":"725121", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:13.267207+00", "endTime":"2020-05-22 18:07:13.267213+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }
Aurora PostgreSQL CREATE TABLE 문의 활동 이벤트 레코드

다음 예시는 Aurora PostgreSQL에 대한 CREATE TABLE 이벤트를 보여줍니다.

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:36:54.403455+00", "logTime": "2019-05-24 00:36:54.494235+00", "statementId": 2, "substatementId": 1, "objectType": null, "command": "CREATE TABLE", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": null, "commandText": "create table my_table (id serial primary key, name varchar(32));", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "DDL", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
Aurora MySQL CREATE TABLE 문의 활동 이벤트 레코드

다음 예시는 Aurora MySQL의 CREATE TABLE 문을 보여줍니다. 이 작업은 두 개의 개별 이벤트 레코드로 표시됩니다. 한 이벤트에는 "class":"MAIN"이 있습니다. 다른 이벤트에는 "class":"AUX"가 있습니다. 메시지는 순서에 관계없이 도착할 수 있습니다. logTime 이벤트의 MAIN 필드는 항상 해당 logTime 이벤트의 AUX 필드보다 빠릅니다.

다음 예제에서는 class 값이 MAIN인 이벤트를 보여 줍니다.

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.250221+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"CREATE TABLE test1 (id INT)", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":1, "exitCode":"0", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.250222+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

다음 예제에서는 class 값이 AUX인 해당 이벤트를 보여 줍니다.

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.247182+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"CREATE", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":2, "exitCode":"", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.247182+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }
Aurora PostgreSQL SELECT 문의 활동 이벤트 레코드

다음 예시는 에 대한 SELECT 이벤트를 보여줍니다.

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:39:49.920564+00", "logTime": "2019-05-24 00:39:49.940668+00", "statementId": 6, "substatementId": 1, "objectType": "TABLE", "command": "SELECT", "objectName": "public.my_table", "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": 10, "commandText": "select * from my_table;", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "READ", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
{ "type": "DatabaseActivityMonitoringRecord", "clusterId": "", "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q", "databaseActivityEventList": [ { "class": "TABLE", "clientApplication": "Microsoft SQL Server Management Studio - Query", "command": "SELECT", "commandText": "select * from [testDB].[dbo].[TestTable]", "databaseName": "testDB", "dbProtocol": "SQLSERVER", "dbUserName": "test", "endTime": null, "errorMessage": null, "exitCode": 1, "logTime": "2022-10-06 21:24:59.9422268+00", "netProtocol": null, "objectName": "TestTable", "objectType": "TABLE", "paramList": null, "pid": null, "remoteHost": "local machine", "remotePort": null, "rowCount": 0, "serverHost": "172.31.30.159", "serverType": "SQLSERVER", "serverVersion": "15.00.4073.23.v1.R1", "serviceName": "sqlserver-ee", "sessionId": 62, "startTime": null, "statementId": "0x03baed90412f564fad640ebe51f89b99", "substatementId": 1, "transactionId": "4532935", "type": "record", "engineNativeAuditFields": { "target_database_principal_id": 0, "target_server_principal_id": 0, "target_database_principal_name": "", "server_principal_id": 2, "user_defined_information": "", "response_rows": 0, "database_principal_name": "dbo", "target_server_principal_name": "", "schema_name": "dbo", "is_column_permission": true, "object_id": 581577110, "server_instance_name": "EC2AMAZ-NFUJJNO", "target_server_principal_sid": null, "additional_information": "", "duration_milliseconds": 0, "permission_bitmask": "0x00000000000000000000000000000001", "data_sensitivity_information": "", "session_server_principal_name": "test", "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109", "audit_schema_version": 1, "database_principal_id": 1, "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000", "user_defined_event_id": 0, "host_name": "EC2AMAZ-NFUJJNO" } } ] }
예 Aurora MySQL SELECT 문의 활동 이벤트 레코드

다음 예시는 SELECT 이벤트를 보여줍니다.

다음 예제에서는 class 값이 MAIN인 이벤트를 보여 줍니다.

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986467+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"SELECT * FROM test1 WHERE id < 28", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":1, "exitCode":"0", "sessionId":"726571", "rowCount":2, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986467+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

다음 예제에서는 class 값이 AUX인 해당 이벤트를 보여 줍니다.

{ "type":"DatabaseActivityMonitoringRecord", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986399+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"READ", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":2, "exitCode":"", "sessionId":"726571", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986399+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }

DatabaseActivityMonitoringRecords JSON 객체

데이터베이스 작업 이벤트 레코드는 다음 정보가 포함된 JSON 객체에 있습니다.

JSON 필드 데이터 형식 설명

type

string

JSON 레코드 형식입니다. 이 값은 DatabaseActivityMonitoringRecords입니다.

version string 데이터베이스 작업 모니터링 레코드의 버전입니다.

생성되는 데이터베이스 활동 레코드의 버전은 DB 클러스터의 엔진 버전에 따라 다릅니다.

  • 버전 1.1 데이터베이스 작업 레코드는 엔진 버전 10.10 이상 마이너 버전 및 엔진 버전 11.5 이상을 실행하는 Aurora PostgreSQL DB 클러스터에 대해 생성됩니다.

  • 버전 1.0 데이터베이스 작업 레코드는 엔진 버전 10.7 및 11.4를 실행하는 Aurora PostgreSQL DB 클러스터에 대해 생성됩니다.

달리 명시된 경우를 제외하고 다음 필드는 모두 버전 1.0과 버전 1.1에 있습니다.

databaseActivityEvents

문자열

작업 이벤트를 포함하는 JSON 객체입니다.

문자열 databaseActivityEventList를 해독하는 데 사용되는 암호화 키

databaseActivityEvents JSON 객체

databaseActivityEvents JSON 객체에는 다음과 같은 정보가 포함되어 있습니다.

JSON 레코드의 최상위 필드

감사 로그의 각 이벤트는 JSON 형식의 레코드 내에 래핑됩니다. 이 레코드에는 다음 필드가 포함되어 있습니다.

type

이 필드는 항상 값이 DatabaseActivityMonitoringRecords입니다.

version

이 필드는 데이터베이스 활동 스트림 데이터 프로토콜 또는 계약 버전을 나타냅니다. 이는 사용 가능한 필드를 정의합니다.

버전 1.0은 Aurora PostgreSQL 버전 10.7 및 11.4에 대한 원래 데이터 활동 스트림 지원을 나타냅니다. 버전 1.1은 Aurora PostgreSQL 버전 10.10 이상 및 Aurora PostgreSQL 11.5 이상에 대한 데이터 활동 스트림 지원을 나타냅니다. 버전 1.1에는 추가 필드 errorMessagestartTime이 포함되어 있습니다. 버전 1.2는 Aurora MySQL 2.08 이상에 대한 데이터 활동 스트림 지원을 나타냅니다. 버전 1.2에는 추가 필드 endTimetransactionId가 포함되어 있습니다.

databaseActivityEvents

하나 이상의 활동 이벤트를 나타내는 암호화된 문자열입니다. base64 바이트 배열로 표현됩니다. 문자열을 해독하면 결과는 이 단원의 예제와 같이 필드가 있는 JSON 형식의 레코드입니다.

databaseActivityEvents 문자열을 암호화하는 데 사용되는 암호화된 데이터 키입니다. 이 키는 데이터베이스 활동 스트림을 시작할 때 제공한 AWS KMS key와(과) 동일합니다.

다음 예제에서는 이 레코드의 형식을 보여줍니다.

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents":"encrypted audit records", "key":"encrypted key" }

databaseActivityEvents 필드의 내용을 해독하려면 다음 단계를 수행합니다.

  1. 데이터베이스 활동 스트림을 시작할 때 제공한 키를 사용하여 key JSON 필드의 값을 복호화합니다. 이렇게 하면 데이터 암호화 키가 일반 텍스트로 반환됩니다.

  2. Base64로 databaseActivityEvents JSON 필드의 값을 디코딩하여 감사 페이로드의 암호화 텍스트를 이진 형식으로 가져옵니다.

  3. 첫 번째 단계에서 디코딩한 데이터 암호화 키를 사용하여 이진 암호화 텍스트를 해독합니다.

  4. 해독된 페이로드의 압축을 풉니다.

    • 암호화된 페이로드가 databaseActivityEvents 필드에 있습니다.

    • databaseActivityEventList 필드에는 감사 레코드 배열이 포함되어 있습니다. 배열의 type 필드는 record 또는 heartbeat일 수 있습니다.

감사 로그 활동 이벤트 레코드는 다음 정보가 포함된 JSON 객체입니다.

JSON 필드 데이터 형식 설명

type

string

JSON 레코드 형식입니다. 이 값은 DatabaseActivityMonitoringRecord입니다.

clusterId string DB 클러스터 리소스 식별자입니다. DB 클러스터 속성 DbClusterResourceId에 해당합니다.
instanceId string DB 인스턴스 리소스 식별자입니다. DB 인스턴스 속성 DbiResourceId에 해당합니다.

databaseActivityEventList

string

활동 감사 레코드 또는 하트비트 메시지의 배열입니다.

databaseActivityEventList JSON 배열

감사 로그 페이로드는 암호화된 databaseActivityEventList JSON 배열입니다. 다음 에는 감사 로그의 복호화된 DatabaseActivityEventList 배열에 있는 각 활동 이벤트의 필드가 알파벳 순으로 나열되어 있습니다. 이 필드는 Aurora PostgreSQL을 사용하는지 Aurora MySQL을 사용하는지에 따라 다릅니다. 해당 데이터베이스 엔진에 적용되는 표를 참조하십시오.

중요

이벤트 구조는 변경될 수 있습니다. Aurora는 향후 활동 이벤트에 새 필드를 추가할 수 있습니다. JSON 데이터를 구문 분석하는 애플리케이션에서 코드는 알 수 없는 필드 이름에 대해 무시할 수 있는지 또는 적절한 작업을 수행할 수 있는지 확인합니다.

Aurora PostgreSQL의 databaseActivityEventList 필드
필드 데이터 형식 설명
class string

활동 이벤트의 클래스입니다. Aurora PostgreSQL에 유효한 값은 다음과 같습니다.

  • ALL

  • CONNECT – 연결 또는 연결 해제 이벤트.

  • DDLROLE 클래스의 문 목록에 포함되지 않은 DDL 문.

  • FUNCTION – 함수 호출 또는 DO 블록.

  • MISCDISCARD, FETCH, CHECKPOINT 또는 VACUUM 같은 기타 명령

  • NONE

  • READ – 소스가 관계 또는 쿼리인 경우 SELECT 또는 COPY 문입니다.

  • ROLEGRANT, REVOKE, CREATE/ALTER/DROP ROLE을 포함한 역할 및 권한과 관련된 문.

  • WRITE – 대상이 관계인 경우 INSERT, UPDATE, DELETE, TRUNCATE 또는 COPY 문.

clientApplication string 클라이언트가 보고한 대로 클라이언트가 연결에 사용한 애플리케이션입니다. 클라이언트는 이 정보를 제공할 필요가 없으므로 값은 null일 수 있습니다.
command string 명령 세부 정보가 없는 SQL 명령의 이름.
commandText string

사용자가 전달한 실제 SQL 문. Aurora PostgreSQL의 경우 값은 원래 SQL 문과 동일합니다. 이 필드는 연결 또는 연결 해제 레코드를 제외한 모든 유형의 레코드에 사용되며 이 경우 값은 null입니다.

중요

각 문의 전체 SQL 텍스트는 중요한 데이터를 포함하여 활동 스트림 감사 로그에 표시됩니다. 그러나 데이터베이스 사용자 암호는 다음 SQL 문에서와 같이 Aurora가 컨텍스트에서 판별할 수 있는 경우 수정됩니다.

ALTER ROLE role-name WITH password
databaseName string 사용자가 연결된 데이터베이스입니다.
dbProtocol string 데이터베이스 프로토콜입니다(예: Postgres 3.0).
dbUserName string 클라이언트가 인증한 데이터베이스 사용자.
errorMessage

(버전 1.1 데이터베이스 작업 레코드만 해당)

string

오류가 발생할 경우 DB 서버에서 생성된 오류 메시지로 이 필드가 채워집니다. 오류가 발생하지 않은 정상적인 문의 경우 errorMessage 값은 null입니다.

오류는 클라이언트에 표시되는 심각도 수준 ERROR 이상의 PostgreSQL 오류 로그 이벤트를 생성하는 모든 활동으로 정의됩니다. 자세한 내용은 PostgreSQL 메시지 심각도 수준을 참조하십시오. 예를 들어, 구문 오류 및 쿼리 취소는 오류 메시지를 생성합니다.

백그라운드 체크포인터 프로세스 오류와 같은 내부 PostgreSQL 서버 오류는 오류 메시지를 생성하지 않습니다. 그러나 이러한 이벤트에 대한 레코드는 로그 심각도 수준의 설정에 관계없이 생성됩니다. 이로 인해 공격자가 로깅을 해제하여 탐지를 방지할 수 없습니다.

exitCode 필드도 참조하십시오.

exitCode int 세션 종료 레코드에 사용되는 값. 정리 종료의 경우, 여기에 종료 코드가 포함됩니다. 일부 결함 시나리오에서는 종료 코드를 항상 얻을 수 있는 것은 아닙니다. PostgreSQL이 exit()을 수행하거나 연산자가 kill -9 같은 명령을 수행하는 경우가 그 예입니다.

오류가 발생할 경우 PostgreSQL 오류 코드에 나열된 SQL 오류 코드 SQLSTATEexitCode 필드에 표시됩니다.

errorMessage 필드도 참조하십시오.

logTime string 감사 코드 경로에 기록된 타임스탬프. SQL 문의 실행이 종료된 시간을 나타냅니다. startTime 필드도 참조하십시오.
netProtocol string 네트워크 통신 프로토콜.
objectName string SQL 문이 하나에서 작동하는 경우 데이터베이스 객체의 이름. 이 필드는 SQL 문이 데이터베이스 객체에서 작동하는 경우에만 사용됩니다. SQL 문이 객체에서 작동하지 않는 경우, 이 값은 null입니다.
objectType string 테이블, 인덱스, 뷰 등의 데이터베이스 객체 유형입니다. 이 필드는 SQL 문이 데이터베이스 객체에서 작동하는 경우에만 사용됩니다. SQL 문이 객체에서 작동하지 않는 경우, 이 값은 null입니다. 유효한 값은 다음과 같습니다.
  • COMPOSITE TYPE

  • FOREIGN TABLE

  • FUNCTION

  • INDEX

  • MATERIALIZED VIEW

  • SEQUENCE

  • TABLE

  • TOAST TABLE

  • VIEW

  • UNKNOWN

paramList string SQL 문에 전달되는 쉼표로 구분된 파라미터의 배열입니다. SQL 문에 파라미터가 없는 경우 이 값은 빈 배열입니다.
pid int 클라이언트 연결 서비스를 위해 할당된 백엔드 프로세스의 프로세스 ID입니다.
remoteHost string 클라이언트 IP 주소 또는 호스트 이름입니다. Aurora PostgreSQL의 경우 데이터베이스의 log_hostname 파라미터 설정에 따라 사용되는 값이 달라집니다.
remotePort string 클라이언트 포트 번호.
rowCount int SQL 문에 의해 반환되는 행의 수입니다. 예를 들어 SELECT 문에서 10개의 행을 반환하는 경우 rowCount는 10입니다. INSERT 또는 UPDATE 문의 경우 rowCount는 0입니다.
serverHost string 데이터베이스 서버 호스트 IP 주소.
serverType string 데이터베이스 서버 유형(예: PostgreSQL).
serverVersion string 데이터베이스 서버 버전(예: Aurora PostgreSQL의 경우 2.3.1).
serviceName string 서비스의 이름(예: Amazon Aurora PostgreSQL-Compatible edition).
sessionId int 의사 고유 세션 식별자.
sessionId int 의사 고유 세션 식별자.
startTime

(버전 1.1 데이터베이스 작업 레코드만 해당)

string

SQL 문의 실행이 시작된 시간입니다.

SQL 문의 대략적인 실행 시간을 계산하려면 logTime - startTime을 사용합니다. logTime 필드도 참조하십시오.

statementId int 클라이언트의 SQL 문에 대한 식별자입니다. 카운터는 세션 수준에 있으며 클라이언트가 입력한 각 SQL 문 단위로 증가합니다.
substatementId int SQL 하위 문에 대한 식별자입니다. 이 값은 statementId 필드로 식별되는 각 SQL 문에 대해 포함된 하위 명령문을 계산합니다.
type string 이벤트 유형입니다. 유효한 값은 record 또는 heartbeat입니다.
Aurora MySQL의 databaseActivityEventList 필드
필드 데이터 형식 설명
class string

활동 이벤트의 클래스입니다.

Aurora MySQL에 유효한 값은 다음과 같습니다.

  • MAIN – SQL 문을 나타내는 기본 이벤트입니다.

  • AUX – 추가 세부 정보를 포함하는 보충 이벤트입니다. 예를 들어 객체의 이름을 바꾸는 문에는 새 이름을 반영하는 AUX 클래스가 포함된 이벤트가 있을 수 있습니다.

    동일한 문에 해당하는 MAINAUX 이벤트를 찾으려면 pid 필드와 statementId 필드에 대해 동일한 값을 가진 다른 이벤트를 확인합니다.

clientApplication string 클라이언트가 보고한 대로 클라이언트가 연결에 사용한 애플리케이션입니다. 클라이언트는 이 정보를 제공할 필요가 없으므로 값은 null일 수 있습니다.
command string

SQL 문의 일반 범주입니다. 이 필드의 값은 class 값에 따라 달라집니다.

classMAIN인 경우 값에는 다음이 포함됩니다.

  • CONNECT – 클라이언트 세션이 연결된 경우.

  • QUERY – SQL 문. class 값이 AUX인 하나 이상의 이벤트가 포함됩니다.

  • DISCONNECT – 클라이언트 세션의 연결이 끊어진 경우.

  • FAILED_CONNECT – 클라이언트가 연결을 시도하지만 연결할 수 없는 경우.

  • CHANGEUSER – MySQL 네트워크 프로토콜에 속하는 상태 변경으로, 사용자가 실행한 문이 아닌 경우.

classAUX인 경우 값에는 다음이 포함됩니다.

  • READ – 소스가 관계 또는 쿼리인 경우 SELECT 또는 COPY 문입니다.

  • WRITE – 대상이 관계인 경우 INSERT, UPDATE, DELETE, TRUNCATE 또는 COPY 문.

  • DROP – 객체 삭제.

  • CREATE – 객체 생성.

  • RENAME – 객체 이름 바꾸기.

  • ALTER – 객체의 속성 변경.

commandText string

class 값이 MAIN인 이벤트의 경우 이 필드는 사용자가 전달한 실제 SQL 문을 나타냅니다. 이 필드는 연결 또는 연결 해제 레코드를 제외한 모든 유형의 레코드에 사용되며 이 경우 값은 null입니다.

class 값이 AUX인 이벤트의 경우 이 필드에는 이벤트와 관련된 객체에 대한 보충 정보가 포함됩니다.

Aurora MySQL의 경우 따옴표와 같은 문자 앞에는 이스케이프 문자를 나타내는 백슬래시가 옵니다.

중요

각 문의 전체 SQL 텍스트는 중요한 데이터를 포함하여 감사 로그에 표시됩니다. 그러나 데이터베이스 사용자 암호는 다음 SQL 문에서와 같이 Aurora가 컨텍스트에서 판별할 수 있는 경우 수정됩니다.

mysql> SET PASSWORD = 'my-password';
참고

보안 모범 사례로 여기에 표시된 프롬프트 이외의 암호를 지정하는 것이 좋습니다.

databaseName 문자열 사용자가 연결된 데이터베이스입니다.
dbProtocol string 데이터베이스 프로토콜. Aurora MySQL의 경우 현재 이 값은 항상 MySQL입니다.
dbUserName string 클라이언트가 인증한 데이터베이스 사용자.
endTime

(버전 1.2 데이터베이스 활동 레코드만 해당)

string

SQL 문의 실행이 끝난 시간입니다. 협정 세계시(UTC) 형식으로 표시됩니다.

SQL 문의 실행 시간을 계산하려면 endTime - startTime을 사용합니다. startTime 필드도 참조하십시오.

errorMessage

(버전 1.1 데이터베이스 작업 레코드만 해당)

string

오류가 발생할 경우 DB 서버에서 생성된 오류 메시지로 이 필드가 채워집니다. 오류가 발생하지 않은 정상적인 문의 경우 errorMessage 값은 null입니다.

오류는 클라이언트에 표시되는 심각도 수준 ERROR 이상의 MySQL 오류 로그 이벤트를 생성하는 모든 활동으로 정의됩니다. 자세한 내용은 MySQL 참조 설명서The Error Log를 참조하세요. 예를 들어, 구문 오류 및 쿼리 취소는 오류 메시지를 생성합니다.

백그라운드 체크포인터 프로세스 오류와 같은 내부 MySQL 서버 오류는 오류 메시지를 생성하지 않습니다. 그러나 이러한 이벤트에 대한 레코드는 로그 심각도 수준의 설정에 관계없이 생성됩니다. 이로 인해 공격자가 로깅을 해제하여 탐지를 방지할 수 없습니다.

exitCode 필드도 참조하십시오.

exitCode int 세션 종료 레코드에 사용되는 값. 정리 종료의 경우, 여기에 종료 코드가 포함됩니다. 일부 결함 시나리오에서는 종료 코드를 항상 얻을 수 있는 것은 아닙니다. 이 경우 이 값은 0이거나 비어 있을 수 있습니다.
logTime string 감사 코드 경로에 기록된 타임스탬프. 협정 세계시(UTC) 형식으로 표시됩니다. 문 기간을 계산하는 가장 정확한 방법은 startTimeendTime 필드를 참조하십시오.
netProtocol string 네트워크 통신 프로토콜. Aurora MySQL의 경우 현재 이 값은 항상 TCP입니다.
objectName string SQL 문이 하나에서 작동하는 경우 데이터베이스 객체의 이름. 이 필드는 SQL 문이 데이터베이스 객체에서 작동하는 경우에만 사용됩니다. SQL 문이 객체에서 작동하지 않는 경우, 이 값은 비어 있습니다. 객체의 정규화된 이름을 구성하려면 databaseNameobjectName을 결합합니다. 쿼리에 여러 객체가 포함된 경우 이 필드는 쉼표로 구분된 이름 목록이 될 수 있습니다.
objectType string

테이블, 인덱스 등의 데이터베이스 객체 유형입니다. 이 필드는 SQL 문이 데이터베이스 객체에서 작동하는 경우에만 사용됩니다. SQL 문이 객체에서 작동하지 않는 경우, 이 값은 null입니다.

Aurora MySQL에 유효한 값은 다음과 같습니다.

  • INDEX

  • TABLE

  • UNKNOWN

paramList string 이 필드는 Aurora MySQL에 사용되지 않으며 항상 null입니다.
pid int 클라이언트 연결 서비스를 위해 할당된 백엔드 프로세스의 프로세스 ID입니다. 데이터베이스 서버가 다시 시작되면 pid가 변경되고 statementId 필드의 카운터가 다시 시작됩니다.
remoteHost string SQL 문을 실행한 클라이언트의 IP 주소 또는 호스트 이름입니다. Aurora MySQL의 경우 데이터베이스의 skip_name_resolve 파라미터 설정에 따라 사용되는 값이 달라집니다. localhost 값은 rdsadmin 특수 사용자의 활동을 나타냅니다.
remotePort string 클라이언트 포트 번호.
rowCount int SQL 문에 의해 영향을 받거나 검색된 테이블 행 수입니다. 이 필드는 데이터 조작 언어(DML) 문인 SQL 문에만 사용됩니다. SQL 문이 DML 문이 아닌 경우 이 값은 null입니다.
serverHost string 데이터베이스 서버 인스턴스 식별자입니다. 이 값은 Aurora MySQL과 Aurora PostgreSQL SQL에서 다르게 표현됩니다. Aurora PostgreSQL은 식별자 대신 IP 주소를 사용합니다.
serverType string 데이터베이스 서버 유형(예: MySQL).
serverVersion string 데이터베이스 서버 버전. Aurora MySQL의 경우 현재 이 값은 항상 MySQL 5.7.12입니다.
serviceName string 서비스의 이름입니다. Aurora MySQL의 경우 현재 이 값은 항상 Amazon Aurora MySQL입니다.
sessionId int 의사 고유 세션 식별자.
startTime

(버전 1.1 데이터베이스 작업 레코드만 해당)

string

SQL 문의 실행이 시작된 시간입니다. 협정 세계시(UTC) 형식으로 표시됩니다.

SQL 문의 실행 시간을 계산하려면 endTime - startTime을 사용합니다. endTime 필드도 참조하십시오.

statementId int 클라이언트의 SQL 문에 대한 식별자입니다. 카운터는 클라이언트가 입력한 각 SQL 문 단위로 증가합니다. DB 인스턴스를 다시 시작하면 카운터가 재설정됩니다.
substatementId int SQL 하위 문에 대한 식별자입니다. 이 값은 MAIN 클래스가 있는 이벤트의 경우 1이고, AUX 클래스가 있는 이벤트의 경우 2입니다. statementId 필드를 사용하여 동일한 문에 의해 생성된 모든 이벤트를 식별합니다.
transactionId

(버전 1.2 데이터베이스 활동 레코드만 해당)

int 트랜잭션의 식별자입니다.
type string 이벤트 유형입니다. 유효한 값은 record 또는 heartbeat입니다.

AWS SDK를 사용하여 데이터베이스 활동 스트림 처리

AWS SDK를 사용하여 프로그래밍 방식으로 활동 스트림을 처리할 수 ​​있습니다. 다음은 제대로 작동하는 Java 및 Python 예시로, Kinesis 데이터 스트림을 처리하는 방법을 보여줍니다.

Java
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.security.Security; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.zip.GZIPInputStream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.SecretKeySpec; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.CryptoInputStream; import com.amazonaws.encryptionsdk.jce.JceMasterKey; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kms.AWSKMS; import com.amazonaws.services.kms.AWSKMSClientBuilder; import com.amazonaws.services.kms.model.DecryptRequest; import com.amazonaws.services.kms.model.DecryptResult; import com.amazonaws.util.Base64; import com.amazonaws.util.IOUtils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.bouncycastle.jce.provider.BouncyCastleProvider; public class DemoConsumer { private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]"; private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]"; private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]"; private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]"; private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2... private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY); private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS); private static final AwsCrypto CRYPTO = new AwsCrypto(); private static final AWSKMS KMS = AWSKMSClientBuilder.standard() .withRegion(REGION_NAME) .withCredentials(CREDENTIALS_PROVIDER).build(); class Activity { String type; String version; String databaseActivityEvents; String key; } class ActivityEvent { @SerializedName("class") String _class; String clientApplication; String command; String commandText; String databaseName; String dbProtocol; String dbUserName; String endTime; String errorMessage; String exitCode; String logTime; String netProtocol; String objectName; String objectType; List<String> paramList; String pid; String remoteHost; String remotePort; String rowCount; String serverHost; String serverType; String serverVersion; String serviceName; String sessionId; String startTime; String statementId; String substatementId; String transactionId; String type; } class ActivityRecords { String type; String clusterId; String instanceId; List<ActivityEvent> databaseActivityEventList; } static class RecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new RecordProcessor(); } } static class RecordProcessor implements IRecordProcessor { private static final long BACKOFF_TIME_IN_MILLIS = 3000L; private static final int PROCESSING_RETRIES_MAX = 10; private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; private static final Gson GSON = new GsonBuilder().serializeNulls().create(); private static final Cipher CIPHER; static { Security.insertProviderAt(new BouncyCastleProvider(), 1); try { CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC"); } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) { throw new ExceptionInInitializerError(e); } } private long nextCheckpointTimeInMillis; @Override public void initialize(String shardId) { } @Override public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) { for (final Record record : records) { processSingleBlob(record.getData()); } if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { checkpoint(checkpointer); nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS; } } @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } } private void processSingleBlob(final ByteBuffer bytes) { try { // JSON $Activity final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class); // Base64.Decode final byte[] decoded = Base64.decode(activity.databaseActivityEvents); final byte[] decodedDataKey = Base64.decode(activity.key); Map<String, String> context = new HashMap<>(); context.put("aws:rds:dbc-id", DBC_RESOURCE_ID); // Decrypt final DecryptRequest decryptRequest = new DecryptRequest() .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context); final DecryptResult decryptResult = KMS.decrypt(decryptRequest); final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext())); // GZip Decompress final byte[] decompressed = decompress(decrypted); // JSON $ActivityRecords final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class); // Iterate throught $ActivityEvents for (final ActivityEvent event : activityRecords.databaseActivityEventList) { System.out.println(GSON.toJson(event)); } } catch (Exception e) { // Handle error. e.printStackTrace(); } } private static byte[] decompress(final byte[] src) throws IOException { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); return IOUtils.toByteArray(gzipInputStream); } private void checkpoint(IRecordProcessorCheckpointer checkpointer) { for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) { try { checkpointer.checkpoint(); break; } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). System.out.println("Caught shutdown exception, skipping checkpoint." + se); break; } catch (ThrottlingException e) { // Backoff and re-attempt checkpoint upon transient failures if (i >= (PROCESSING_RETRIES_MAX - 1)) { System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e); break; } else { System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e); } } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e); break; } try { Thread.sleep(BACKOFF_TIME_IN_MILLIS); } catch (InterruptedException e) { System.out.println("Interrupted sleep" + e); } } } } private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException { // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"), "BC", "DataKey", "AES/GCM/NoPadding"); try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) { IOUtils.copy(decryptingStream, out); return out.toByteArray(); } } public static void main(String[] args) throws Exception { final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); final KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId); kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST); kinesisClientLibConfiguration.withRegionName(REGION_NAME); final Worker worker = new Builder() .recordProcessorFactory(new RecordProcessorFactory()) .config(kinesisClientLibConfiguration) .build(); System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId); try { worker.run(); } catch (Throwable t) { System.err.println("Caught throwable while processing data."); t.printStackTrace(); System.exit(1); } System.exit(0); } private static byte[] getByteArray(final ByteBuffer b) { byte[] byteArray = new byte[b.remaining()]; b.get(byteArray); return byteArray; } }
Python
import base64 import json import zlib import aws_encryption_sdk from aws_encryption_sdk import CommitmentPolicy from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType import boto3 REGION_NAME = '<region>' # us-east-1 RESOURCE_ID = '<external-resource-id>' # cluster-ABCD123456 STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID # aws-rds-das-cluster-ABCD123456 enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT) class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, plain_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC) def _get_raw_key(self, key_id): return self.wrapping_key def decrypt_payload(payload, data_key): my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") decrypted_plaintext, header = enc_client.decrypt( source=payload, materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)) return decrypted_plaintext def decrypt_decompress(payload, key): decrypted = decrypt_payload(payload, key) return zlib.decompress(decrypted, zlib.MAX_WBITS + 16) def main(): session = boto3.session.Session() kms = session.client('kms', region_name=REGION_NAME) kinesis = session.client('kinesis', region_name=REGION_NAME) response = kinesis.describe_stream(StreamName=STREAM_NAME) shard_iters = [] for shard in response['StreamDescription']['Shards']: shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'], ShardIteratorType='LATEST') shard_iters.append(shard_iter_response['ShardIterator']) while len(shard_iters) > 0: next_shard_iters = [] for shard_iter in shard_iters: response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000) for record in response['Records']: record_data = record['Data'] record_data = json.loads(record_data) payload_decoded = base64.b64decode(record_data['databaseActivityEvents']) data_key_decoded = base64.b64decode(record_data['key']) data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded, EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID}) print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext'])) if 'NextShardIterator' in response: next_shard_iters.append(response['NextShardIterator']) shard_iters = next_shard_iters if __name__ == '__main__': main()