監控資料庫活動串流 - Amazon Aurora

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

監控資料庫活動串流

資料庫活動串流會監控和報告活動。活動資料串流會收集並傳送至 Amazon Kinesis。從 Kinesis 中,您可以監視活動串流,或者其他服務和應用程式可以使用活動串流以供進一步分析。您可以使用 AWS CLI 命令describe-db-clusters或 RDS API DescribeDBClusters作業來尋找基礎 Kinesis 串流名稱。

Aurora 會為您管理 Kinesis 串流,如下所示:

  • Aurora 會自動建立保留時間為 24 小時的 Kinesis 串流。

  • 如有必要,Aurora 可擴展 Kinesis 串流。

  • 如果您停止資料庫活動串流或刪除資料庫叢集,則 Aurora 會刪除 Kinesis 串流。

系統會監控以下類別的活動並將其放在活動串流稽核日誌中:

  • SQL 命令 – 所有 SQL 命令都經稽核,也是預備陳述式、內建函數和 PL/SQL 函數。對預存程序的呼叫會進行稽核。在儲存的程序或函數中發出的任何 SQL 語句也被稽核。

  • 其他資料庫資訊 – 監控的活動包含完整的 SQL 陳述式、因 DML 命令而受影響的資料列數、經存取的物件和唯一的資料庫名稱。若為 Aurora PostgreSQL,資料庫活動串流還會監控連結變數和已儲存的程序參數。

    重要

    活動資料串流稽核記錄中會顯示每個陳述式的完整 SQL 文字,包括任何敏感資料。但是,如果 Aurora 可以從內容 (例如下列 SQL 陳述式) 判斷資料庫使用者密碼,則該密碼將會被修訂。

    ALTER ROLE role-name WITH password
  • 連線資訊 – 監控的活動包含工作階段和網路資訊、伺服器程序 ID 和結束代碼。

如果活動串流在監控資料庫執行個體時失敗,系統會透過 RDS 事件來通知您。

透過 Kinesis 存取活動串流

在您啟用資料庫叢集的活動串流時,系統就會為您建立 Kinesis 串流。透過 Kinesis,您就可以即時監控資料庫活動。若要進一步分析資料庫活動,您可以將 Kinesis 串流連線至消費者應用程式。您也可以將串流連線至法規遵循管理應用程式,例如 IBM 的安全衛士或 Imperva 的 SecureSphere 資料庫稽核與保護 與保護。

您可以從 RDS 主控台或 Kinesis 主控台存取 Kinesis 串流。

使用 RDS 主控台從 Kinesis 存取活動串流
  1. 前往 https://console.aws.amazon.com/rds/,開啟 Amazon RDS 主控台。

  2. 在導覽窗格中,選擇 Databases (資料庫)。

  3. 選擇已在其上啟動活動串流的資料庫叢集

  4. 選擇 Configuration (組態)

  5. Database activity stream (資料庫活動串流) 下,選擇 Kinesis stream (Kinesis 串流) 下的連結。

  6. 在 Kinesis 主控台中,選擇 Monitoring (監控) 來開始觀察資料庫活動。

使用 Kinesis 主控台從 Kinesis 存取活動串流
  1. https://console.aws.amazon.com/kinesis 上開啟 Kinesis 主控台。

  2. 從 Kinesis 串流清單中選擇活動串流。

    活動串流的名稱包含字首 aws-rds-das-cluster-,後接資料庫叢集的資源 ID。以下是範例。

    aws-rds-das-cluster-NHVOV4PCLWHGF52NP

    若要使用 Amazon RDS 主控台來尋找資料庫叢集的資源 ID,請從資料庫清單中選擇您的資料庫叢集,然後選擇Configuration (組態) 索引標籤。

    若 AWS CLI 要使用尋找活動串流的完整 Kinesis 串流名稱,請使用 describe-db-clustersCLI 要求並記下回應ActivityStreamKinesisStreamName中的值。

  3. 選擇 Monitoring (監控) 來開始觀察資料庫活動。

如需使用 Amazon Kinesis 的詳細資訊,請參閱什麼是 Amazon Kinesis Data Streams?

稽核日誌內容和範例

