Überwachen von Datenbankaktivitäts-Streams - Amazon Aurora

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Überwachen von Datenbankaktivitäts-Streams

Datenbankaktivitäts-Streams überwachen und melden Aktivitäten. Der Aktivitäts-Stream wird erfasst und an Amazon Kinesis übertragen. Von Kinesis aus können Sie den Aktivitäts-Stream überwachen, oder andere Dienste und Anwendungen können den Aktivitäts-Stream zur weiteren Analyse nutzen. Sie können den zugrunde liegenden Kinesis-Stream-Namen mithilfe des - AWS CLI Befehls describe-db-clusters oder der RDS-API-DescribeDBClustersOperation finden.

Aurora verwaltet den Kinesis Stream wie folgt:

  • Aurora erzeugt den Kinesis Stream automatisch mit einem Aufbewahrungszeitraum von 24 Stunden.

  • Aurora skaliert den Kinesis-Stream bei Bedarf.

  • Wenn Sie den Datenbankaktivitäts-Stream stoppen oder den DB-Cluster löschen, löscht Aurora den Kinesis-Stream.

Die folgenden Kategorien von Aktivitäten werden überwacht und in das Prüfprotokoll des Aktivitäts-Streams aufgenommen:

  • SQL-Befehle – Alle SQL-Befehle werden geprüft, ebenso vorbereitete Anweisungen, integrierte Funktionen und Funktionen in PL/SQL. Aufrufe von gespeicherten Prozeduren werden überprüft. Alle SQL-Anweisungen, die in gespeicherten Prozeduren oder Funktionen ausgegeben werden, werden ebenfalls überprüft.

  • Sonstige Datenbankinformationen – Die überwachte Aktivität umfasst die vollständige SQL-Anweisung, die Zeilenzahl der betroffenen Zeilen aus DML-Befehlen, Objekte, auf die zugegriffen wurde, und den eindeutigen Datenbanknamen. Für Aurora PostgreSQL überwachen Datenbankaktivitäts-Streams auch die Bindevariablen und die Parameter der gespeicherten Prozedur.

    Wichtig

    Der vollständige SQL-Text jeder Anweisung ist im Prüfprotokoll des Aktivitäts-Streams sichtbar, inklusive aller sensiblen Daten. Datenbankbenutzerkennwörter werden jedoch redigiert, wenn Aurora sie wie in der folgenden SQL-Anweisung aus dem Kontext ermitteln kann.

    ALTER ROLE role-name WITH password
  • Verbindungsinformationen – Die überwachte Aktivität umfasst Sitzungs- und Netzwerkinformationen, die Server-Prozess-ID und Beendigungscodes.

Wenn ein Aktivitätsstream während der Überwachung Ihrer DB-Instance fehlschlägt, werden Sie über RDS-Ereignisse benachrichtigt.

Zugriff auf einen Aktivitäts-Stream von Kinesis aus

Wenn Sie einen Aktivitäts-Stream für einen DB-Cluster aktivieren, wird ein Kinesis-Stream für Sie erstellt. Von Kinesis aus können Sie die Datenbankaktivität in Echtzeit überwachen. Zur weiteren Analyse der Datenbankaktivität können Sie Ihren Kinesis-Stream mit Consumer-Anwendungen verbinden. Sie können den Stream auch mit Compliance-Management-Anwendungen wie IBM Security Guardium oder Imperva SecureSphere Database Audit and Protection verbinden.

Sie können entweder über die RDS- oder Kinesis-Konsole auf Ihren Kinesis-Stream zugreifen.

So greifen Sie über die RDS-Konsole auf einen Aktivitätsstream zu
  1. Öffnen Sie die Amazon-RDS-Konsole unter https://console.aws.amazon.com/rds/.

  2. Wählen Sie im Navigationsbereich Datenbanken aus.

  3. Wählen Sie den/die DB-Cluster aus, auf der Sie einen Aktivitätsstream gestartet haben.

  4. Wählen Sie Konfiguration.

  5. Wählen Sie unter Database activity stream (Datenbank-Aktivitätsstream) den Link unter Kinesis stream (Kinesis-Stream) aus.

  6. Wählen Sie in der Kinesis-Konsole Monitoring (Überwachung) aus, um mit der Überwachung der Datenbankaktivität zu beginnen.

So greifen Sie über die Kinesis-Konsole auf einen Aktivitätsstream von Kinesis zu
  1. Öffnen Sie die Kinesis-Konsole unter.https://console.aws.amazon.com/kinesis.

  2. Wählen Sie Ihren Aktivitäts-Stream aus der Liste der Kinesis-Streams aus.

    Der Name eines Aktivitäts-Streams besteht aus dem Präfix aws-rds-das-cluster- gefolgt von der Ressourcen-ID des DB-Clusters. Im Folgenden wird ein Beispiel gezeigt.

    aws-rds-das-cluster-NHVOV4PCLWHGF52NP

    Um die Amazon-RDS-Konsole zum Ermitteln der Ressourcen-ID für den DB-Cluster zu verwenden, wählen Sie Ihren DB-Cluster aus der Liste der Datenbanken aus und wählen dann die Registerkarte Konfiguration aus.

    Um den vollständigen Kinesis-Stream-Namen für einen Aktivitäts-Stream mit AWS CLI der zu finden, verwenden Sie eine describe-db-clusters CLI-Anfrage und notieren Sie sich den Wert von ActivityStreamKinesisStreamName in der Antwort.

  3. Wählen Sie Monitoring (Überwachung) aus, um mit der Überwachung der Datenbankaktivität zu beginnen.

