Surveillance des flux d'activité de base de données - Amazon Aurora

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Surveillance des flux d'activité de base de données

Les flux d'activité de base de données surveillent et rapportent les activités. Le flux d'activité est collecté et transmis à Amazon Kinesis. Depuis Kinesis, vous pouvez surveiller le flux d'activité ou d'autres services et applications peuvent utiliser le flux d'activité pour une analyse plus approfondie. Vous pouvez trouver le nom du flux Kinesis sous-jacent à l'aide de la AWS CLI commande describe-db-clustersou de l'opération de l'API RDS. DescribeDBClusters

Aurora gère le flux Kinesis pour vous comme suit :

  • Aurora crée automatiquement le flux Kinesis avec une période de rétention de 24 heures.

  • Aurora met à l'échelle le flux Kinesis si nécessaire.

  • Si vous arrêtez le flux d'activité de base de données ou supprimez le cluster de base de données, Aurora supprime le flux Kinesis.

Les catégories d'activité suivantes sont surveillées et incluses dans le journal d'audit de flux d'activité :

  • Commandes SQL – Toutes les commandes SQL sont auditées, ainsi que les instructions préparées, les fonctions intégrées et les fonctions en PL/SQL. Les appels aux procédures stockées sont vérifiés. Toutes les instructions SQL émises dans des procédures ou fonctions stockées sont également vérifiées.

  • Autres informations de bases de données – L'activité surveillée inclut l'instruction SQL complète, le nombre des lignes affectées par les commandes DML, les objets consultés et le nom unique de base de données. Pour Aurora PostgreSQL, les flux d'activité de base de données surveillent également les variables de liaison et les paramètres de procédure stockée.

    Important

    Le texte SQL complet de chaque instruction est visible dans le journal d'audit du flux d'activité, y compris les données sensibles. Cependant, les mots de passe des utilisateurs de base de données sont expurgés si Aurora peut les déterminer d'après le contexte, comme dans l'instruction SQL suivante.

    ALTER ROLE role-name WITH password
  • Informations de connexion – L'activité surveillée inclut les informations de session et de réseau, l'ID de processus serveur et les codes de sortie.

Si un flux d'activité rencontre un échec pendant la surveillance de votre instance de base de données, vous en êtes informé via des événements RDS.

Accès à un flux d'activité depuis Kinesis

Lorsque vous activez un flux d'activité pour un cluster de base de données, un flux Kinesis est créé pour vous. Depuis Kinesis, vous pouvez surveiller l'activité de votre base de données en temps réel. Pour effectuer des analyses plus poussées de l'activité de base de données, vous pouvez connecter votre flux Kinesis à des applications grand public. Vous pouvez également connecter le flux à des applications de gestion de la conformité telles que Security Guardium d'IBM ou l'audit et la protection des SecureSphere bases de données d'Imperva, le Security Guardium d'IBM ou l'audit et la protection .

Vous pouvez accéder à votre flux Kinesis à partir de la console RDS ou de la console Kinesis.