受監控的事件會以 JSON 字串的形式在資料庫活動串流中顯示。此結構包含 JSON 物件,內含的 DatabaseActivityMonitoringRecord 會依序包含活動事件的 databaseActivityEventList 陣列。

活動串流的稽核記錄範例

以下是活動事件記錄的範例解密 JSON 稽核日誌。

範例 an Aurora PostgreSQL CONNECT SQL 陳述式 的活動事件記錄

以下活動事件記錄顯示 psql 用戶端 (clientApplication) 使用 CONNECT SQL 陳述式 (command) 登入。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-10-30 00:39:49.940668+00", "logTime": "2019-10-30 00:39:49.990579+00", "statementId": 1, "substatementId": 1, "objectType": null, "command": "CONNECT", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "49804", "sessionId": "5ce5f7f0.474b", "rowCount": null, "commandText": null, "paramList": [], "pid": 18251, "clientApplication": "psql", "exitCode": null, "class": "MISC", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
範例 Aurora MySQL CONNECT SQL 陳述式的活動事件記錄

以下是 mysql 用戶端 (clientApplication) 使用 CONNECT SQL 陳述式 (command) 登入的活動事件記錄。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:13.267214+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"rdsadmin", "databaseName":"", "remoteHost":"localhost", "remotePort":"11053", "command":"CONNECT", "commandText":"", "paramList":null, "objectType":"TABLE", "objectName":"", "statementId":0, "substatementId":1, "exitCode":"0", "sessionId":"725121", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:13.267207+00", "endTime":"2020-05-22 18:07:13.267213+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }
範例 Aurora PostgreSQL CREATE TABLE 陳述式的活動事件記錄

以下是 Aurora PostgreSQLCREATE TABLE 事件範例。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:36:54.403455+00", "logTime": "2019-05-24 00:36:54.494235+00", "statementId": 2, "substatementId": 1, "objectType": null, "command": "CREATE TABLE", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": null, "commandText": "create table my_table (id serial primary key, name varchar(32));", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "DDL", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
範例 Aurora MySQL CREATE TABLE 陳述式的活動事件記錄

以下範例顯示 Aurora MySQL 的 CREATE TABLE 陳述式。該操作會以兩個不同的事件記錄表示。一個活動具有 "class":"MAIN"。另一個活動具有 "class":"AUX"。這些訊息可能以任何順序到達。logTime 事件的 MAIN 欄位永遠早於任何對應 logTime 事件的 AUX 欄位。

下列範例會顯示 class 值為 MAIN 的事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.250221+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"CREATE TABLE test1 (id INT)", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":1, "exitCode":"0", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.250222+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

下列範例會顯示 class 值為 AUX 的對應事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.247182+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"CREATE", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":2, "exitCode":"", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.247182+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }
範例 Aurora PostgreSQLSELECT 陳述式的活動事件記錄

下列範例顯示 的 SELECT 事件。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:39:49.920564+00", "logTime": "2019-05-24 00:39:49.940668+00", "statementId": 6, "substatementId": 1, "objectType": "TABLE", "command": "SELECT", "objectName": "public.my_table", "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": 10, "commandText": "select * from my_table;", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "READ", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
{ "type": "DatabaseActivityMonitoringRecord", "clusterId": "", "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q", "databaseActivityEventList": [ { "class": "TABLE", "clientApplication": "Microsoft SQL Server Management Studio - Query", "command": "SELECT", "commandText": "select * from [testDB].[dbo].[TestTable]", "databaseName": "testDB", "dbProtocol": "SQLSERVER", "dbUserName": "test", "endTime": null, "errorMessage": null, "exitCode": 1, "logTime": "2022-10-06 21:24:59.9422268+00", "netProtocol": null, "objectName": "TestTable", "objectType": "TABLE", "paramList": null, "pid": null, "remoteHost": "local machine", "remotePort": null, "rowCount": 0, "serverHost": "172.31.30.159", "serverType": "SQLSERVER", "serverVersion": "15.00.4073.23.v1.R1", "serviceName": "sqlserver-ee", "sessionId": 62, "startTime": null, "statementId": "0x03baed90412f564fad640ebe51f89b99", "substatementId": 1, "transactionId": "4532935", "type": "record", "engineNativeAuditFields": { "target_database_principal_id": 0, "target_server_principal_id": 0, "target_database_principal_name": "", "server_principal_id": 2, "user_defined_information": "", "response_rows": 0, "database_principal_name": "dbo", "target_server_principal_name": "", "schema_name": "dbo", "is_column_permission": true, "object_id": 581577110, "server_instance_name": "EC2AMAZ-NFUJJNO", "target_server_principal_sid": null, "additional_information": "", "duration_milliseconds": 0, "permission_bitmask": "0x00000000000000000000000000000001", "data_sensitivity_information": "", "session_server_principal_name": "test", "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109", "audit_schema_version": 1, "database_principal_id": 1, "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000", "user_defined_event_id": 0, "host_name": "EC2AMAZ-NFUJJNO" } } ] }
範例 Aurora MySQL SELECT 陳述式的活動事件記錄