Weitere Informationen zur Verwendung von Amazon Kinesis finden Sie unter Was sind Amazon Kinesis Data Streams?.

Prüfungsprotokoll – Inhalte und Beispiele

Überwachte Ereignisse werden im Datenbankaktivitätsstream als JSON-Zeichenfolgen dargestellt. Die Struktur besteht aus einem JSON-Objekt mit einem DatabaseActivityMonitoringRecord, der wiederum ein Array von Aktivitätsereignissen databaseActivityEventList enthält.

Prüfungsprotokollbeispiele für Aktivitäts-Streams

Im Folgenden sehen Sie Beispiele für entschlüsselte JSON-Prüfprotokolle von Aktivitätsereignisdatensätzen.

Beispiel Aktivitätsereignisdatensatz einer Aurora-PostgreSQL-CONNECT SQL-Anweisung

Im Folgenden sehen Sie einen Aktivitätsereignisdatensatz einer Anmeldung unter Verwendung einer CONNECT-SQL-Anweisung (command) durch einen psql-Client (clientApplication).

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-10-30 00:39:49.940668+00", "logTime": "2019-10-30 00:39:49.990579+00", "statementId": 1, "substatementId": 1, "objectType": null, "command": "CONNECT", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "49804", "sessionId": "5ce5f7f0.474b", "rowCount": null, "commandText": null, "paramList": [], "pid": 18251, "clientApplication": "psql", "exitCode": null, "class": "MISC", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
Beispiel Aktivitätsereignisdatensatz einer Aurora MySQL-CONNECT SQL-Anweisung

Im Folgenden sehen Sie einen Aktivitätsereignisdatensatz einer Anmeldung unter Verwendung einer CONNECT-SQL-Anweisung (command) durch einen mysql-Client (clientApplication).

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:13.267214+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"rdsadmin", "databaseName":"", "remoteHost":"localhost", "remotePort":"11053", "command":"CONNECT", "commandText":"", "paramList":null, "objectType":"TABLE", "objectName":"", "statementId":0, "substatementId":1, "exitCode":"0", "sessionId":"725121", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:13.267207+00", "endTime":"2020-05-22 18:07:13.267213+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }
Beispiel Aktivitätsereignisdatensatz einer Aurora PostgreSQL CREATE TABLE-Anweisung

Im Folgenden sehen Sie ein Beispiel eines CREATE TABLE-Ereignisses für Aurora PostgreSQL.

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:36:54.403455+00", "logTime": "2019-05-24 00:36:54.494235+00", "statementId": 2, "substatementId": 1, "objectType": null, "command": "CREATE TABLE", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": null, "commandText": "create table my_table (id serial primary key, name varchar(32));", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "DDL", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
Beispiel Aktivitätsereignisdatensatz einer Aurora-MySQL-CREATE TABLE-Anweisung

Das folgende Beispiel zeigt eine CREATE TABLE-Anweisung für Aurora MySQL. Die Operation wird als zwei separate Ereignisdatensätze dargestellt. Das eine Ereignis verfügt über einen Wert "class":"MAIN". Das andere über einen Wert "class":"AUX". Die Nachrichten können in beliebiger Reihenfolge eintreffen. Das logTime-Feld des MAIN-Ereignisses ist immer früher als die logTime-Felder der entsprechenden AUX-Ereignisse.

Im folgenden Beispiel wird das Ereignis mit einem class-Wert von MAIN gezeigt.

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.250221+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"CREATE TABLE test1 (id INT)", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":1, "exitCode":"0", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.250222+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

Im folgenden Beispiel wird das entsprechende Ereignis mit einem class-Wert von AUX gezeigt.

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.247182+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"CREATE", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":2, "exitCode":"", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.247182+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }
Beispiel Aktivitätsereignisdatensatz einer Aurora PostgreSQL SELECT-Anweisung

Das folgende Beispiel zeigt ein SELECT-Ereignis .

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:39:49.920564+00", "logTime": "2019-05-24 00:39:49.940668+00", "statementId": 6, "substatementId": 1, "objectType": "TABLE", "command": "SELECT", "objectName": "public.my_table", "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": 10, "commandText": "select * from my_table;", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "READ", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
{ "type": "DatabaseActivityMonitoringRecord", "clusterId": "", "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q", "databaseActivityEventList": [ { "class": "TABLE", "clientApplication": "Microsoft SQL Server Management Studio - Query", "command": "SELECT", "commandText": "select * from [testDB].[dbo].[TestTable]", "databaseName": "testDB", "dbProtocol": "SQLSERVER", "dbUserName": "test", "endTime": null, "errorMessage": null, "exitCode": 1, "logTime": "2022-10-06 21:24:59.9422268+00", "netProtocol": null, "objectName": "TestTable", "objectType": "TABLE", "paramList": null, "pid": null, "remoteHost": "local machine", "remotePort": null, "rowCount": 0, "serverHost": "172.31.30.159", "serverType": "SQLSERVER", "serverVersion": "15.00.4073.23.v1.R1", "serviceName": "sqlserver-ee", "sessionId": 62, "startTime": null, "statementId": "0x03baed90412f564fad640ebe51f89b99", "substatementId": 1, "transactionId": "4532935", "type": "record", "engineNativeAuditFields": { "target_database_principal_id": 0, "target_server_principal_id": 0, "target_database_principal_name": "", "server_principal_id": 2, "user_defined_information": "", "response_rows": 0, "database_principal_name": "dbo", "target_server_principal_name": "", "schema_name": "dbo", "is_column_permission": true, "object_id": 581577110, "server_instance_name": "EC2AMAZ-NFUJJNO", "target_server_principal_sid": null, "additional_information": "", "duration_milliseconds": 0, "permission_bitmask": "0x00000000000000000000000000000001", "data_sensitivity_information": "", "session_server_principal_name": "test", "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109", "audit_schema_version": 1, "database_principal_id": 1, "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000", "user_defined_event_id": 0, "host_name": "EC2AMAZ-NFUJJNO" } } ] }
Beispiel Aktivitätsereignisdatensatz einer Aurora MySQL-SELECT-Anweisung