Pour accéder à un flux d'activité depuis Kinesis avec la console RDS
  1. Ouvrez la console Amazon RDS à l’adresse https://console.aws.amazon.com/rds/.

  2. Dans le panneau de navigation, choisissez Databases (Bases de données).

  3. Choisissez le cluster de base de données où vous souhaitez démarrer un flux d'activité.

  4. Choisissez Configuration.

  5. Sous Database activity stream (Flux d'activité de la base de données), choisissez le lien sous Kinesis stream (Flux Kinesis).

  6. Dans la console Kinesis, choisissez Monitoring (Surveillance) pour commencer à observer l'activité de la base de données.

Pour accéder à un flux d'activité depuis Kinesis avec la console Kinesis
  1. Ouvrez la console Kinesis à l'adresse https://console.aws.amazon.com/kinesis.

  2. Choisissez votre flux d'activité dans la liste des flux Kinesis.

    Le nom d'un flux d'activité comprend le préfixe aws-rds-das-cluster- suivi de l'ID de ressource du cluster de base de données. Voici un exemple de.

    aws-rds-das-cluster-NHVOV4PCLWHGF52NP

    Pour utiliser la console Amazon RDS afin de trouver l'ID de ressource pour le cluster de base de données, choisissez votre cluster de base de données dans la liste des bases de données, puis choisissez l'onglet Configuration.

    AWS CLI Pour rechercher le nom complet du flux Kinesis d'un flux d'activité, utilisez une requête describe-db-clustersCLI et notez la valeur de ActivityStreamKinesisStreamName dans la réponse.

  3. Choisissez Surveillance pour commencer à observer l'activité de base de données.

Pour plus d'informations sur l'utilisation d'Amazon Kinesis, consultez la section En quoi consiste le service Amazon Kinesis Data Streams ?.

Contenus et exemples de journaux d'audit

Les événements surveillés sont représentés dans le flux d'activité de base de données sous la forme de chaînes JSON. La structure se compose d'un objet JSON contenant un DatabaseActivityMonitoringRecord, qui contient lui-même un tableau des événements d'activité databaseActivityEventList.

Exemples de journaux d'audit de flux d'activité

Vous trouverez ci-après des exemples de journaux d'audits JSON déchiffrés d'enregistrements d'événements d'activité.

Exemple Enregistrement d'événement d'activité d'une instruction Aurora PostgreSQL CONNECT SQL

L'enregistrement d'événement d'activité suivant indique une connexion à l'aide d'une instruction SQL CONNECT (command) par un client psql (clientApplication).

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-10-30 00:39:49.940668+00", "logTime": "2019-10-30 00:39:49.990579+00", "statementId": 1, "substatementId": 1, "objectType": null, "command": "CONNECT", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "49804", "sessionId": "5ce5f7f0.474b", "rowCount": null, "commandText": null, "paramList": [], "pid": 18251, "clientApplication": "psql", "exitCode": null, "class": "MISC", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
Exemple Enregistrement d'événement d'activité d'une instruction Aurora MySQL CONNECT SQL

L'enregistrement d'événement d'activité suivant indique une connexion à l'aide d'une instruction SQL CONNECT (command) par un client mysql (clientApplication).

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:13.267214+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"rdsadmin", "databaseName":"", "remoteHost":"localhost", "remotePort":"11053", "command":"CONNECT", "commandText":"", "paramList":null, "objectType":"TABLE", "objectName":"", "statementId":0, "substatementId":1, "exitCode":"0", "sessionId":"725121", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:13.267207+00", "endTime":"2020-05-22 18:07:13.267213+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }
Exemple Registre d'événement d'activité d'une instruction Aurora PostgreSQL CREATE TABLE

L'exemple suivant montre un événement CREATE TABLE pour Aurora PostgreSQL.

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:36:54.403455+00", "logTime": "2019-05-24 00:36:54.494235+00", "statementId": 2, "substatementId": 1, "objectType": null, "command": "CREATE TABLE", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": null, "commandText": "create table my_table (id serial primary key, name varchar(32));", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "DDL", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
Exemple Enregistrement d'événement d'activité d'une instruction CREATE TABLE Aurora MySQL

L'exemple suivant montre une instruction CREATE TABLE pour Aurora MySQL. L'opération est représentée sous la forme de deux enregistrements d'événements distincts. Un événement a "class":"MAIN". L'autre événement a "class":"AUX". Les messages peuvent arriver dans n'importe quel ordre. Le champ logTime de l'événement MAIN est toujours antérieur au champ logTime des événements AUX correspondants.

L'exemple suivant montre l'événement avec une valeur class de MAIN.

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.250221+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"CREATE TABLE test1 (id INT)", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":1, "exitCode":"0", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.250222+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

L'exemple suivant montre l'événement correspondant avec une valeur class de AUX.

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.247182+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"CREATE", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":2, "exitCode":"", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.247182+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }
Exemple Registre d'événement d'activité d'une instruction Aurora PostgreSQL SELECT

L'exemple suivant montre un événement SELECT .

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:39:49.920564+00", "logTime": "2019-05-24 00:39:49.940668+00", "statementId": 6, "substatementId": 1, "objectType": "TABLE", "command": "SELECT", "objectName": "public.my_table", "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": 10, "commandText": "select * from my_table;", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "READ", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }
{ "type": "DatabaseActivityMonitoringRecord", "clusterId": "", "instanceId": "db-4JCWQLUZVFYP7DIWP6JVQ77O3Q", "databaseActivityEventList": [ { "class": "TABLE", "clientApplication": "Microsoft SQL Server Management Studio - Query", "command": "SELECT", "commandText": "select * from [testDB].[dbo].[TestTable]", "databaseName": "testDB", "dbProtocol": "SQLSERVER", "dbUserName": "test", "endTime": null, "errorMessage": null, "exitCode": 1, "logTime": "2022-10-06 21:24:59.9422268+00", "netProtocol": null, "objectName": "TestTable", "objectType": "TABLE", "paramList": null, "pid": null, "remoteHost": "local machine", "remotePort": null, "rowCount": 0, "serverHost": "172.31.30.159", "serverType": "SQLSERVER", "serverVersion": "15.00.4073.23.v1.R1", "serviceName": "sqlserver-ee", "sessionId": 62, "startTime": null, "statementId": "0x03baed90412f564fad640ebe51f89b99", "substatementId": 1, "transactionId": "4532935", "type": "record", "engineNativeAuditFields": { "target_database_principal_id": 0, "target_server_principal_id": 0, "target_database_principal_name": "", "server_principal_id": 2, "user_defined_information": "", "response_rows": 0, "database_principal_name": "dbo", "target_server_principal_name": "", "schema_name": "dbo", "is_column_permission": true, "object_id": 581577110, "server_instance_name": "EC2AMAZ-NFUJJNO", "target_server_principal_sid": null, "additional_information": "", "duration_milliseconds": 0, "permission_bitmask": "0x00000000000000000000000000000001", "data_sensitivity_information": "", "session_server_principal_name": "test", "connection_id": "AD3A5084-FB83-45C1-8334-E923459A8109", "audit_schema_version": 1, "database_principal_id": 1, "server_principal_sid": "0x010500000000000515000000bdc2795e2d0717901ba6998cf4010000", "user_defined_event_id": 0, "host_name": "EC2AMAZ-NFUJJNO" } } ] }
Exemple Enregistrement d'événement d'activité d'une instruction SELECT Aurora MySQL