下列範例顯示 SELECT 事件。

下列範例會顯示 class 值為 MAIN 的事件。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986467+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"SELECT * FROM test1 WHERE id < 28", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":1, "exitCode":"0", "sessionId":"726571", "rowCount":2, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986467+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

下列範例會顯示 class 值為 AUX 的對應事件。

{ "type":"DatabaseActivityMonitoringRecord", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986399+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"READ", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":2, "exitCode":"", "sessionId":"726571", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986399+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }

DatabaseActivityMonitoringRecords物件

資料庫活動事件記錄位於 JSON 物件中,其中包含下列資訊。

JSON 欄位 資料類型 描述

type

string

JSON 記錄類型。值為 DatabaseActivityMonitoringRecords

version string 資料庫活動監控記錄的版本。

產生的資料庫活動記錄版本取決於資料庫叢集的引擎版本:

  • 針對執行引擎 10.10 版及更新版本的次要版本和引擎 11.5 版及更新版本的 Aurora PostgreSQL 資料庫叢集,會產生 1.1 版資料庫活動記錄。

  • 針對執行引擎 10.7 和 11.4 版的 Aurora PostgreSQL 資料庫叢集,會產生 1.0 版資料庫活動記錄。

除非特別註明,否則下列所有欄位都在 1.0 版和 1.1 版中。

databaseActivityEvents

string

包含活動事件的 JSON 物件。

金鑰 string 您用來解密 databaseActivityEvent清單 的加密金鑰

databaseActivityEvents 對象

databaseActivityEvents JSON 物件包含以下資訊。

JSON 記錄中的最上層欄位

稽核記錄檔中的每個事件都會包裝在 JSON 格式的記錄中。此記錄包含下列欄位。

type

此欄位永遠具有值 DatabaseActivityMonitoringRecords

version

此欄位代表資料庫活動串流資料通訊協定或合約的版本。其會定義哪些欄位可用。

1.0 版代表 Aurora PostgreSQL 10.7 和 11.4 版的原始資料活動串流支援。1.1 版代表 Aurora PostgreSQL 10.10 及更高版本和 Aurora PostgreSQL 11.5 及更高版本的資料活動串流支援。1.1 版包含其他欄位 errorMessagestartTime。1.2 版代表 Aurora MySQL 2.08 及更高版本的資料活動串流支援。1.2 版包含其他欄位 endTimetransactionId

databaseActivityEvents

代表一或多個活動事件的加密字串。它被表示為一個 base64 位元組陣列。在您解密字串時,結果會是 JSON 格式的記錄,其中包含欄位,如本節範例所示。

金鑰

用來加密 databaseActivityEvents 字串的加密資料金鑰。這與 AWS KMS key 您啟動資料庫活動串流時所提供的相同。

下列範例顯示此記錄的格式。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents":"encrypted audit records", "key":"encrypted key" }

採取下列步驟來解密 databaseActivityEvents 欄位的內容:

  1. 使用您在啟動資料庫活動串流時提供的 KMS 金鑰,解密 key JSON 欄位中的值。這麼做會以純文字傳回資料加密金鑰。

  2. Base64 解碼 databaseActivityEvents JSON 欄位中的值,以取得稽核承載的二進位格式的加密文字。

  3. 使用您在第一個步驟中解碼的資料加密金鑰來解密二進位密文。

  4. 解壓縮已解密的承載。

    • 加密的承載在 databaseActivityEvents 欄位。

    • databaseActivityEventList 欄位包含稽核記錄的陣列。陣列中的 type 欄位可以是 recordheartbeat

稽核日誌活動事件記錄是包含以下資訊的 JSON 物件。

JSON 欄位 資料類型 描述

type

string

JSON 記錄類型。值為 DatabaseActivityMonitoringRecord

clusterId string 資料庫叢集資源識別符。其對應於資料庫叢集屬性 DbClusterResourceId
instanceId string 資料庫執行個體資源識別符。它對應於資料庫執行個體屬性 DbiResourceId

databaseActivityEvent清單

string

活動稽核記錄或活動訊號訊息的陣列。

databaseActivityEvent列出數組

稽核日誌承載是加密的 databaseActivityEventList JSON 陣列。以下資料表列出稽核記錄中已解密 DatabaseActivityEventList 陣列中,每個活動事件的欄位 (按英文字母順序列出)。這些欄位會根據您使用 Aurora PostgreSQL 或 Aurora MySQL 而有所不同。請參閱適用於資料庫引擎的表格。

重要

事件結構可能會改變。Aurora 可能會在未來將新的欄位新增至活動事件。在剖析 JSON 資料的應用程式中,請確定程式碼可以忽略,或針對未知欄位名稱採取適當的動作。

databaseActivityEventAurora 的列表字段
欄位 資料類型 描述
class string

活動事件的類別。Aurora PostgreSQL 的有效值如下:

  • ALL

  • CONNECT – 連線或中斷連線的事件。

  • DDL – DDL 陳述式,不包含在 ROLE 類別的陳述式清單中。

  • FUNCTION– 函數呼叫或 DO 區塊。

  • MISCDISCARDFETCHCHECKPOINTVACUUM 之類的其他命令。

  • NONE

  • READ – 來源為關聯或查詢時 SELECTCOPY 陳述式。

  • ROLE – 與包含 GRANTREVOKECREATE/ALTER/DROP ROLE 之角色和權限相關的陳述式。

  • WRITE – 目的地為關聯時,INSERTUPDATEDELETETRUNCATECOPY 陳述式。

clientApplication string 用戶端報告用來連接的應用程式。用戶端不需提供此資訊,因此此值可以是 Null。
command string SQL 命令名稱,其中不含任何命令詳細資訊。
commandText string

使用者傳遞的實際 SQL 陳述式。針對 Aurora PostgreSQL,該值與原始 SQL 陳述式相同。此欄位可用於連接或中斷連接記錄以外的所有記錄類型,在前述兩種例外類型中值為 Null。

重要

活動資料串流稽核記錄中會顯示每個陳述式的完整 SQL 文字,包括任何敏感資料。但是,如果 Aurora 可以從內容 (例如下列 SQL 陳述式) 判斷資料庫使用者密碼,則會編寫密文。

ALTER ROLE role-name WITH password
databaseName string 使用者連接的資料庫。
dbProtocol string 資料庫通訊協定,例如 Postgres 3.0
dbUserName string 用戶端驗證所用的資料庫使用者。
errorMessage

(僅限 1.1 版資料庫活動記錄)

string

如果有任何錯誤,該欄位會填入由資料庫伺服器產生的錯誤訊息。對於未導致錯誤的正常陳述式,此 errorMessage 值為 null。

錯誤是定義為任何會產生用戶端可見 PostgreSQL 錯誤日誌事件的活動,其嚴重性層級為 ERROR 或更高。如需詳細資訊,請參閱 PostgreSQL 訊息嚴重性等級。例如,語法錯誤和查詢取消會產生錯誤訊息。

內部 PostgreSQL 伺服器錯誤,例如背景檢查點指標處理程序錯誤,並不會產生錯誤訊息。不過,無論日誌嚴重性層級的設定為何,仍會發出這類事件的記錄。這可以防止攻擊者關閉記錄日誌以嘗試避免偵測。

另請參閱 exitCode 欄位。

exitCode int 工作階段結束記錄所用的值。清除結束時,此會包含結束代碼。在某些失敗狀況下可能無法隨時取得結束代碼。範例為 PostgreSQL 執行 exit() 或運算子執行 kill -9 之類的命令。

如果發生任何錯誤,此 exitCode 欄位會顯示 SQL 錯誤代碼 SQLSTATE,如 PostgreSQL 錯誤代碼中所列。

另請參閱 errorMessage 欄位。

logTime string 在稽核程式碼路徑中記錄的時間戳記。這代表 SQL 陳述式執行結束時間。另請參閱 startTime 欄位。
netProtocol string 網路通訊協定。
objectName string SQL 陳述式在其中操作的資料庫物件名稱。僅在 SQL 陳述式在資料庫物件上操作時才會使用此欄位。如果 SQL 陳述式沒有在物件上操作,則此值為 Null。
objectType string 資料表、索引、檢視等之類的資料庫物件類型。僅在 SQL 陳述式在資料庫物件上操作時才會使用此欄位。如果 SQL 陳述式沒有在物件上操作,則此值為 Null。有效值包括以下項目:
  • COMPOSITE TYPE

  • FOREIGN TABLE

  • FUNCTION

  • INDEX

  • MATERIALIZED VIEW

  • SEQUENCE

  • TABLE

  • TOAST TABLE

  • VIEW

  • UNKNOWN

paramList string 傳遞至 SQL 陳述式的參數陣列 (以逗號分隔)。如果 SQL 陳述式沒有任何參數,此值會是空的陣列。
pid int 後端程序的程序 ID,此程序的配置是用來提供用戶端連線。
remoteHost string 用戶端 IP 地址或主機名稱。針對 Aurora PostgreSQL,使用哪一個取決於資料庫的 log_hostname 參數設定。
remotePort string 用戶端連接埠號碼。
rowCount int SQL 陳述式傳回的資料列數目。例如,如果一個 SELECT 陳述式傳回 10 個資料列,則 rowCount 為 10。對於 INSERT 或 UPDATE 陳述式,rowCount 為 0。
serverHost string 資料庫伺服器主機 IP 地址。
serverType string 資料庫伺服器類型,例如 PostgreSQL
serverVersion string 資料庫伺服器版本,例如 Aurora PostgreSQL 的 2.3.1
serviceName string 服務名稱,例如 Amazon Aurora PostgreSQL-Compatible edition
sessionId int 虛擬唯一的工作階段識別符。
sessionId int 虛擬唯一的工作階段識別符。
startTime

(僅限 1.1 版資料庫活動記錄)

string

SQL 陳述式開始執行的時間。

若要計算 SQL 陳述式大約的執行時間,請使用 logTime - startTime。另請參閱 logTime 欄位。

statementId int 用戶端 SQL 陳述式的識別符。記數器是使用工作階段層級,且會隨著用戶端輸入的每個 SQL 陳述式遞增。
substatementId int SQL 子陳述式的識別符。此值會計算由 statementId 欄位識別的每個 SQL 陳述式包含的子陳述式。
type string 事件類型。有效值為 recordheartbeat
databaseActivityEventAurora MySQL 的列表字段
欄位 資料類型 描述
class string

活動事件的類別。

Aurora MySQL 的有效值如下:

  • MAIN – 代表 SQL 陳述式的主要事件。

  • AUX – 包含其他詳細資訊的補充事件。例如,重新命名物件的陳述式可能具有反映新名稱的類別 AUX 的事件。

    若要尋找 MAINAUX 相同陳述式相對應的事件,請檢查 pid 欄位和 statementId 欄位值相同的不同事件。

clientApplication string 用戶端報告用來連接的應用程式。用戶端不需提供此資訊,因此此值可以是 Null。
command string

SQL 陳述式的一般類別。此欄位的值取決於 class 的值。

classMAIN 時的值包括以下內容:

  • CONNECT – 用戶端工作階段已連線時。

  • QUERY – SQL 陳述式。伴隨著一個或多個 class 的值為 AUX 的事件。

  • DISCONNECT – 用戶端工作階段中斷連線時。

  • FAILED_CONNECT – 當用戶端嘗試連線但無法連線時。

  • CHANGEUSER – 屬於 MySQL 網路通訊協定的一部分狀態變更,而不是來自您發出的陳述式。

classAUX 時的值包括以下內容:

  • READ – 來源為關聯或查詢時 SELECTCOPY 陳述式。

  • WRITE – 目的地為關聯時,INSERTUPDATEDELETETRUNCATECOPY 陳述式。

  • DROP – 刪除物件。

  • CREATE – 建立物件。

  • RENAME – 重新命名物件。

  • ALTER – 檢視物件的屬性。

commandText string

對於 class 值為 MAIN 的事件,此欄位代表使用者傳入的實際 SQL 陳述式。此欄位可用於連接或中斷連接記錄以外的所有記錄類型,在前述兩種例外類型中值為 Null。

對於 class 值為 AUX 的事件,此欄位包含有關事件涉及之物件的補充資訊。

針對 Aurora MySQL,引號之類的字元前面加上反斜線,代表逸出字元。

重要

稽核記錄中會顯示每個陳述式的完整 SQL 文字,包括任何敏感資料。但是,如果 Aurora 可以從內容 (例如下列 SQL 陳述式) 判斷資料庫使用者密碼,則會編寫密文。

mysql> SET PASSWORD = 'my-password';
注意

指定此處所顯示提示以外的密碼,作為安全最佳實務。

databaseName string 使用者連接的資料庫。
dbProtocol string 資料庫通訊協定。目前,Aurora MySQL 的這個值永遠是 MySQL
dbUserName string 用戶端驗證所用的資料庫使用者。
endTime

(僅限 1.2 版資料庫活動記錄)

string

SQL 陳述式結束執行的時間。以國際標準時間 (UTC) 格式表示。

若要計算 SQL 陳述式的執行時間,請使用 endTime - startTime。另請參閱 startTime 欄位。

errorMessage

(僅限 1.1 版資料庫活動記錄)

string

如果有任何錯誤,該欄位會填入由資料庫伺服器產生的錯誤訊息。對於未導致錯誤的正常陳述式,此 errorMessage 值為 null。

錯誤是定義為任何會產生用戶端可見 MySQL 錯誤日誌事件的活動,其嚴重性層級為 ERROR 或更高。如需更多資訊,請參閱 MySQL 參考手冊中的錯誤記錄。例如,語法錯誤和查詢取消會產生錯誤訊息。

內部 MySQL 伺服器錯誤,例如背景檢查點指標處理程序錯誤,並不會產生錯誤訊息。不過,無論日誌嚴重性層級的設定為何,仍會發出這類事件的記錄。這可以防止攻擊者關閉記錄日誌以嘗試避免偵測。

另請參閱 exitCode 欄位。

exitCode int 工作階段結束記錄所用的值。清除結束時,此會包含結束代碼。在某些失敗狀況下可能無法隨時取得結束代碼。在這種情況下,此值可能是零,也可能是空白。
logTime string 在稽核程式碼路徑中記錄的時間戳記。以國際標準時間 (UTC) 格式表示。如需計算陳述式持續時間的最準確方式,請參閱 startTimeendTime 欄位。
netProtocol string 網路通訊協定。目前,Aurora MySQL 的這個值永遠是 TCP
objectName string SQL 陳述式在其中操作的資料庫物件名稱。僅在 SQL 陳述式在資料庫物件上操作時才會使用此欄位。如果 SQL 陳述式沒有在物件上操作,則此值為空白。若要建構物件的完整名稱,請結合 databaseNameobjectName。如果查詢涉及多個物件,則此欄位可以是以逗號分隔的名稱清單。
objectType string

資料表、索引、檢視等之類的資料庫物件類型。僅在 SQL 陳述式在資料庫物件上操作時才會使用此欄位。如果 SQL 陳述式沒有在物件上操作,則此值為 Null。

Aurora MySQL 的有效值包括以下項目:

  • INDEX

  • TABLE

  • UNKNOWN

paramList string 此欄位不會用於 Aurora MySQL 且一律為空。
pid int 後端程序的程序 ID,此程序的配置是用來提供用戶端連線。重新啟動資料庫伺服器時,pid 變更和 statementId 欄位的計數器會重新啟動。
remoteHost string 發出 SQL 陳述式之用戶端的 IP 地址或主機名稱。針對 Aurora MySQL,使用哪一個取決於資料庫的 skip_name_resolve 參數設定。localhost 的值表示來自 rdsadmin 特殊使用者的活動。
remotePort string 用戶端連接埠號碼。
rowCount int SQL 陳述式影響或擷取的資料列數。僅會對資料處理語言 (DML) 陳述式的 SQL 陳述式使用此欄位。如果 SQL 陳述式不是 DML 陳述式,則此值為 Null。
serverHost string 資料庫伺服器執行個體識別符。此值在 Aurora MySQL 的表示方式與 Aurora PostgreSQL 不同。Aurora PostgreSQL 會使用 IP 地址,而非識別符。
serverType string 資料庫伺服器類型,例如 MySQL
serverVersion string 資料庫伺服器版本。目前,Aurora MySQL 的這個值永遠是 MySQL 5.7.12
serviceName string 服務的名稱。目前,Aurora MySQL 的這個值永遠是 Amazon Aurora MySQL
sessionId int 虛擬唯一的工作階段識別符。
startTime

(僅限 1.1 版資料庫活動記錄)

string

SQL 陳述式開始執行的時間。以國際標準時間 (UTC) 格式表示。

若要計算 SQL 陳述式的執行時間,請使用 endTime - startTime。另請參閱 endTime 欄位。

statementId int 用戶端 SQL 陳述式的識別符。計數器會隨著用戶端輸入的每個 SQL 陳述式而增加。當資料庫執行個體重新啟動時,計數器會重設。
substatementId int SQL 子陳述式的識別符。對於具有類別 MAIN 的事件此值是 1,對於具有類別 AUX 的事件則為 2。使用此 statementId 欄位可識別由相同陳述式產生的所有事件。
transactionId

(僅限 1.2 版資料庫活動記錄)

int 交易的識別符。
type string 事件類型。有效值為 recordheartbeat

使用 AWS SDK 處理資料庫活動串流

您可以使用 AWS SDK 以程式設計方式處理活動串流。以下是功能完整的 Java 和 Python 範例,其示範您可以如何處理 Kinesis 資料串流。

Java
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.security.Security; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.zip.GZIPInputStream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.SecretKeySpec; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.CryptoInputStream; import com.amazonaws.encryptionsdk.jce.JceMasterKey; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kms.AWSKMS; import com.amazonaws.services.kms.AWSKMSClientBuilder; import com.amazonaws.services.kms.model.DecryptRequest; import com.amazonaws.services.kms.model.DecryptResult; import com.amazonaws.util.Base64; import com.amazonaws.util.IOUtils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.bouncycastle.jce.provider.BouncyCastleProvider; public class DemoConsumer { private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]"; private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]"; private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]"; private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]"; private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2... private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY); private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS); private static final AwsCrypto CRYPTO = new AwsCrypto(); private static final AWSKMS KMS = AWSKMSClientBuilder.standard() .withRegion(REGION_NAME) .withCredentials(CREDENTIALS_PROVIDER).build(); class Activity { String type; String version; String databaseActivityEvents; String key; } class ActivityEvent { @SerializedName("class") String _class; String clientApplication; String command; String commandText; String databaseName; String dbProtocol; String dbUserName; String endTime; String errorMessage; String exitCode; String logTime; String netProtocol; String objectName; String objectType; List<String> paramList; String pid; String remoteHost; String remotePort; String rowCount; String serverHost; String serverType; String serverVersion; String serviceName; String sessionId; String startTime; String statementId; String substatementId; String transactionId; String type; } class ActivityRecords { String type; String clusterId; String instanceId; List<ActivityEvent> databaseActivityEventList; } static class RecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new RecordProcessor(); } } static class RecordProcessor implements IRecordProcessor { private static final long BACKOFF_TIME_IN_MILLIS = 3000L; private static final int PROCESSING_RETRIES_MAX = 10; private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; private static final Gson GSON = new GsonBuilder().serializeNulls().create(); private static final Cipher CIPHER; static { Security.insertProviderAt(new BouncyCastleProvider(), 1); try { CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC"); } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) { throw new ExceptionInInitializerError(e); } } private long nextCheckpointTimeInMillis; @Override public void initialize(String shardId) { } @Override public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) { for (final Record record : records) { processSingleBlob(record.getData()); } if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { checkpoint(checkpointer); nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS; } } @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } } private void processSingleBlob(final ByteBuffer bytes) { try { // JSON $Activity final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class); // Base64.Decode final byte[] decoded = Base64.decode(activity.databaseActivityEvents); final byte[] decodedDataKey = Base64.decode(activity.key); Map<String, String> context = new HashMap<>(); context.put("aws:rds:dbc-id", DBC_RESOURCE_ID); // Decrypt final DecryptRequest decryptRequest = new DecryptRequest() .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context); final DecryptResult decryptResult = KMS.decrypt(decryptRequest); final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext())); // GZip Decompress final byte[] decompressed = decompress(decrypted); // JSON $ActivityRecords final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class); // Iterate throught $ActivityEvents for (final ActivityEvent event : activityRecords.databaseActivityEventList) { System.out.println(GSON.toJson(event)); } } catch (Exception e) { // Handle error. e.printStackTrace(); } } private static byte[] decompress(final byte[] src) throws IOException { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); return IOUtils.toByteArray(gzipInputStream); } private void checkpoint(IRecordProcessorCheckpointer checkpointer) { for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) { try { checkpointer.checkpoint(); break; } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). System.out.println("Caught shutdown exception, skipping checkpoint." + se); break; } catch (ThrottlingException e) { // Backoff and re-attempt checkpoint upon transient failures if (i >= (PROCESSING_RETRIES_MAX - 1)) { System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e); break; } else { System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e); } } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e); break; } try { Thread.sleep(BACKOFF_TIME_IN_MILLIS); } catch (InterruptedException e) { System.out.println("Interrupted sleep" + e); } } } } private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException { // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"), "BC", "DataKey", "AES/GCM/NoPadding"); try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) { IOUtils.copy(decryptingStream, out); return out.toByteArray(); } } public static void main(String[] args) throws Exception { final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); final KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId); kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST); kinesisClientLibConfiguration.withRegionName(REGION_NAME); final Worker worker = new Builder() .recordProcessorFactory(new RecordProcessorFactory()) .config(kinesisClientLibConfiguration) .build(); System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId); try { worker.run(); } catch (Throwable t) { System.err.println("Caught throwable while processing data."); t.printStackTrace(); System.exit(1); } System.exit(0); } private static byte[] getByteArray(final ByteBuffer b) { byte[] byteArray = new byte[b.remaining()]; b.get(byteArray); return byteArray; } }
Python
import base64 import json import zlib import aws_encryption_sdk from aws_encryption_sdk import CommitmentPolicy from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType import boto3 REGION_NAME = '<region>' # us-east-1 RESOURCE_ID = '<external-resource-id>' # cluster-ABCD123456 STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID # aws-rds-das-cluster-ABCD123456 enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT) class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, plain_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC) def _get_raw_key(self, key_id): return self.wrapping_key def decrypt_payload(payload, data_key): my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") decrypted_plaintext, header = enc_client.decrypt( source=payload, materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)) return decrypted_plaintext def decrypt_decompress(payload, key): decrypted = decrypt_payload(payload, key) return zlib.decompress(decrypted, zlib.MAX_WBITS + 16) def main(): session = boto3.session.Session() kms = session.client('kms', region_name=REGION_NAME) kinesis = session.client('kinesis', region_name=REGION_NAME) response = kinesis.describe_stream(StreamName=STREAM_NAME) shard_iters = [] for shard in response['StreamDescription']['Shards']: shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'], ShardIteratorType='LATEST') shard_iters.append(shard_iter_response['ShardIterator']) while len(shard_iters) > 0: next_shard_iters = [] for shard_iter in shard_iters: response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000) for record in response['Records']: record_data = record['Data'] record_data = json.loads(record_data) payload_decoded = base64.b64decode(record_data['databaseActivityEvents']) data_key_decoded = base64.b64decode(record_data['key']) data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded, EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID}) print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext'])) if 'NextShardIterator' in response: next_shard_iters.append(response['NextShardIterator']) shard_iters = next_shard_iters if __name__ == '__main__': main()