Das folgende Beispiel zeigt ein SELECT-Ereignis.

Im folgenden Beispiel wird das Ereignis mit einem class-Wert von MAIN gezeigt.

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986467+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"SELECT * FROM test1 WHERE id < 28", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":1, "exitCode":"0", "sessionId":"726571", "rowCount":2, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986467+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

Im folgenden Beispiel wird das entsprechende Ereignis mit einem class-Wert von AUX gezeigt.

{ "type":"DatabaseActivityMonitoringRecord", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986399+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"READ", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":2, "exitCode":"", "sessionId":"726571", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986399+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }

DatabaseActivityMonitoringRecords JSON-Objekt

Die Datenbank-Aktivitätsereignisdatensätze befinden sich in einem JSON-Objekt, das die folgenden Informationen enthält.

JSON-Feld Datentyp Beschreibung

type

string

Der Typ des JSON-Datensatzes. Der Wert ist DatabaseActivityMonitoringRecords.

version string Die Version der Datenbank-Aktivitätsüberwachungsdatensätze.

Die Version der generierten Datenbank-Aktivitätsdatensätze hängt von der Engine-Version des DB-Clusters ab:

  • Datenbank-Aktivitätsdatensätze der Version 1.1 werden für Aurora PostgreSQL-DB-Cluster generiert, auf denen die Engine-Versionen 10.10 und höhere Nebenversionen sowie die Engine-Versionen 11.5 und höher ausgeführt werden.

  • Datenbank-Aktivitätsdatensätze der Version 1.0 werden für Aurora PostgreSQL-DB-Cluster generiert, auf denen die Engine-Versionen 10.7 und 11.4 ausgeführt werden.

Alle folgenden Felder befinden sich sowohl in Version 1.0 als auch in Version 1.1, sofern nicht ausdrücklich angegeben.

databaseActivityEvents

Zeichenfolge

Ein JSON-Objekt, das die Aktivitätsereignisse enthält.

Schlüssel Zeichenfolge Ein Verschlüsselungsschlüssel, den Sie zum Entschlüsseln des databaseActivityEventListe verwenden

databaseActivityEvents JSON-Objekt

Das databaseActivityEvents-JSON-Objekt enthält die folgenden Informationen.

Felder der obersten Ebene im JSON-Datensatz

Jedes Ereignis im Prüfprotokoll wird in einen Datensatz im JSON-Format verpackt. Dieser Datensatz enthält die folgenden Felder.

type

Dieses Feld hat immer den Wert DatabaseActivityMonitoringRecords.

Version

Dieses Feld stellt die Version des Datenprotokolls oder des Vertrags für die Datenbankaktivität dar. Es definiert, welche Felder verfügbar sind.

Version 1.0 stellt die Unterstützung der ursprünglichen Datenaktivitäts-Streams für die Aurora PostgreSQL-Versionen 10.7 und 11.4 dar. Version 1.1 stellt die Unterstützung der Datenaktivitäts-Streams für die Aurora PostgreSQL-Versionen ab 10.10 und ab Aurora PostgreSQL-Version 11.5 dar. Version 1.1 enthält die zusätzlichen Felder errorMessage und startTime. Version 1.2 stellt die Unterstützung der Datenaktivitäts-Streams für Aurora MySQL 2.08 und höher dar. Version 1.2 enthält die zusätzlichen Felder endTime und transactionId.

databaseActivityEvents

Eine verschlüsselte Zeichenfolge, die ein oder mehrere Aktivitätsereignisse darstellt. Sie wird als Base64-Byte-Array dargestellt. Wenn Sie die Zeichenfolge entschlüsseln, ist das Ergebnis ein Datensatz im JSON-Format mit Feldern, wie in den Beispielen in diesem Abschnitt gezeigt.

Schlüssel

Der verschlüsselte Datenschlüssel, der zum Verschlüsseln der databaseActivityEvents-Zeichenfolge verwendet wird. Dies ist dieselbe AWS KMS key , die Sie beim Starten des Datenbankaktivitäts-Streams angegeben haben.

Im folgenden Beispiel wird das Format dieses Datensatzes gezeigt.

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents":"encrypted audit records", "key":"encrypted key" }

Führen Sie die folgenden Schritte aus, um den Inhalt des databaseActivityEvents-Feldes zu entschlüsseln:

  1. Entschlüsseln Sie den Wert im JSON-Feld key mit dem KMS-Schlüssel, den Sie beim Starten des Datenbankaktivitätsstroms angegeben haben. Dadurch wird der Datenverschlüsselungsschlüssel im Klartext zurückgegeben.

  2. Base64-dekodieren Sie den Wert im databaseActivityEvents-JSON-Feld, um den Verschlüsselungstext der Prüfungsnutzlast im Binärformat zu erhalten.

  3. Entschlüsseln Sie den binären Verschlüsselungstext mit dem Datenverschlüsselungsschlüssel, den Sie im ersten Schritt dekodiert haben.

  4. Dekomprimieren Sie die entschlüsselte Nutzlast.

    • Die verschlüsselte Nutzlast befindet sich im databaseActivityEvents-Feld.

    • Das databaseActivityEventList-Feld enthält ein Array von Prüfdatensätzen. Die type-Felder im Array können record oder sein heartbeat.

Der Prüfprotokoll-Aktivitätsereignisdatensatz ist ein JSON-Objekt mit folgenden Informationen.

JSON-Feld Datentyp Beschreibung

type

string

Der Typ des JSON-Datensatzes. Der Wert ist DatabaseActivityMonitoringRecord.

clusterId string Die Ressourcen-ID des DB-Clusters. Sie entspricht dem DB-Clusterattribut DbClusterResourceId.
instanceId string Die Ressourcen-ID der DB-Instance. Sie dem DB-Instance-Attribut DbiResourceId.

databaseActivityEventListe

string

Ein Array von Aktivitätsprüfdatensätzen oder Heartbeat-Nachrichten.

databaseActivityEventJSON-Array auflisten

Die Prüfprotokollnutzlast ist ein verschlüsseltes JSON-Array databaseActivityEventList. In der folgenden Tabelle sind die Felder für jedes Aktivitätsereignis im entschlüsselten Array DatabaseActivityEventList eines Prüfprotokolls alphabetisch aufgelistet. Die Felder unterscheiden sich je nachdem, ob Sie Aurora PostgreSQL oder Aurora MySQL verwenden. Näheres entnehmen Sie bitte der Tabelle, die für Ihre Datenbank-Engine gilt.

Wichtig

Die Ereignisstruktur kann sich ändern. Aurora könnte in Zukunft neue Felder zu Aktivitätsereignissen hinzufügen. Stellen Sie bei Anwendungen, welche die JSON-Daten analysieren, sicher, dass Ihr Code unbekannte Feldnamen ignorieren oder entsprechende Aktionen durchführen kann.

databaseActivityEventAuflisten von Feldern für Aurora PostgreSQL
Feld Datentyp Beschreibung
class string

Die Aktivitätsereignisklasse. Gültige Werte für Aurora PostgreSQL sind die folgenden:

  • ALL

  • CONNECT – Ein Verbindungs- oder Verbindungstrennungsereignis.

  • DDL – eine DDL-Anweisung, die nicht in der Liste der Anweisungen für die Klasse ROLE enthalten ist.

  • FUNCTION – ein Funktionsaufruf oder ein DO-Block.

  • MISC – ein sonstiger Befehl wie z. B. DISCARD, FETCH, CHECKPOINT oder VACUUM.

  • NONE

  • READ – eine Anweisung SELECT oder COPY, wenn es sich bei der Quelle um eine Relation oder Abfrage handelt.

  • ROLE – eine Anweisung in Zusammenhang mit Rollen und Berechtigungen wie z. B. GRANT, REVOKE und CREATE/ALTER/DROP ROLE.

  • WRITE – eine Anweisung INSERT, UPDATE, DELETE, TRUNCATE, oder COPY, wenn das Ziel eine Relation ist.

clientApplication string Die Anwendung, die der Client laut Meldung für die Verbindung verwendet hat. Der Client muss diese Informationen nicht angeben, der Wert kann daher Null sein.
command string Der Name des SQL-Befehls ohne Befehlsdetails.
commandText string

Die vom Benutzer übergebene eigentliche SQL-Anweisung. Bei Aurora PostgreSQL ist der Wert identisch mit der ursprünglichen SQL-Anweisung. Dieses Feld wird für alle Arten von Datensätzen verwendet, mit Ausnahme von Verbindungs- oder Verbindungstrennungsdatensätzen, bei denen der Wert Null ist.

Wichtig

Der vollständige SQL-Text jeder Anweisung ist im Prüfprotokoll des Aktivitäts-Streams sichtbar, inklusive aller sensiblen Daten. Datenbankbenutzerkennwörter werden jedoch redigiert, wenn Aurora sie wie in der folgenden SQL-Anweisung aus dem Kontext ermitteln kann.

ALTER ROLE role-name WITH password
databaseName string Die Datenbank, zu der der Benutzer eine Verbindung hergestellt hat.
dbProtocol string Das Datenbankprotokoll, z. B. Postgres 3.0.
dbUserName string Der Datenbankbenutzer, mit dem sich der Client authentifiziert hat.
errorMessage

(nur Datenbank-Aktivitätsdatensätze der Version 1.1)

string

Wenn ein Fehler aufgetreten ist, wird dieses Feld mit der Fehlermeldung gefüllt, die vom DB-Server generiert worden wäre. Der errorMessage-Wert ist null für normale Anweisungen, die nicht zu einem Fehler geführt haben.

Ein Fehler wird als jede Aktivität definiert, die ein vom Client sichtbares PostgreSQL-Fehlerprotokollereignis mit einem Schweregrad von ERROR oder höher erzeugen würde. Weitere Informationen finden Sie unter PostgreSQL-Nachrichtenschweregrade. Beispielsweise erzeugen Syntaxfehler und Abfrageabbrüche eine Fehlermeldung.

Interne PostgreSQL-Serverfehler wie Hintergrund-Checkpointer-Prozessfehler erzeugen keine Fehlermeldung. Datensätze für solche Ereignisse werden jedoch weiterhin ausgegeben, unabhängig von der Einstellung des Schweregrads des Protokolls. Dadurch wird verhindert, dass Angreifer die Protokollierung deaktivieren, um eine Erkennung zu vermeiden.

Siehe auch das Feld exitCode.

exitCode int Ein Wert, der für einen Sitzungsbeendigungs-Datensatz verwendet wird. Bei einer sauberen Beendigung ist hier der Beendigungscode enthalten. In manchen Fehlersituationen kann nicht immer ein Beendigungscode erhalten werden. Beispiele: exit() von PostgreSQL oder Ausführung eines Befehls wie kill -9 durch einen Operator.

Wenn ein Fehler aufgetreten ist, zeigt das exitCode-Feld den SQL-Fehlercode SQLSTATE an, wie in PostgreSQL-Fehlercodes aufgeführt.

Siehe auch das Feld errorMessage.

logTime string Ein Zeitstempel wie im Prüfcodepfad aufgezeichnet. Dies stellt die Endzeit der SQL-Anweisungsausführung dar. Siehe auch das Feld startTime.
netProtocol string Das Netzwerkkommunikationsprotokoll.
objectName string Der Name des Datenbankobjekts, wenn die SQL-Anweisung für eines ausgeführt wird. Dieses Feld wird nur verwendet, wenn die SQL-Anweisung für ein Datenbankobjekt ausgeführt wird. Falls die SQL-Anweisung nicht für ein Objekt ausgeführt wird, lautet dieser Wert Null.
objectType string Der Datenbankobjekttyp wie z. B. Tabelle, Index, Ansicht usw. Dieses Feld wird nur verwendet, wenn die SQL-Anweisung für ein Datenbankobjekt ausgeführt wird. Falls die SQL-Anweisung nicht für ein Objekt ausgeführt wird, lautet dieser Wert Null. Gültige Werte sind unter anderem:
  • COMPOSITE TYPE

  • FOREIGN TABLE

  • FUNCTION

  • INDEX

  • MATERIALIZED VIEW

  • SEQUENCE

  • TABLE

  • TOAST TABLE

  • VIEW

  • UNKNOWN

paramList string Ein Array durch Kommas getrennter Parameter, die an die SQL-Anweisung übergeben werden. Wenn die SQL-Anweisung keine Parameter beinhaltet, ist dieser Wert ein leeres Array.
pid int Die Prozess-ID des Back-End-Prozesses, der für die Client-Verbindung zugewiesen wird.
remoteHost string Entweder die Client-IP-Adresse oder der Hostname. Was davon verwendet wird, ist bei Aurora PostgreSQL von der Parametereinstellung log_hostname der Datenbank abhängig.
remotePort string Die Portnummer des Clients.
rowCount int Die Anzahl der Zeilen, die von der SQL-Anweisung zurückgegeben werden. Wenn eine SELECT-Anweisung beispielsweise 10 Zeilen zurückgibt, beträgt rowCount 10. Für INSERT- oder UPDATE-Anweisungen ist der RowCount 0.
serverHost string Die Host-IP-Adresse des Datenbankservers.
serverType string Der Datenbankservertyp, z. B PostgreSQL.
serverVersion string Die Datenbankserver-Version, z. B. 2.3.1 für Aurora PostgreSQL.
serviceName string Der Name des Service, beispielsweise Amazon Aurora PostgreSQL-Compatible edition.
sessionId int Eine pseudoeindeutige Sitzungskennung.
sessionId int Eine pseudoeindeutige Sitzungskennung.
startTime

(nur Datenbank-Aktivitätsdatensätze der Version 1.1)

string

Die Zeit, zu der die Ausführung für die SQL-Anweisung begann.

Um die ungefähre Ausführungszeit der SQL-Anweisung zu berechnen, verwenden Sie logTime - startTime. Siehe auch das Feld logTime.

statementId int Eine ID für die SQL-Anweisung des Clients. Dieser Zähler auf Sitzungsebene erhöht sich mit jeder vom Client eingegebenen SQL-Anweisung.
substatementId int Eine ID für eine SQL-Unteranweisung. Dieser Wert zählt die enthaltenen Unteranweisungen für jede über das Feld statementId angegebene SQL-Anweisung.
type string Der Ereignistyp. Gültige Werte sind record oder heartbeat.
databaseActivityEventAuflisten von Feldern für Aurora MySQL
Feld Datentyp Beschreibung
class string

Die Aktivitätsereignisklasse.

Gültige Werte für Aurora MySQL sind die folgenden:

  • MAIN – das primäre Ereignis, das eine SQL-Anweisung darstellt.

  • AUX – ein zusätzliches Ereignis, das zusätzliche Details enthält. Beispielsweise kann eine Anweisung, mit der ein Objekt umbenannt wird, ein Ereignis der Klasse AUX aufweisen, das den neuen Namen wiedergibt.

    Um MAIN- und AUX-Ereignisse zu finden, die derselben Anweisung entsprechen, suchen Sie nach verschiedenen Ereignissen, die dieselben Werte für das Feld pid und für das Feld statementId aufweisen.

clientApplication string Die Anwendung, die der Client laut Meldung für die Verbindung verwendet hat. Der Client muss diese Informationen nicht angeben, der Wert kann daher Null sein.
command string

Die allgemeine Kategorie der SQL-Anweisung. Die Werte für dieses Feld hängen vom Wert von a class.

Wenn class MAIN ist, enthalten die Werte Folgendes:

  • CONNECT – wenn eine Client-Sitzung verbunden ist.

  • QUERY – eine SQL-Anweisung. Ebenfalls enthalten sind ein oder mehrere Ereignisse mit einem class-Wert von AUX.

  • DISCONNECT – wenn eine Client-Sitzung getrennt wird.

  • FAILED_CONNECT – wenn ein Client versucht, eine Verbindung herzustellen, dies aber nicht möglich ist.

  • CHANGEUSER – eine Statusänderung, die Teil des MySQL-Netzwerkprotokolls ist und nicht aus einer von Ihnen ausgegebenen Anweisung stammt.

Wenn class AUX ist, enthalten die Werte Folgendes:

  • READ – eine Anweisung SELECT oder COPY, wenn es sich bei der Quelle um eine Relation oder Abfrage handelt.

  • WRITE – eine Anweisung INSERT, UPDATE, DELETE, TRUNCATE, oder COPY, wenn das Ziel eine Relation ist.

  • DROP – Löschen eines Objekts

  • CREATE – Erstellen eines Objekts.

  • RENAME – Umbenennen eines Objekts.

  • ALTER – Ändern der Eigenschaften eines Objekts.

commandText string

Bei Ereignissen mit dem class-Wert von MAIN stellt dieses Feld die tatsächliche, vom Benutzer eingegebene SQL-Anweisung dar. Dieses Feld wird für alle Arten von Datensätzen verwendet, mit Ausnahme von Verbindungs- oder Verbindungstrennungsdatensätzen, bei denen der Wert Null ist.

Bei Ereignissen mit einem class-Wert von AUX enthält dieses Feld zusätzliche Informationen über die am Ereignis beteiligten Objekte.

Bei Aurora MySQL wird Zeichen, z. B. Anführungszeichen, ein Backslash vorangestellt, der ein Escape-Zeichen darstellt.

Wichtig

Der vollständige SQL-Text jeder Anweisung ist im Prüfprotokoll sichtbar, inklusive aller sensiblen Daten. Datenbankbenutzerkennwörter werden jedoch redigiert, wenn Aurora sie wie in der folgenden SQL-Anweisung aus dem Kontext ermitteln kann.

mysql> SET PASSWORD = 'my-password';
Anmerkung

Geben Sie aus Sicherheitsgründen ein anderes Passwort als hier angegeben an.

databaseName Zeichenfolge Die Datenbank, zu der der Benutzer eine Verbindung hergestellt hat.
dbProtocol string Das Datenbankprotokoll. Derzeit ist dieser Wert bei Aurora MySQL immer MySQL.
dbUserName string Der Datenbankbenutzer, mit dem sich der Client authentifiziert hat.
endTime

(nur Datenbank-Aktivitätsdatensätze der Version 1.2)

string

Die Zeit, zu der die Ausführung für die SQL-Anweisung endete. Sie wird im UTC-Format (Coordinated Universal Time) dargestellt.

Um die Ausführungszeit der SQL-Anweisung zu berechnen, verwenden Sie endTime - startTime. Siehe auch das Feld startTime.

errorMessage

(nur Datenbank-Aktivitätsdatensätze der Version 1.1)

string

Wenn ein Fehler aufgetreten ist, wird dieses Feld mit der Fehlermeldung gefüllt, die vom DB-Server generiert worden wäre. Der errorMessage-Wert ist null für normale Anweisungen, die nicht zu einem Fehler geführt haben.

Ein Fehler wird als jede Aktivität definiert, die ein vom Client sichtbares MySQL-Fehlerprotokollereignis mit einem Schweregrad von ERROR oder höher erzeugen würde. Weitere Informationen finden Sie unter Fehlerprotokoll im MySQL-Referenzhandbuch. Beispielsweise erzeugen Syntaxfehler und Abfrageabbrüche eine Fehlermeldung.

Interne MySQL-Serverfehler wie Hintergrund-Checkpointer-Prozessfehler erzeugen keine Fehlermeldung. Datensätze für solche Ereignisse werden jedoch weiterhin ausgegeben, unabhängig von der Einstellung des Schweregrads des Protokolls. Dadurch wird verhindert, dass Angreifer die Protokollierung deaktivieren, um eine Erkennung zu vermeiden.

Siehe auch das Feld exitCode.

exitCode int Ein Wert, der für einen Sitzungsbeendigungs-Datensatz verwendet wird. Bei einer sauberen Beendigung ist hier der Beendigungscode enthalten. In manchen Fehlersituationen kann nicht immer ein Beendigungscode erhalten werden. In solchen Fällen kann dieser Wert Null oder leer sein.
logTime string Ein Zeitstempel wie im Prüfcodepfad aufgezeichnet. Sie wird im UTC-Format (Coordinated Universal Time) dargestellt. Die genaueste Methode zum Berechnen der Anweisungsdauer finden Sie in den Feldern startTime und endTime.
netProtocol string Das Netzwerkkommunikationsprotokoll. Derzeit ist dieser Wert bei Aurora MySQL immer TCP.
objectName string Der Name des Datenbankobjekts, wenn die SQL-Anweisung für eines ausgeführt wird. Dieses Feld wird nur verwendet, wenn die SQL-Anweisung für ein Datenbankobjekt ausgeführt wird. Falls die SQL-Anweisung nicht für ein Objekt ausgeführt wird, ist dieser Wert leer. Um den vollständig qualifizierten Namen des Objekts zu erstellen, kombinieren Sie databaseName und objectName. Wenn die Abfrage mehrere Objekte umfasst, kann dieses Feld eine durch Komma getrennte Liste von Namen sein.
objectType string

Der Datenbankobjekttyp, z. B. Tabelle, Index usw. Dieses Feld wird nur verwendet, wenn die SQL-Anweisung für ein Datenbankobjekt ausgeführt wird. Falls die SQL-Anweisung nicht für ein Objekt ausgeführt wird, lautet dieser Wert Null.

Gültige Werte für Aurora MySQL sind unter anderem:

  • INDEX

  • TABLE

  • UNKNOWN

paramList string Dieses Feld wird für Aurora MySQL nicht verwendet und ist immer Null.
pid int Die Prozess-ID des Back-End-Prozesses, der für die Client-Verbindung zugewiesen wird. Wenn der Datenbankserver neu gestartet wird, ändert sich die pid und der Zähler für das Feld statementId beginnt von vorn.
remoteHost string Entweder die IP-Adresse oder der Hostname des Clients, der die SQL-Anweisung ausgegeben hat. Was davon verwendet wird, ist bei Aurora MySQL von der Parametereinstellung skip_name_resolve der Datenbank abhängig. Der Wert localhost gibt die Aktivität des speziellen Benutzers rdsadmin an.
remotePort string Die Portnummer des Clients.
rowCount int Die Anzahl der Tabellenzeilen, die von der SQL-Anweisung betroffen sind bzw. abgerufen werden. Dieses Feld wird nur für SQL-Anweisungen verwendet, bei denen es sich um DML-Anweisungen (DML = Data Manipulation Language) handelt. Falls die SQL-Anweisung keine DML-Anweisung ist, lautet dieser Wert Null.
serverHost string Die Datenbankserver-Instance-ID. Dieser Wert wird bei Aurora MySQL anders dargestellt als bei Aurora PostgreSQL. Aurora PostgreSQL verwendet eine IP-Adresse anstelle einer ID.
serverType string Der Datenbankservertyp, z. B MySQL.
serverVersion string Die Version des Datenbankservers. Derzeit ist dieser Wert bei Aurora MySQL immer MySQL 5.7.12.
serviceName string Name des Service. Derzeit ist dieser Wert bei Aurora MySQL immer Amazon Aurora MySQL.
sessionId int Eine pseudoeindeutige Sitzungskennung.
startTime

(nur Datenbank-Aktivitätsdatensätze der Version 1.1)

string

Die Zeit, zu der die Ausführung für die SQL-Anweisung begann. Sie wird im UTC-Format (Coordinated Universal Time) dargestellt.

Um die Ausführungszeit der SQL-Anweisung zu berechnen, verwenden Sie endTime - startTime. Siehe auch das Feld endTime.

statementId int Eine ID für die SQL-Anweisung des Clients. Der Zähler erhöht sich mit jeder vom Client eingegebenen SQL-Anweisung. Der Zähler wird zurückgesetzt, wenn die DB-Instance neu gestartet wird.
substatementId int Eine ID für eine SQL-Unteranweisung. Dieser Wert ist 1 für Ereignisse mit der Klasse MAIN und 2 für Ereignisse mit der Klasse AUX. Verwenden Sie das statementId-Feld, um alle Ereignisse zu identifizieren, die von derselben Anweisung generiert werden.
transactionId

(nur Datenbank-Aktivitätsdatensätze der Version 1.2)

int Eine ID für eine Transaktion.
type string Der Ereignistyp. Gültige Werte sind record oder heartbeat.

Verarbeiten eines Datenbankaktivitäts-Streams mit dem AWS SDK

Sie können einen Aktivitätsstream programmgesteuert verarbeiten, indem Sie das AWS SDK verwenden. Im Folgenden sehen Sie vollständig funktionsfähige Java- und Python-Beispiele für eine mögliche Verarbeitung des Kinesis-Datenstroms.

Java
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.security.Security; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.zip.GZIPInputStream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.SecretKeySpec; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.CryptoInputStream; import com.amazonaws.encryptionsdk.jce.JceMasterKey; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kms.AWSKMS; import com.amazonaws.services.kms.AWSKMSClientBuilder; import com.amazonaws.services.kms.model.DecryptRequest; import com.amazonaws.services.kms.model.DecryptResult; import com.amazonaws.util.Base64; import com.amazonaws.util.IOUtils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.bouncycastle.jce.provider.BouncyCastleProvider; public class DemoConsumer { private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]"; private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]"; private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]"; private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]"; private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2... private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY); private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS); private static final AwsCrypto CRYPTO = new AwsCrypto(); private static final AWSKMS KMS = AWSKMSClientBuilder.standard() .withRegion(REGION_NAME) .withCredentials(CREDENTIALS_PROVIDER).build(); class Activity { String type; String version; String databaseActivityEvents; String key; } class ActivityEvent { @SerializedName("class") String _class; String clientApplication; String command; String commandText; String databaseName; String dbProtocol; String dbUserName; String endTime; String errorMessage; String exitCode; String logTime; String netProtocol; String objectName; String objectType; List<String> paramList; String pid; String remoteHost; String remotePort; String rowCount; String serverHost; String serverType; String serverVersion; String serviceName; String sessionId; String startTime; String statementId; String substatementId; String transactionId; String type; } class ActivityRecords { String type; String clusterId; String instanceId; List<ActivityEvent> databaseActivityEventList; } static class RecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new RecordProcessor(); } } static class RecordProcessor implements IRecordProcessor { private static final long BACKOFF_TIME_IN_MILLIS = 3000L; private static final int PROCESSING_RETRIES_MAX = 10; private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; private static final Gson GSON = new GsonBuilder().serializeNulls().create(); private static final Cipher CIPHER; static { Security.insertProviderAt(new BouncyCastleProvider(), 1); try { CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC"); } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) { throw new ExceptionInInitializerError(e); } } private long nextCheckpointTimeInMillis; @Override public void initialize(String shardId) { } @Override public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) { for (final Record record : records) { processSingleBlob(record.getData()); } if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { checkpoint(checkpointer); nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS; } } @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } } private void processSingleBlob(final ByteBuffer bytes) { try { // JSON $Activity final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class); // Base64.Decode final byte[] decoded = Base64.decode(activity.databaseActivityEvents); final byte[] decodedDataKey = Base64.decode(activity.key); Map<String, String> context = new HashMap<>(); context.put("aws:rds:dbc-id", DBC_RESOURCE_ID); // Decrypt final DecryptRequest decryptRequest = new DecryptRequest() .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context); final DecryptResult decryptResult = KMS.decrypt(decryptRequest); final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext())); // GZip Decompress final byte[] decompressed = decompress(decrypted); // JSON $ActivityRecords final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class); // Iterate throught $ActivityEvents for (final ActivityEvent event : activityRecords.databaseActivityEventList) { System.out.println(GSON.toJson(event)); } } catch (Exception e) { // Handle error. e.printStackTrace(); } } private static byte[] decompress(final byte[] src) throws IOException { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); return IOUtils.toByteArray(gzipInputStream); } private void checkpoint(IRecordProcessorCheckpointer checkpointer) { for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) { try { checkpointer.checkpoint(); break; } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). System.out.println("Caught shutdown exception, skipping checkpoint." + se); break; } catch (ThrottlingException e) { // Backoff and re-attempt checkpoint upon transient failures if (i >= (PROCESSING_RETRIES_MAX - 1)) { System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e); break; } else { System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e); } } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e); break; } try { Thread.sleep(BACKOFF_TIME_IN_MILLIS); } catch (InterruptedException e) { System.out.println("Interrupted sleep" + e); } } } } private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException { // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"), "BC", "DataKey", "AES/GCM/NoPadding"); try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) { IOUtils.copy(decryptingStream, out); return out.toByteArray(); } } public static void main(String[] args) throws Exception { final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); final KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId); kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST); kinesisClientLibConfiguration.withRegionName(REGION_NAME); final Worker worker = new Builder() .recordProcessorFactory(new RecordProcessorFactory()) .config(kinesisClientLibConfiguration) .build(); System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId); try { worker.run(); } catch (Throwable t) { System.err.println("Caught throwable while processing data."); t.printStackTrace(); System.exit(1); } System.exit(0); } private static byte[] getByteArray(final ByteBuffer b) { byte[] byteArray = new byte[b.remaining()]; b.get(byteArray); return byteArray; } }
Python
import base64 import json import zlib import aws_encryption_sdk from aws_encryption_sdk import CommitmentPolicy from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType import boto3 REGION_NAME = '<region>' # us-east-1 RESOURCE_ID = '<external-resource-id>' # cluster-ABCD123456 STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID # aws-rds-das-cluster-ABCD123456 enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT) class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, plain_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC) def _get_raw_key(self, key_id): return self.wrapping_key def decrypt_payload(payload, data_key): my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") decrypted_plaintext, header = enc_client.decrypt( source=payload, materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)) return decrypted_plaintext def decrypt_decompress(payload, key): decrypted = decrypt_payload(payload, key) return zlib.decompress(decrypted, zlib.MAX_WBITS + 16) def main(): session = boto3.session.Session() kms = session.client('kms', region_name=REGION_NAME) kinesis = session.client('kinesis', region_name=REGION_NAME) response = kinesis.describe_stream(StreamName=STREAM_NAME) shard_iters = [] for shard in response['StreamDescription']['Shards']: shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'], ShardIteratorType='LATEST') shard_iters.append(shard_iter_response['ShardIterator']) while len(shard_iters) > 0: next_shard_iters = [] for shard_iter in shard_iters: response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000) for record in response['Records']: record_data = record['Data'] record_data = json.loads(record_data) payload_decoded = base64.b64decode(record_data['databaseActivityEvents']) data_key_decoded = base64.b64decode(record_data['key']) data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded, EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID}) print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext'])) if 'NextShardIterator' in response: next_shard_iters.append(response['NextShardIterator']) shard_iters = next_shard_iters if __name__ == '__main__': main()