L'exemple suivant montre un événement SELECT.

L'exemple suivant montre l'événement avec une valeur class de MAIN.

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986467+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"SELECT * FROM test1 WHERE id < 28", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":1, "exitCode":"0", "sessionId":"726571", "rowCount":2, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986467+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

L'exemple suivant montre l'événement correspondant avec une valeur class de AUX.

{ "type":"DatabaseActivityMonitoringRecord", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986399+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"READ", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":2, "exitCode":"", "sessionId":"726571", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986399+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }

DatabaseActivityMonitoringRecordsObjet JSON

Les enregistrements d'événement d'activité de base de données se trouvent dans un objet JSON qui contient les informations suivantes.

Champ JSON Type de données Description

type

chaîne

Type de l'enregistrement JSON. La valeur est DatabaseActivityMonitoringRecords.

version chaîne Version des enregistrements de surveillance d'activité de base de données.

La version des enregistrements d'activité de base de données générés dépend de la version du moteur du cluster de base de données.

  • Les enregistrements d'activité de base de données version 1.1 sont générés pour les clusters de base de données Aurora PostgreSQL exécutant un moteur version 10.10 et versions mineures ultérieures ou un moteur versions 11.5 et ultérieures.

  • Les enregistrements d'activité de base de données version 1.0 sont générés pour des clusters de base de données Aurora PostgreSQL exécutant un moteur version 10.7 ou 11.4.

Tous les champs suivants sont à la fois dans la version 1.0 et dans la version 1.1, sauf indication spécifique.

databaseActivityEvents

chaîne

Objet JSON qui contient les événements d'activité.

key chaîne Clé de chiffrement que vous utilisez pour déchiffrer databaseActivityEventListe

databaseActivityEvents Objet JSON

L'objet JSON databaseActivityEvents contient les informations suivantes.

Champs de niveau supérieur dans l'enregistrement JSON

Chaque événement du journal d'audit est encapsulé dans un enregistrement au format JSON. Cet enregistrement contient les champs suivants.

type

Ce champ a toujours la valeur DatabaseActivityMonitoringRecords.

version ;

Ce champ représente la version du contrat ou du protocole de données de flux d'activité de base de données. Il définit les champs disponibles.

La version 1.0 représente la prise en charge des flux d'activité de données d'origine pour Aurora PostgreSQL versions 10.7 et 11.4. La version 1.1 représente la prise en charge des flux d'activité de données pour Aurora PostgreSQL versions 10.10 et supérieures et Aurora PostgreSQL version 11.5 et supérieures. La version 1.1 inclut les champs supplémentaires errorMessage et startTime. La version 1.2 représente la prise en charge des flux d'activité de données pour Aurora MySQL version 2.08 et supérieures. La version 1.2 inclut les champs supplémentaires endTime et transactionId.

databaseActivityEvents

Chaîne chiffrée représentant un ou plusieurs événements d'activité. Elle est représentée sou la forme d'un tableau base64 octets. Lorsque vous déchiffrez la chaîne, le résultat est un enregistrement au format JSON avec des champs comme ceux des exemples de cette section.

key

Clé de données chiffrée utilisée pour chiffrer la chaîne databaseActivityEvents. Il s'agit du même AWS KMS key que celui que vous avez fourni lorsque vous avez démarré le flux d'activité de la base de données.

L'exemple suivant illustre le format de cet enregistrement.

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents":"encrypted audit records", "key":"encrypted key" }

Pour déchiffrer le contenu du champ databaseActivityEvents, procédez comme suit :

  1. Déchiffrez la valeur dans le champ JSON key à l'aide de la clé KMS que vous avez fournie lors du démarrage du flux d'activité de base de données. Cette opération renvoie la clé de chiffrement des données en texte clair.

  2. Décodez en base64 la valeur dans le champ JSON databaseActivityEvents pour obtenir le texte chiffré, au format binaire, de la charge utile d'audit.

  3. Déchiffrez le chiffrement binaire avec la clé de chiffrement de données que vous avez décodée au cours de la première étape.

  4. Décompressez la charge utile déchiffrée.

    • La charge utile chiffrée se trouve dans le champ databaseActivityEvents.

    • Le champ databaseActivityEventList contient un tableau d'enregistrements d'audits. Les champs type du tableau peuvent être record ou heartbeat.

L'enregistrement d'événement d'activité du journal d'audit est un objet JSON qui contient les informations suivantes.

Champ JSON Type de données Description

type

chaîne

Type de l'enregistrement JSON. La valeur est DatabaseActivityMonitoringRecord.

clusterId chaîne Identificateur de ressource de cluster de base de données. Il correspond à l'attribut de cluster de base de donnéesDbClusterResourceId.
instanceId chaîne Identificateur de ressource d'instance de base de données. Il correspond à l'attribut d'instance de base de données DbiResourceId.

databaseActivityEventListe

chaîne

Tableau d'enregistrements d'audits d'activité ou de messages de pulsations.

databaseActivityEventTableau JSON de liste

La charge utile du journal d'audit est un tableau JSON databaseActivityEventList chiffré. Ci-dessous, les tableaux répertorient par ordre alphabétique les champs de chaque événement d'activité dans le tableau DatabaseActivityEventList déchiffré d'un journal d'audit. Les champs diffèrent selon que vous utilisez Aurora PostgreSQL ou Aurora MySQL. Consultez la table qui s'applique à votre moteur de base de données.

Important

Il se peut que la structure d'événement change. Il se peut qu'Aurora ajoute de nouveaux champs aux événements d'activité à l'avenir. Dans les applications qui analysent les données JSON, assurez-vous que votre code peut ignorer ou prendre les mesures appropriées pour les noms de champs inconnus.

databaseActivityEventListe des champs pour Aurora PostgreSQL
Champ Type de données Description
class chaîne

La classe d'un événement d'activité. Les valeurs possibles pour Aurora PostgreSQL sont les suivantes :

  • ALL

  • CONNECT – Événement de connexion ou déconnexion.

  • DDL – Instruction DDL non incluse dans la liste des instructions pour la classe ROLE.

  • FUNCTION – Appel de fonction ou un bloc DO.

  • MISC – Commande diverse telle que DISCARD, FETCH, CHECKPOINT ou VACUUM.

  • NONE

  • READ – Instruction SELECT ou COPY lorsque la source est une relation ou une requête.

  • ROLE – Instruction liée aux rôles et aux privilèges, incluant GRANT, REVOKE et CREATE/ALTER/DROP ROLE.

  • WRITE – Instruction INSERT, UPDATE, DELETE, TRUNCATE ou COPY lorsque la destination est une relation.

clientApplication chaîne Application utilisée par le client pour se connecter, telle que signalée par le client. Le client n'a pas à fournir cette information, la valeur peut être « null ».
command chaîne Nom de la commande SQL sans aucun détail sur la commande
commandText chaîne

Instruction SQL réelle transmise par l'utilisateur. Pour Aurora PostgreSQL, la valeur est identique à l'instruction SQL d'origine. Ce champ est utilisé pour tous les types d'enregistrements, excepté pour les enregistrements de connexion ou de déconnexion, auxquels cas la valeur est « null ».

Important

Le texte SQL complet de chaque instruction est visible dans le journal d'audit du flux d'activité, y compris les données sensibles. Cependant, les mots de passe des utilisateurs de la base de données sont masqués si Aurora peut les deviner suivant le contexte, comme le montre l'exemple suivant.

ALTER ROLE role-name WITH password
databaseName chaîne Base de données à laquelle l'utilisateur s'est connecté.
dbProtocol chaîne Le protocole de base de données, par exemple Postgres 3.0.
dbUserName chaîne L'utilisateur de la base de données avec lequel le client s'est authentifié.
errorMessage

(enregistrements d'activité de base de données version 1.1 uniquement)

chaîne

En cas d'erreur, ce champ contient le message d'erreur qui aurait été généré par le serveur de base de données. La valeur errorMessage est nulle pour les instructions normales qui n'ont pas donné lieu à une erreur.

Une erreur est définie comme étant une activité quelconque qui produirait un événement de journal d'erreurs PostgreSQL visible par le client avec un niveau de gravité égal ou supérieur à ERROR. Pour plus d'informations, veuillez consulter la documentation relative aux niveaux de gravité des messages PostgreSQL. Par exemple, les erreurs de syntaxe et les annulations de requête génèrent un message d'erreur.

Les erreurs internes du serveur PostgreSQL, telles que les erreurs de processus du pointeur de contrôle en arrière-plan, ne génèrent pas de message d'erreur. Cependant, des enregistrements sont toujours émis pour des événements de ce type, quel que soit le paramètre du niveau de gravité du journal. Cela empêche les pirates informatiques de désactiver la journalisation pour tenter d'éviter la détection.

Voir aussi le champ exitCode.

exitCode int Valeur utilisée pour l'enregistrement en sortie de session. En cas de sortie sans problème, elle contient le code de sortie. Un code de sortie ne peut pas toujours être obtenu dans certains scénarios d'échec. Par exemple, si PostgreSQL effectue un exit() ou si un opérateur exécute une commande telle que kill -9.

Si une erreur s'est produite, le champ exitCode affiche le code d'erreur SQL SQLSTATE, comme indiqué dans les codes d'erreur PostgreSQL.

Voir aussi le champ errorMessage.

logTime chaîne Horodatage, tel qu'il est enregistré dans l'audit du chemin du code. Cela représente l'heure de fin d'exécution de l'instruction SQL. Voir aussi le champ startTime.
netProtocol chaîne Protocole de communication réseau.
objectName chaîne Nom de l'objet de la base de données si l'instruction SQL agit sur l'un d'eux. Ce champ n'est utilisé que lorsque l'instruction SQL agit sur un objet de base de données. Si l'instruction SQL n'agit pas sur un objet, la valeur est « null ».
objectType chaîne Type de l'objet de base de données, par exemple, table, index, vue, etc. Ce champ n'est utilisé que lorsque l'instruction SQL agit sur un objet de base de données. Si l'instruction SQL n'agit pas sur un objet, la valeur est « null ». Les valeurs valides sont notamment les suivantes :
  • COMPOSITE TYPE

  • FOREIGN TABLE

  • FUNCTION

  • INDEX

  • MATERIALIZED VIEW

  • SEQUENCE

  • TABLE

  • TOAST TABLE

  • VIEW

  • UNKNOWN

paramList chaîne Tableau de paramètres séparés par des virgules, transmis à l'instruction SQL. Si l'instruction SQL n'a pas de paramètres, la valeur est un tableau vide.
pid int ID du processus de backend qui est dédié au service de la connexion du client.
remoteHost chaîne L'adresse IP du client ou le nom d'hôte. Pour Aurora PostgreSQL, le paramètre log_hostname de la base de données détermine lequel est utilisé.
remotePort chaîne Numéro de port du client.
rowCount int Nombre de lignes renvoyées par l'instruction SQL. Par exemple, si une instruction SELECT renvoie 10 lignes, rowCount est égal à 10. Pour les instructions INSERT ou UPDATE, rowCount est égal 0.
serverHost chaîne Adresse IP de l'hôte du serveur de base de données.
serverType chaîne Type du serveur de base de données, par exemple PostgreSQL.
serverVersion chaîne Version du serveur de base de données, par exemple 2.3.1 pour Aurora PostgreSQL.
serviceName chaîne Nom du service, par exemple Amazon Aurora PostgreSQL-Compatible edition.
sessionId int Identifiant de session à pseudo unique.
sessionId int Identifiant de session à pseudo unique.
startTime

(enregistrements d'activité de base de données version 1.1 uniquement)

chaîne

Heure à laquelle l'exécution a commencé pour l'instruction SQL.

Pour calculer le temps d'exécution approximatif de l'instruction SQL, utilisez logTime - startTime. Voir aussi le champ logTime.

statementId int Identificateur de l'instruction SQL du client. Le compteur se trouve au niveau de la session et s'incrémente avec chaque instruction SQL entrée par le client.
substatementId int Identificateur d'une sous-instruction SQL. Cette valeur compte le nombre de sous-instructions pour chaque instruction identifiée par le champ statementId.
type chaîne Type d'événement. Les valeurs valides sont record ou heartbeat.
databaseActivityEventListe des champs pour Aurora MySQL
Champ Type de données Description
class chaîne

La classe d'un événement d'activité.

Les valeurs possibles pour Aurora MySQL sont les suivantes :

  • MAIN – Événement principal représentant une instruction SQL.

  • AUX – Événement supplémentaire contenant des détails supplémentaires. Par exemple, une instruction qui renomme un objet peut avoir un événement d'une classe AUX qui reflète le nouveau nom.

    Pour rechercher les événements MAIN et AUX correspondant à la même instruction, vérifiez les événements différents qui ont les mêmes valeurs pour le champ pid et pour le champ statementId.

clientApplication chaîne Application utilisée par le client pour se connecter, telle que signalée par le client. Le client n'a pas à fournir cette information, la valeur peut être « null ».
command chaîne

Catégorie générale de l'instruction SQL. Les valeurs de ce champ dépendent de la valeur de class.

Lorsque class est MAIN les valeurs sont notamment les suivantes :

  • CONNECT – Lorsqu'une session client est connectée.

  • QUERY – Instruction SQL. Accompagnée d'un ou plusieurs événements dont la valeur class est AUX.

  • DISCONNECT – Lorsqu' une session client est déconnectée.

  • FAILED_CONNECT – Lorsqu'un client tente de se connecter mais n'y parvient pas.

  • CHANGEUSER – Changement d'état qui fait partie du protocole réseau MySQL, et non d'une instruction que vous émettez.

Lorsque class est AUX les valeurs sont notamment les suivantes :

  • READ – Instruction SELECT ou COPY lorsque la source est une relation ou une requête.

  • WRITE – Instruction INSERT, UPDATE, DELETE, TRUNCATE ou COPY lorsque la destination est une relation.

  • DROP – Suppression d'un objet.

  • CREATE – Création d'un objet.

  • RENAME – Renommage d'un objet.

  • ALTER – Pour changer les propriétés d'un objet.

commandText chaîne

Pour les événements dont la valeur class est MAIN, ce champ représente l'instruction SQL réelle transmise par l'utilisateur. Ce champ est utilisé pour tous les types d'enregistrements, excepté pour les enregistrements de connexion ou de déconnexion, auxquels cas la valeur est « null ».

Pour les événements dont la valeur class est AUX, ce champ contient des informations supplémentaires sur les objets impliqués dans l'événement.

Pour Aurora MySQL, les caractères tels que les guillemets sont précédés d'une barre oblique inverse, représentant un caractère d'échappement.

Important

Le texte SQL complet de chaque instruction est visible dans le journal d'audit, y compris les données sensibles. Cependant, les mots de passe des utilisateurs de la base de données sont masqués si Aurora peut les deviner suivant le contexte, comme le montre l'exemple suivant.

mysql> SET PASSWORD = 'my-password';
Note

Spécifiez un mot de passe autre que celui indiqué ici, en tant que bonne pratique de sécurité.

databaseName chaîne Base de données à laquelle l'utilisateur s'est connecté.
dbProtocol chaîne Protocole de la base de données. Actuellement, cette valeur est toujours MySQL pour Aurora MySQL.
dbUserName chaîne L'utilisateur de la base de données avec lequel le client s'est authentifié.
endTime

(enregistrements d'activité de base de données version 1.2 uniquement)

chaîne

Heure à laquelle l'exécution a fini pour l'instruction SQL. Il est représenté au format UTC (temps universel coordonné).

Pour calculer le temps d'exécution de l'instruction SQL, utilisez endTime - startTime. Voir aussi le champ startTime.

errorMessage

(enregistrements d'activité de base de données version 1.1 uniquement)

chaîne

En cas d'erreur, ce champ contient le message d'erreur qui aurait été généré par le serveur de base de données. La valeur errorMessage est nulle pour les instructions normales qui n'ont pas donné lieu à une erreur.

Une erreur est définie comme étant une activité quelconque qui produirait un événement de journal d'erreurs MySQL visible par le client avec un niveau de gravité égal ou supérieur à ERROR. Pour plus d'informations, veuillez consulter Le journal d'erreurs dans le Manuel de référence MySQL. Par exemple, les erreurs de syntaxe et les annulations de requête génèrent un message d'erreur.

Les erreurs internes du serveur MySQL, telles que les erreurs de processus du pointeur de contrôle en arrière-plan, ne génèrent pas de message d'erreur. Cependant, des enregistrements sont toujours émis pour des événements de ce type, quel que soit le paramètre du niveau de gravité du journal. Cela empêche les pirates informatiques de désactiver la journalisation pour tenter d'éviter la détection.

Voir aussi le champ exitCode.

exitCode int Valeur utilisée pour l'enregistrement en sortie de session. En cas de sortie sans problème, elle contient le code de sortie. Un code de sortie ne peut pas toujours être obtenu dans certains scénarios d'échec. Dans de tels cas, cette valeur peut être nulle ou vide.
logTime chaîne Horodatage, tel qu'il est enregistré dans l'audit du chemin du code. Il est représenté au format UTC (temps universel coordonné). Pour connaître la méthode la plus précise de calculer la durée de l'instruction, veuillez consulter les champs startTime et endTime.
netProtocol chaîne Protocole de communication réseau. Actuellement, cette valeur est toujours TCP pour Aurora MySQL.
objectName chaîne Nom de l'objet de la base de données si l'instruction SQL agit sur l'un d'eux. Ce champ n'est utilisé que lorsque l'instruction SQL agit sur un objet de base de données. Si l'instruction SQL n'agit pas sur un objet, cette valeur est vide. Pour construire le nom complet de l'objet, combinez databaseName et objectName. Si la requête comprend plusieurs objets, ce champ peut être une liste de noms séparés par des virgules.
objectType chaîne

Type de l'objet de base de données, par exemple, table, index, etc. Ce champ n'est utilisé que lorsque l'instruction SQL agit sur un objet de base de données. Si l'instruction SQL n'agit pas sur un objet, la valeur est « null ».

Les valeurs valides pour Aurora MySQL sont notamment les suivantes :

  • INDEX

  • TABLE

  • UNKNOWN

paramList chaîne Ce champ n'est pas utilisé pour Aurora MySQL et est toujours « null ».
pid int ID du processus de backend qui est dédié au service de la connexion du client. Lorsque le serveur de base de données est redémarré, les modifications pid et le compteur du champ statementId redémarrent.
remoteHost chaîne L'adresse IP ou le nom d'hôte du client qui a émis l'instruction SQL. Pour Aurora MySQL, le paramètre skip_name_resolve de la base de données détermine lequel est utilisé. La valeur localhost indique l'activité de l'utilisateur spécial rdsadmin.
remotePort chaîne Numéro de port du client.
rowCount int Numéro des lignes de la table affectées ou récupérées par l'instruction SQL. Ce champ n'est utilisé que pour les instructions SQL qui sont des instructions en langage de manipulation de données (DML). Si l'instruction SQL n'est pas une instruction DML, la valeur est « null ».
serverHost chaîne Identificateur d'instance du serveur de base de données. Cette valeur est représentée différemment pour Aurora MySQL et Aurora PostgreSQL. Aurora PostgreSQL utilise une adresse IP au lieu d'un identificateur.
serverType chaîne Type du serveur de base de données, par exemple MySQL.
serverVersion chaîne Version du serveur de base de données. Actuellement, cette valeur est toujours MySQL 5.7.12 pour Aurora MySQL.
serviceName chaîne Nom du service. Actuellement, cette valeur est toujours Amazon Aurora MySQL pour Aurora MySQL.
sessionId int Identifiant de session à pseudo unique.
startTime

(enregistrements d'activité de base de données version 1.1 uniquement)

chaîne

Heure à laquelle l'exécution a commencé pour l'instruction SQL. Il est représenté au format UTC (temps universel coordonné).

Pour calculer le temps d'exécution de l'instruction SQL, utilisez endTime - startTime. Voir aussi le champ endTime.

statementId int Identificateur de l'instruction SQL du client. Le compteur s'incrémente avec chaque instruction SQL saisie par le client. Le compteur est réinitialisé lorsque l'instance de base de données est redémarrée.
substatementId int Identificateur d'une sous-instruction SQL. Cette valeur est 1 pour les événements de classe MAIN et 2 pour les événements de classe AUX. Utilisez le champ statementId pour identifier tous les événements générés par la même instruction.
transactionId

(enregistrements d'activité de base de données version 1.2 uniquement)

int Identificateur d'une transaction.
type chaîne Type d'événement. Les valeurs valides sont record ou heartbeat.

Traitement d'un flux d'activité de base de données à l'aide du AWS SDK

Vous pouvez traiter un flux d'activité par programmation à l'aide du AWS SDK. Les exemples suivants sont des exemples Java et Python entièrement fonctionnels de traitement du flux de données Kinesis.

Java
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.security.Security; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.zip.GZIPInputStream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.SecretKeySpec; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.CryptoInputStream; import com.amazonaws.encryptionsdk.jce.JceMasterKey; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kms.AWSKMS; import com.amazonaws.services.kms.AWSKMSClientBuilder; import com.amazonaws.services.kms.model.DecryptRequest; import com.amazonaws.services.kms.model.DecryptResult; import com.amazonaws.util.Base64; import com.amazonaws.util.IOUtils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.bouncycastle.jce.provider.BouncyCastleProvider; public class DemoConsumer { private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]"; private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]"; private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]"; private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]"; private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2... private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY); private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS); private static final AwsCrypto CRYPTO = new AwsCrypto(); private static final AWSKMS KMS = AWSKMSClientBuilder.standard() .withRegion(REGION_NAME) .withCredentials(CREDENTIALS_PROVIDER).build(); class Activity { String type; String version; String databaseActivityEvents; String key; } class ActivityEvent { @SerializedName("class") String _class; String clientApplication; String command; String commandText; String databaseName; String dbProtocol; String dbUserName; String endTime; String errorMessage; String exitCode; String logTime; String netProtocol; String objectName; String objectType; List<String> paramList; String pid; String remoteHost; String remotePort; String rowCount; String serverHost; String serverType; String serverVersion; String serviceName; String sessionId; String startTime; String statementId; String substatementId; String transactionId; String type; } class ActivityRecords { String type; String clusterId; String instanceId; List<ActivityEvent> databaseActivityEventList; } static class RecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new RecordProcessor(); } } static class RecordProcessor implements IRecordProcessor { private static final long BACKOFF_TIME_IN_MILLIS = 3000L; private static final int PROCESSING_RETRIES_MAX = 10; private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; private static final Gson GSON = new GsonBuilder().serializeNulls().create(); private static final Cipher CIPHER; static { Security.insertProviderAt(new BouncyCastleProvider(), 1); try { CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC"); } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) { throw new ExceptionInInitializerError(e); } } private long nextCheckpointTimeInMillis; @Override public void initialize(String shardId) { } @Override public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) { for (final Record record : records) { processSingleBlob(record.getData()); } if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { checkpoint(checkpointer); nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS; } } @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } } private void processSingleBlob(final ByteBuffer bytes) { try { // JSON $Activity final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class); // Base64.Decode final byte[] decoded = Base64.decode(activity.databaseActivityEvents); final byte[] decodedDataKey = Base64.decode(activity.key); Map<String, String> context = new HashMap<>(); context.put("aws:rds:dbc-id", DBC_RESOURCE_ID); // Decrypt final DecryptRequest decryptRequest = new DecryptRequest() .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context); final DecryptResult decryptResult = KMS.decrypt(decryptRequest); final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext())); // GZip Decompress final byte[] decompressed = decompress(decrypted); // JSON $ActivityRecords final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class); // Iterate throught $ActivityEvents for (final ActivityEvent event : activityRecords.databaseActivityEventList) { System.out.println(GSON.toJson(event)); } } catch (Exception e) { // Handle error. e.printStackTrace(); } } private static byte[] decompress(final byte[] src) throws IOException { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); return IOUtils.toByteArray(gzipInputStream); } private void checkpoint(IRecordProcessorCheckpointer checkpointer) { for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) { try { checkpointer.checkpoint(); break; } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). System.out.println("Caught shutdown exception, skipping checkpoint." + se); break; } catch (ThrottlingException e) { // Backoff and re-attempt checkpoint upon transient failures if (i >= (PROCESSING_RETRIES_MAX - 1)) { System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e); break; } else { System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e); } } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e); break; } try { Thread.sleep(BACKOFF_TIME_IN_MILLIS); } catch (InterruptedException e) { System.out.println("Interrupted sleep" + e); } } } } private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException { // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"), "BC", "DataKey", "AES/GCM/NoPadding"); try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) { IOUtils.copy(decryptingStream, out); return out.toByteArray(); } } public static void main(String[] args) throws Exception { final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); final KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId); kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST); kinesisClientLibConfiguration.withRegionName(REGION_NAME); final Worker worker = new Builder() .recordProcessorFactory(new RecordProcessorFactory()) .config(kinesisClientLibConfiguration) .build(); System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId); try { worker.run(); } catch (Throwable t) { System.err.println("Caught throwable while processing data."); t.printStackTrace(); System.exit(1); } System.exit(0); } private static byte[] getByteArray(final ByteBuffer b) { byte[] byteArray = new byte[b.remaining()]; b.get(byteArray); return byteArray; } }
Python
import base64 import json import zlib import aws_encryption_sdk from aws_encryption_sdk import CommitmentPolicy from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType import boto3 REGION_NAME = '<region>' # us-east-1 RESOURCE_ID = '<external-resource-id>' # cluster-ABCD123456 STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID # aws-rds-das-cluster-ABCD123456 enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT) class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, plain_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC) def _get_raw_key(self, key_id): return self.wrapping_key def decrypt_payload(payload, data_key): my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") decrypted_plaintext, header = enc_client.decrypt( source=payload, materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)) return decrypted_plaintext def decrypt_decompress(payload, key): decrypted = decrypt_payload(payload, key) return zlib.decompress(decrypted, zlib.MAX_WBITS + 16) def main(): session = boto3.session.Session() kms = session.client('kms', region_name=REGION_NAME) kinesis = session.client('kinesis', region_name=REGION_NAME) response = kinesis.describe_stream(StreamName=STREAM_NAME) shard_iters = [] for shard in response['StreamDescription']['Shards']: shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'], ShardIteratorType='LATEST') shard_iters.append(shard_iter_response['ShardIterator']) while len(shard_iters) > 0: next_shard_iters = [] for shard_iter in shard_iters: response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000) for record in response['Records']: record_data = record['Data'] record_data = json.loads(record_data) payload_decoded = base64.b64decode(record_data['databaseActivityEvents']) data_key_decoded = base64.b64decode(record_data['key']) data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded, EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID}) print (decrypt_decompress(payload_decoded, data_key_decrypt_result['Plaintext'])) if 'NextShardIterator' in response: next_shard_iters.append(response['NextShardIterator']) shard_iters = next_shard_iters if __name__ == '__main__': main()