Amazon Aurora でデータベースアクティビティストリームを使用する - Amazon Aurora

Amazon Aurora でデータベースアクティビティストリームを使用する

データベースのアクティビティをモニタリングすることで、データベースを保護し、コンプライアンスおよび規制要件を満たすことができます。Amazon Aurora でデータベースアクティビティをモニタリングするには、データベースアクティビティストリーム機能を使用できます。データベースアクティビティストリームは、リレーショナルデータベース内のアクティビティのほぼリアルタイムのストリームを提供します。データベースのアクティビティストリームをモニタリングツールと統合すると、データベースのアクティビティをモニタリングおよび監査できます。

外部のセキュリティ上の脅威を超えて、マネージド型データベースによって、データベース管理者 (DBA) の内部リスクから保護する必要があります。データベースアクティビティストリームは、データベースアクティビティストリームへのデータベース管理者アクセスを制御することによって、データベースを内部の脅威から保護します。そのため、データベースアクティビティのストリームの収集、転送、格納、およびその後の処理は、データベースを管理する DBA のアクセスを超えています。

データベースアクティビティのストリームは、Aurora から、Aurora DB クラスターの代わりに作成された Amazon Kinesis データストリームにプッシュされます。Kinesis からは、Amazon Kinesis Data Firehose や AWS Lambda などの AWS のサービスがアクティビティストリームを消費できます。データベースアクティビティストリームは、無料の機能ですが、Amazon Kinesisそのストリームに対しては課金されます。詳細については、「Amazon Kinesis Data Streams の料金表」を参照してください。

アクティビティストリームは、コンプライアンス管理のためにアプリケーションによって使用することもあります。Aurora PostgreSQLによるデータベースアクティビティストリームの場合、これらのコンプライアンスアプリケーションには、IBM の Security Guardium、McAfeeのData Center Security Suite、Imperva の SecureSphere Database Audit and Protection が含まれます。このようなアプリケーションは、ストリームを使用してアラートを生成し、AuroraDB クラスターを監査できます。

ヒント

ウェブサイトで、データベースアクティビティストリームで使用する特定のアプリケーションを確認します。使用する特定の Aurora DB エンジンとエンジンのバージョンのサポートを確認します。

データベースアクティビティストリームには、次の制限と要件があります。

  • Aurora PostgreSQL の場合、データベースアクティビティストリームは、バージョン 2.3 以上およびバージョン 3.0 以上でサポートされています。PostgreSQL のバージョンの互換性については、「Amazon Aurora PostgreSQL のエンジンバージョン」を参照してください。

  • Aurora MySQL の場合、データベースアクティビティストリームはバージョン 2.08 以上でサポートされています。このバージョンは MySQL バージョン 5.7 と互換性があります。

  • データベースアクティビティストリームは、Auroraグローバルデータベースの主または二次クラスタで開始できます。DB エンジンのバージョン要件については、Amazon Aurora グローバルデータベースの制限を参照してください。

  • データベースアクティビティストリームは、「DB インスタンスクラスでサポートされている DB エンジン」で Aurora のリストにある DB インスタンスクラスをサポートしていますが、いくつかの例外があります。

    • Aurora PostgreSQL の場合、ストリームは db.r4 または db.r5 インスタンスクラスでのみ使用します。

    • Aurora MySQL では、db.t2 インスタンスクラス または db.t3 インスタンスクラスではストリームを使用できません。

  • データベースアクティビティストリームは、次の AWS リージョンではサポートされていません。

    • 中国 (北京) リージョン、cn-north-1

    • 中国 (寧夏) リージョン、cn-northwest-1

    • アジアパシフィック (大阪: ローカル)、ap-northeast-3

    • AWS GovCloud (米国東部)、us-gov-east-1

    • AWS GovCloud (米国西部)、us-gov-west-1

  • データベースアクティビティストリームには、AWS Key Management Service (AWS KMS) を使用する必要があります。アクティビティストリームは常に暗号化されているため、AWS KMS が必要です。

  • データベースアクティビティストリームは、Aurora Serverlessではサポートされていません。

Aurora MySQL データベースアクティビティストリームのネットワーク前提条件

Aurora MySQL でデータベースアクティビティストリームを使用するには、データベースアクティビティストリームを使用する VPC 内のすべての DB インスタンスから AWS KMS エンドポイントにアクセスできる必要があります。Aurora MySQL クラスターのデータベースアクティビティストリームを有効にする前に、この要件が満たされていることを確認してください。

重要

Aurora MySQL DB クラスターが AWS KMS エンドポイントにアクセスできない場合、アクティビティストリームは機能を停止します。この場合、Aurora は RDS イベントを使用してこの問題について通知します。

Aurora クラスターが一般公開されている場合、この要件は自動的に満たされます。

AWS KMS エンドポイントにアクセスできないために Aurora MySQL クラスターがデータベースアクティビティストリームを使用できないという通知を受け取った場合は、Aurora DB クラスターがパブリックかプライベートかを確認します。Aurora DB クラスターがプライベートの場合は、接続を有効にするように設定する必要があります。

Aurora DB クラスターがパブリックの場合は、パブリックアクセス可能とマークされている必要があります。この場合、AWS マネジメントコンソール で DB クラスターの詳細を見ると、[パブリックアクセス可能] が [はい] となっています。DB クラスターは Amazon VPC パブリックサブネットにも存在する必要があります。パブリックアクセス可能な DB インスタンスの詳細については、「VPC 内の DB インスタンスの使用」を参照してください。Amazon VPC のパブリックサブネットの詳細については、「VPC とサブネット」を参照してください。

Aurora DB クラスターがパブリックアクセス可能ではなく、VPC パブリックサブネットにある場合は、プライベートです。Aurora MySQL クラスターをプライベートにしておき、データベースアクティビティストリームで使用することもできます。その場合、ネットワークアドレス変換 (NAT) を介してインターネットアドレスに接続できるよう、クラスターを設定します。VPC での NAT の設定の詳細については、「NAT ゲートウェイ」を参照してください。

VPC エンドポイントの設定の詳細については、「VPC エンドポイント」を参照してください。

データベースアクティビティストリームの開始

DB クラスターレベルでアクティビティストリームを開始して、クラスターのすべての DB インスタンスのデータベースアクティビティをモニタリングします。クラスターに追加された DB インスタンスも自動的にモニタリングされます。

アクティビティストリームを開始すると、変更やアクセスなどの各データベースアクティビティイベントによってアクティビティストリームイベントが生成されます。アクセスイベントは SQL コマンド (例: CONNECTSELECT) から生成されます。変更イベントは SQL コマンド (例: CREATEINSERT) から生成されます。各アクティビティストリームイベントを永続化するために、Aurora はイベントを暗号化して保存します。

データベースセッションでデータベースアクティビティイベントを非同期的または同期的に処理するように選択できます。

  • 非同期モード非同期モードでは、データベースセッションでアクティビティストリームイベントが生成されると、セッションは直ちに通常のアクティビティに戻ります。アクティビティストリームイベントは、バックグラウンドで永続的なレコードになります。バックグラウンドタスクでエラーが発生した場合は、RDS イベントが送信されます。このイベントは、アクティビティストリームのイベントレコードが失われた可能性がある時間枠の開始と終了を示します。

    非同期モードでは、アクティビティストリームの精度よりもデータベースのパフォーマンスが優先されます。

    注記

    非同期モードは、Aurora PostgreSQL と Aurora MySQL の両方で使用できます。

  • 同期モード同期モードでは、データベースセッションでアクティビティストリームイベントが生成されると、そのイベントが永続化されるまで、セッションによってその他のアクティビティはブロックされます。何らかの理由でイベントを永続化できない場合、データベースセッションは通常のアクティビティに戻ります。ただし、アクティビティストリームレコードがしばらくの間失われる可能性があることを示す RDS イベントが送信されます。システムが正常な状態に戻ったら、2 番目の RDS イベントが送信されます。

    同期モードでは、データベースパフォーマンスよりもアクティビティストリームの精度が優先されます。

    注記

    同期モードは、Aurora PostgreSQL で使用できます。Aurora MySQL では同期モードを使用できません。

アクティビティストリームを開始するには

  1. Amazon RDS コンソール (https://console.aws.amazon.com/rds/) を開きます。

  2. ナビゲーションペインで、[データベース] を選択します。

  3. 使用する DB クラスターを選択します。

  4. [アクション] で [Start activity stream (アクティビティストリームの開始)] を選択します。[データベースアクティビティストリーム] ウィンドウが表示されます。

  5. [データベースアクティビティストリーム] ウィンドウに次の設定を入力します。

    • [マスターキー] で、AWS KMS カスタマーマスターキー (CMK) のリストからキーを選択します。

      注記

      Aurora MySQL クラスターが AWS KMS CMK にアクセスできない場合は、「Aurora MySQL データベースアクティビティストリームのネットワーク前提条件」の手順に従って、まずそのようなアクセスを有効にします。

      マスターキーは、ログに記録されたデータベースアクティビティを暗号化するキーを暗号化するために使用されます。デフォルトキー以外のマスターキーを選択する必要があります。暗号化キーと AWS KMS の詳細については、AWS Key Management Service Developer Guideの「AWS Key Management Service とは」を参照してください。

    • [データベースアクティビティストリームモード] で、[非同期] または [同期] を選択します。

      注記

      現在、この選択は Aurora PostgreSQL にのみ適用されます。Aurora MySQL では、非同期モードのみを使用できます。

    • [すぐに適用] を選択します。

      [次回のメンテナンスウィンドウのスケジュール] を選択した場合、データベースはすぐに再起動されません。代わりに、それは PENDING REBOOT 状態のままです。この場合、データベースアクティビティストリームは、次のメンテナンスウィンドウまで開始されません。

    設定を入力し終わったら、[続行] を選択します。

    クラスターの DB インスタンスステータスは、データベースアクティビティストリームが構成されていることを示しています。

DB クラスターのデータベースアクティビティストリームを開始するには、AWS CLI の start-activity-stream コマンドを使用して DB クラスターを設定します。DB クラスターの AWS リージョンを識別するには、--region パラメータを指定します。--apply-immediately パラメータはオプションです。

Linux、macOS、Unix の場合:

aws rds --region MY_REGION \ start-activity-stream \ --mode [sync | async] \ --kms-key-id MY_KMS_KEY_ARN \ --resource-arn MY_CLUSTER_ARN \ --apply-immediately

Windows の場合:

aws rds --region MY_REGION ^ start-activity-stream ^ --mode [sync | async] ^ --kms-key-id MY_KMS_KEY_ARN ^ --resource-arn MY_CLUSTER_ARN ^ --apply-immediately
注記

--mode パラメータは必須です。Aurora PostgreSQL では、いずれかの値を選択できます。Aurora MySQL では、常に async を指定する必要があります。

アクティビティストリームのステータスの取得

アクティビティストリームのステータスを取得するに は、コンソールまたは AWS CLI を使用します。

DB クラスターのアクティビティストリームのステータスを取得するには

  1. Amazon RDS コンソール (https://console.aws.amazon.com/rds/) を開きます。

  2. ナビゲーションペインで、[データベース] を選択し、DB クラスターを選択します。

  3. [設定] タブを選択して、ステータスを取得する [データベースアクティビティストリーム] をオンにします。

CLI の describe-db-clusters リクエストに対するレスポンスとして、DB クラスターのアクティビティストリーム設定を取得することができます。以下の例で、ActivityStreamKinesisStreamNameActivityStreamStatusActivityStreamKmsKeyIdActivityStreamMode の値を参照してください。

リクエストは次のとおりです。

aws rds --region MY_REGION describe-db-clusters --db-cluster-identifier my-cluster

このレスポンスには、データベースアクティビティストリームの次の項目が含まれます。

以下は JSON レスポンスの例です。これらのフィールドは Aurora PostgreSQL および Aurora MySQL でも同じです。ただし ActivityStreamMode は、常に Aurora MySQL の async、Aurora PostgreSQL の場合は sync または async になります。

{ "DBClusters": [ { "DBClusterIdentifier": "my-cluster", ... "ActivityStreamKinesisStreamName": "aws-rds-das-cluster-A6TSYXITZCZXJHIRVFUBZ5LTWY", "ActivityStreamStatus": "starting", "ActivityStreamKmsKeyId": "12345678-abcd-efgh-ijkl-bd041f170262", "ActivityStreamMode": "async", "DbClusterResourceId": "cluster-ABCD123456" ... } ] }

アクティビティストリームの停止

アクティビティストリームを停止するには、コンソールまたは AWS CLI を使用します。

DB クラスターを削除すると、アクティビティストリームが停止し、基になる Amazon Kinesis ストリームが自動的に削除されます。

アクティビティストリームを停止するには

  1. Amazon RDS コンソール (https://console.aws.amazon.com/rds/) を開きます。

  2. ナビゲーションペインで、[データベース] を選択します。

  3. データベースアクティビティストリームを停止する DB クラスターを選択します。

  4. [アクション] で [Stop activity stream (アクティビティストリームの停止)] を選択します。[データベースアクティビティストリーム] ウィンドウが表示されます。

    1. [すぐに適用] を選択します。

      [次回のメンテナンスウィンドウのスケジュール] を選択した場合、データベースはすぐに再起動されません。代わりに、それは PENDING REBOOT 状態のままです。この場合、データアクティビティストリームは、次のメンテナンスウィンドウまで無効になりません。

    2. [Continue] を選択します。

DB クラスターのデータベースアクティビティストリームを停止するには、AWS CLI の stop-activity-stream コマンドを使用します。DB クラスターの AWS リージョンを識別するには、--region パラメータを指定します。--apply-immediately パラメータはオプションです。

Linux、macOS、Unix の場合:

aws rds --region MY_REGION \ stop-activity-stream \ --resource-arn MY_CLUSTER_ARN \ --apply-immediately

Windows の場合:

aws rds --region MY_REGION ^ stop-activity-stream ^ --resource-arn MY_CLUSTER_ARN ^ --apply-immediately

データベースアクティビティストリームのモニタリング

データベースアクティビティストリームは、次に説明するように、データベースのアクティビティをモニタリングして報告します。

アクティビティのストリームは、収集後、Amazon Kinesis に送信されます。Kinesis から、アクティビティストリームをモニタリングしたり、他のサービスやアプリケーションがアクティビティストリームを使用して詳細な分析を行うことができます。基礎となる Kinesis ストリーム名は、AWS CLI コマンド describe-db-clusters または RDS API DescribeDBClusters オペレーションを使用して確認できます。

Aurora によって Kinesis ストリームが管理されます。

  • 既存の Kinesis ストリームを使用しません。Aurora は、24 時間の保存期間で Kinesis ストリームを自動的に作成します。

  • Aurora は、必要に応じて Kinesis ストリームをスケールします。

  • データベースアクティビティストリームを停止するか、DB クラスターを削除すると、Aurora によって Kinesis ストリームが削除されます。

以下のカテゴリのアクティビティがモニタリングされ、アクティビティストリームの監査ログに追加されます。

  • SQL コマンド – すべての SQL コマンドに加えて、準備済みステートメント、組み込み関数、および SQL 手続型言語 (PL/SQL) の関数も監査されます。ストアドプロシージャへの呼び出しが監査されます。ストアドプロシージャまたは関数内で発行された SQL ステートメントも監査されます。

  • 他のデータベース情報 – モニタリングされるアクティビティには、完全な SQL ステートメント、DML コマンドから影響を受ける行の行数、アクセスされたオブジェクト、および一意のデータベース名が含まれます。Aurora PostgreSQL の場合、データベースアクティビティストリームは、バインド変数とストアドプロシージャパラメータもモニタリングします。

    重要

    各ステートメントの完全な SQL テキストは、機密データを含むアクティビティストリーム監査ログに表示されます。ただし、Aurora が次の SQL ステートメントのようにコンテキストから判断できる場合、データベースユーザーのパスワードは編集されます。

    ALTER ROLE role-name WITH password
  • 接続情報 – モニタリングされるアクティビティには、セッションとネットワークの情報、サーバープロセス ID、および終了コードなどがあります。

DB インスタンスのモニタリング中にアクティビティストリームに障害が発生した場合は、RDS イベントを使用して通知されます。障害が発生した場合は、DB インスタンスをシャットダウン、または続行することができます。

Kinesis からのアクティビティストリームへのアクセス

DB クラスターのアクティビティストリームを有効にすると、Kinesis ストリームが作成されます。データベースのアクティビティは、Kinesis からリアルタイムでモニタリングできます。データベースのアクティビティを詳細に分析するには、Kinesis ストリームをコンシューマーアプリケーションに接続します。また、コンプライアンス管理アプリケーションに接続することもできます。

Kinesis からアクティビティストリームにアクセスするには

  1. https://console.aws.amazon.com/kinesis/ にある Kinesis コンソールを開きます。

  2. Kinesis ストリームのリストからアクティビティストリームを選択します。

    アクティビティストリームの名前には、プレフィックス aws-rds-das- とその後に DB クラスターのリソース ID が続きます。次に例を示します。

    aws-rds-das-cluster-NHVOV4PCLWHGF52NP

    Amazon RDS コンソールを使用して、DB クラスターのリソース ID を検索するには、データベースのリストから DB クラスターを選択し、[設定] タブを選択します。

    AWS CLI を使用して、アクティビティストリームの Kinesis ストリームの完全な名前を検索するには、CLI の describe-db-clusters CLI リクエストを使用して、レスポンスの ActivityStreamKinesisStreamName の値を書き留めます。

  3. データベースアクティビティの観察を開始するには、[モニタリング] を選択します。

Amazon Kinesis の使用の詳細については、「Amazon Kinesis Data Streams とは」を参照してください。

監査ログの内容と例

モニタリングされるデータベースアクティビティイベントは、Kinesis アクティビティストリームでは JSON 文字列として表されます。この構造は、DatabaseActivityMonitoringRecord を含む JSON オブジェクトで構成されます。このオブジェクトには、アクティビティイベントの databaseActivityEventList 配列が含まれます。

データベースアクティビティストリームの監査ログの例

以下に、アクティビティイベントレコードの復号されたサンプルの JSON 監査ログを示します。

例 Aurora PostgreSQL CONNECT SQL ステートメントのアクティビティイベントレコード

以下に、psql クライアント (clientApplication) による CONNECT SQL ステートメント (command) を使用したログオンのアクティビティイベントレコードを示します。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-10-30 00:39:49.940668+00", "logTime": "2019-10-30 00:39:49.990579+00", "statementId": 1, "substatementId": 1, "objectType": null, "command": "CONNECT", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "49804", "sessionId": "5ce5f7f0.474b", "rowCount": null, "commandText": null, "paramList": [], "pid": 18251, "clientApplication": "psql", "exitCode": null, "class": "MISC", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }

例 Aurora MySQL CONNECT SQL ステートメントのアクティビティイベントレコード

以下に、mysql クライアント (clientApplication) による CONNECT SQL ステートメント (command) を使用したログインのアクティビティイベントレコードを示します。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:13.267214+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"rdsadmin", "databaseName":"", "remoteHost":"localhost", "remotePort":"11053", "command":"CONNECT", "commandText":"", "paramList":null, "objectType":"TABLE", "objectName":"", "statementId":0, "substatementId":1, "exitCode":"0", "sessionId":"725121", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:13.267207+00", "endTime":"2020-05-22 18:07:13.267213+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

例 Aurora PostgreSQL CREATE TABLE ステートメントのアクティビティイベントレコード

Aurora PostgreSQL の CREATE TABLE イベントの例を以下に示します。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:36:54.403455+00", "logTime": "2019-05-24 00:36:54.494235+00", "statementId": 2, "substatementId": 1, "objectType": null, "command": "CREATE TABLE", "objectName": null, "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": null, "commandText": "create table my_table (id serial primary key, name varchar(32));", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "DDL", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }

例 Aurora MySQL CREATE TABLE ステートメントのアクティビティイベントレコード

以下は、Aurora MySQL の CREATE TABLE ステートメントの例です。オペレーションは、2 つの個別のイベントレコードとして表されます。1 つのイベントに "class":"MAIN" があります。他方のイベントには、"class":"AUX" があります。メッセージは任意の順序で到着する可能性があります。MAIN イベントの logTime フィールドは、常に対応する AUX イベントの logTime フィールドよりも前にあります。

次の例は、MAIN の値が class のイベントを示しています。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.250221+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"CREATE TABLE test1 (id INT)", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":1, "exitCode":"0", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.250222+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

次の例は、AUX の値 が class を持つ対応するイベントを示しています。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:07:12.247182+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"CREATE", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65459278, "substatementId":2, "exitCode":"", "sessionId":"725118", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:07:12.226384+00", "endTime":"2020-05-22 18:07:12.247182+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }

例 Aurora PostgreSQL SELECT ステートメントのアクティビティイベントレコード

SELECT イベントの例を以下に示します。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents": { "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-4HNY5V4RRNPKKYB7ICFKE5JBQQ", "instanceId":"db-FZJTMYKCXQBUUZ6VLU7NW3ITCM", "databaseActivityEventList":[ { "startTime": "2019-05-24 00:39:49.920564+00", "logTime": "2019-05-24 00:39:49.940668+00", "statementId": 6, "substatementId": 1, "objectType": "TABLE", "command": "SELECT", "objectName": "public.my_table", "databaseName": "postgres", "dbUserName": "rdsadmin", "remoteHost": "172.31.3.195", "remotePort": "34534", "sessionId": "5ce73c6f.7e64", "rowCount": 10, "commandText": "select * from my_table;", "paramList": [], "pid": 32356, "clientApplication": "psql", "exitCode": null, "class": "READ", "serverVersion": "2.3.1", "serverType": "PostgreSQL", "serviceName": "Amazon Aurora PostgreSQL-Compatible edition", "serverHost": "172.31.3.192", "netProtocol": "TCP", "dbProtocol": "Postgres 3.0", "type": "record", "errorMessage": null } ] }, "key":"decryption-key" }

例 Aurora MySQL SELECT ステートメントのアクティビティイベントレコード

SELECT イベントの例を以下に示します。

次の例は、MAIN の値が class のイベントを示しています。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986467+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"QUERY", "commandText":"SELECT * FROM test1 WHERE id < 28", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":1, "exitCode":"0", "sessionId":"726571", "rowCount":2, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986467+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"MAIN" } ] }

次の例は、AUX の値 が class を持つ対応するイベントを示しています。

{ "type":"DatabaseActivityMonitoringRecord", "clusterId":"cluster-some_id", "instanceId":"db-some_id", "databaseActivityEventList":[ { "logTime":"2020-05-22 18:29:57.986399+00", "type":"record", "clientApplication":null, "pid":2830, "dbUserName":"master", "databaseName":"test", "remoteHost":"localhost", "remotePort":"11054", "command":"READ", "commandText":"test1", "paramList":null, "objectType":"TABLE", "objectName":"test1", "statementId":65469218, "substatementId":2, "exitCode":"", "sessionId":"726571", "rowCount":0, "serverHost":"master", "serverType":"MySQL", "serviceName":"Amazon Aurora MySQL", "serverVersion":"MySQL 5.7.12", "startTime":"2020-05-22 18:29:57.986364+00", "endTime":"2020-05-22 18:29:57.986399+00", "transactionId":"0", "dbProtocol":"MySQL", "netProtocol":"TCP", "errorMessage":"", "class":"AUX" } ] }

データベースアクティビティのモニタリングレコードの JSON オブジェクト

データベースアクティビティイベントレコードは、次の情報を含む JSON オブジェクトにあります。

JSON フィールド データ型 説明

type

文字列

JSON レコードのタイプ。値は DatabaseActivityMonitoringRecords です。

version 文字列 データベースアクティビティモニタリングレコードのバージョン。生成されたデータベースアクティビティレコードのバージョンは、DB クラスターのエンジンのバージョンによって異なります。
  • バージョン 1.1 のデータベースアクティビティレコードは、エンジンバージョン 10.10 以降のマイナーバージョン、およびエンジンバージョン 11.5 以降を実行する Aurora PostgreSQL DB クラスターに対して生成されます。

  • バージョン 1.0 のデータベースアクティビティレコードは、エンジンバージョン 10.7 および 11.4 を実行している Aurora PostgreSQL DB クラスターに対して生成されます。

次のフィールドは、すべてバージョン 1.0 とバージョン 1.1 の両方にあります。ただし、明記されている場合を除きます。

databaseActivityEvents

文字列

アクティビティイベントを含む JSON オブジェクト。

key 文字列 databaseActivityEventList databaseActivityEventList JSON 配列の復号に使用する暗号化キー。

databaseActivityEvents JSON オブジェクト

databaseActivityEvents JSON オブジェクトには、次の情報が含まれています。

JSON レコードの最上位フィールド

監査ログの各イベントは、JSON 形式のレコード内にラップされます。このレコードには、次のフィールドが含まれます。

type

このフィールドは常に値 DatabaseActivityMonitoringRecords を持ちます。

version

このフィールドは、データベースアクティビティストリームデータプロトコルまたはコントラクトのバージョンを表します。これは、使用可能なフィールドを定義します。バージョン 1.0 は、Aurora PostgreSQL バージョン 10.7 および 11.4 の元のデータアクティビティストリームのサポートを表します。バージョン 1.1 は、Aurora PostgreSQL バージョン 10.10 以降、および Aurora PostgreSQL 11.5 以降のデータアクティビティストリームのサポートを表します。バージョン 1.1 には、追加のフィールド errorMessagestartTime が含まれています。バージョン 1.2 は、Aurora MySQL 2.08 以降のデータアクティビティストリームのサポートを表します。バージョン 1.2 には、追加のフィールド endTimetransactionId が含まれています。

databaseActivityEvents

1 つ以上のアクティビティイベントを表す暗号化された文字列。これは、base64 バイト配列として表されます。文字列を復号すると、このセクションの例に示すフィールドを持つ JSON 形式のレコードが生成されます。

key

databaseActivityEvents 文字列の暗号化に使用される暗号化されたデータキー。これは、データベースアクティビティストリームを開始したときに指定した AWS KMS カスタマーマスターキー (CMK) と同じです。

以下の例は、このレコードの形式を示しています。

{ "type":"DatabaseActivityMonitoringRecords", "version":"1.1", "databaseActivityEvents":"encrypted audit records", "key":"encrypted key" }

databaseActivityEvents フィールドの内容を復号化するには、次の手順を実行します。

  1. データベースアクティビティストリームを開始するときに指定した AWS KMS CMK を使用して、key JSON フィールドの値を復号します。これにより、データ暗号化キーがクリアテキストで返されます。

  2. databaseActivityEvents JSON フィールドの値を Base64 デコードして、監査ペイロードの暗号化テキストをバイナリ形式で取得します。

  3. 最初のステップでデコードしたデータ暗号化キーを使用して、バイナリ暗号文を復号化します。

  4. 復号化されたペイロードを解凍します。

    • 暗号化されたペイロードは、databaseActivityEvents フィールドにあります。

    • databaseActivityEventList フィールドには、監査レコードの配列が含まれます。配列内の type フィールドには、 record または heartbeat を使用できます。

監査ログのアクティビティイベントレコードは、次の情報を含む JSON オブジェクトです。

JSON フィールド データ型 説明

type

文字列

JSON レコードのタイプ。値は DatabaseActivityMonitoringRecord です。

clusterId 文字列 DB クラスターリソース識別子。DB クラスター属性 DbClusterResourceId に対応します。
instanceId 文字列 DB インスタンスのリソース識別子。DB インスタンス属性 DbiResourceId に対応します。

databaseActivityEventList

文字列

アクティビティ監査レコードまたはハートビートメッセージの配列。

databaseActivityEventList JSON 配列

監査ログのペイロードは、暗号化された databaseActivityEventList JSON 配列です。次の表に、監査ログの復号された DatabaseActivityEventList 配列内の各アクティビティイベントのフィールドをアルファベット順に示します。Aurora PostgreSQL または Aurora MySQL を使用するかどうかによって、フィールドは異なります。データベースエンジンに適用される表を参照してください。

重要

イベント構造は変更される場合があります。Aurora は、将来、アクティビティイベントに新しいフィールドを追加する可能性があります。JSON データを解析するアプリケーションでは、コードが未知のフィールド名に対して無視または適切なアクションを実行できることを確認します。

Aurora PostgreSQL の databaseActivityEventList フィールド
フィールド データ型 説明
class 文字列

アクティビティイベントのクラス。Aurora PostgreSQL の有効な値は以下のとおりです。

  • ALL

  • CONNECT – 接続イベントまたは切断イベント。

  • DDLROLE クラスのステートメントのリストに含まれていない DDL ステートメント。

  • FUNCTION – 関数の呼び出し、または DO ブロック。

  • MISC – さまざまなコマンド (例: DISCARDFETCHCHECKPOINTVACUUM)。

  • NONE

  • READSELECT ステートメントまたは COPY ステートメント (ソースがリレーションまたはクエリの場合)。

  • ROLE – ロールと特権に関するステートメント (例: GRANTREVOKECREATEALTERDROPROLE)。

  • WRITEINSERTUPDATEDELETETRUNCATECOPY ステートメント (宛先がリレーションの場合)。

clientApplication 文字列 クライアントのレポートどおりにクライアントが接続に使用していたアプリケーション。クライアントはこの情報を指定する必要はないため、値は null でも問題ありません。
command 文字列 SQL コマンドの名前 (コマンドの詳細は含まない)。
commandText 文字列

ユーザーによって渡された実際の SQL ステートメント。Aurora PostgreSQL の場合、値は元の SQL ステートメントと同じです。このフィールドは、接続レコードまたは切断レコードを除くすべてのタイプのレコードに使用されます。この場合、値は null です。

重要

各ステートメントの完全な SQL テキストは、機密データを含むアクティビティストリーム監査ログに表示されます。ただし、Aurora が次の SQL ステートメントのようにコンテキストから判断できる場合、データベースユーザーのパスワードは編集されます。

ALTER ROLE role-name WITH password
databaseName 文字列 ユーザーが接続したデータベース。
dbProtocol 文字列 データベースプロトコル (例: Postgres 3.0)。
dbUserName 文字列 クライアントが認証したデータベースユーザー。
errorMessage 文字列

エラーがあった場合、このフィールドには DB サーバーによって生成されるはずのエラーメッセージが表示されます。エラーにならなかった通常のステートメントの場合、errorMessage の値は null です。このフィールドは、バージョン 1.1 のデータベースアクティビティレコードでのみ使用されます。

エラーは、重大度が ERROR 以上の、クライアントで表示される PostgreSQL エラーログイベントを生成するアクティビティとして定義されます。詳細については、「PostgreSQL メッセージの重大度」を参照してください。たとえば、構文エラーやクエリのキャンセルは、エラーメッセージを生成します。

バックグラウンドの checkpointer プロセスエラーなどの内部の PostgreSQL サーバーエラーは、エラーメッセージを生成しません。ただし、ログの重大度レベルの設定に関係なく、このようなイベントのレコードは引き続き出力されます。これにより、攻撃者がログをオフにして検出を回避することを防ぎます。

exitCode フィールドも参照してください。

exitCode int セッション終了レコードに使用される値。clean exit では、終了コードが含まれます。エラーシナリオによっては、終了コードが常に得られるとは限りません。たとえば、PostgreSQL で exit() を実行している場合や、オペレーターが kill -9 などのコマンドを実行している場合があります。

エラーが発生した場合は、PostgreSQL エラーコードに記載している SQL エラーコード (SQLSTATE) が exitCode フィールドに表示されます。

errorMessage フィールドも参照してください。

logTime 文字列 監査コードパスに記録されているタイムスタンプ。これは、SQL ステートメントの実行終了時刻を表します。startTime フィールドも参照してください。
netProtocol 文字列 ネットワーク通信プロトコル。
objectName 文字列 データベースオブジェクトの名前 (SQL ステートメントを使用している場合)。このフィールドは、SQL ステートメントがデータベースオブジェクトに対して機能する場合にのみ使用されます。SQL ステートメントがオブジェクトに対して機能していない場合、この値は null です。
objectType 文字列 テーブル、インデックス、ビューなどのデータベースオブジェクトタイプ。このフィールドは、SQL ステートメントがデータベースオブジェクトに対して機能する場合にのみ使用されます。SQL ステートメントがオブジェクトに対して機能していない場合、この値は null です。有効な値には次のようなものがあります。
  • COMPOSITE TYPE

  • FOREIGN TABLE

  • FUNCTION

  • INDEX

  • MATERIALIZED VIEW

  • SEQUENCE

  • TABLE

  • TOAST TABLE

  • VIEW

  • UNKNOWN

paramList 文字列 SQL ステートメントに渡されるカンマ区切りのパラメータの配列。SQL ステートメントにパラメータがない場合、この値は空の配列です。
pid int クライアント接続を処理するために割り当てられているバックエンドプロセスのプロセス ID。
remoteHost 文字列 クライアントの IP アドレスまたはホスト名。Aurora PostgreSQL では、どちらが使用されるかは、データベースの log_hostname パラメータ設定によって異なります。
remotePort 文字列 クライアントのポート番号。
rowCount int SQL 文によって返された行数。たとえば、SELECTステートメントが 10 行を返す場合、RowCount は 10 になります。INSERT ステートメントまたは UPDATE ステートメントの場合、RowCount は 0 です。
serverHost 文字列 データベースサーバーのホスト IP アドレス。
serverType 文字列 データベースサーバーのタイプ (例: PostgreSQL)。
serverVersion 文字列 データベースサーバーのバージョン (例: Aurora PostgreSQL の 2.3.1)。
serviceName 文字列 サービスの名前 (例: Amazon Aurora PostgreSQL-Compatible edition)。
sessionId int 一意の疑似セッション識別子。
startTime 文字列

SQL ステートメントの実行が開始された時刻。このフィールドは、バージョン 1.1 のデータベースアクティビティレコードでのみ使用されます。

SQL ステートメントのおおよその実行時間を計算するには、logTime - startTime を使用します。logTime フィールドも参照してください。

statementId int クライアントの SQL ステートメントの識別子。カウンターはセッションレベルであり、クライアントによって SQL ステートメントが入力される度に増加します。
substatementId int SQL サブステートメントの識別子。この値は、statementId フィールドで識別された各 SQL ステートメントに含まれるサブステートメントをカウントします。
type 文字列 イベントタイプ。有効な値は record または heartbeat です。
Aurora MySQL の databaseActivityEventList フィールド
フィールド データ型 説明
class 文字列

アクティビティイベントのクラス。

Aurora MySQL の有効な値は以下のとおりです。

  • MAIN – SQL ステートメントを表すプライマリイベントです。

  • AUX – 追加の詳細を含む補足イベント。たとえば、オブジェクトの名前を変更したステートメントには、新しい名前を反映するクラス AUX を持つイベントがあります。

    同じステートメントに対応する MAIN イベントと AUX イベントを検索するには、pid フィールドと statementId フィールドの値が同じである、異なるイベントがないか確認します。

clientApplication 文字列 クライアントのレポートどおりにクライアントが接続に使用していたアプリケーション。クライアントはこの情報を指定する必要はないため、値は null でも問題ありません。
command 文字列

SQL ステートメントの一般的なカテゴリ。このフィールドの値は class の値によって異なります。

classMAIN の場合の値には、次の値が含まれます。

  • CONNECT – クライアントセッションが接続されている場合。

  • QUERY – SQL ステートメント。AUXclass 値が 1 つ以上のイベントを伴います。

  • DISCONNECT – クライアントセッションが切断されている場合。

  • FAILED_CONNECT – クライアントが接続を試みたが、接続できない場合。

  • CHANGEUSER – 発行するステートメントではなく、MySQL ネットワークプロトコルの一部である状態の変更。

classAUX の場合の値には、次の値が含まれます。

  • READSELECT ステートメントまたは COPY ステートメント (ソースがリレーションまたはクエリの場合)。

  • WRITEINSERTUPDATEDELETETRUNCATECOPY ステートメント (宛先がリレーションの場合)。

  • DROP – オブジェクトの削除。

  • CREATE – オブジェクトの作成。

  • RENAME – オブジェクトの名前の変更。

  • ALTER – オブジェクトのプロパティの変更。

commandText 文字列

MAINclass 値を持つイベントの場合、このフィールドはユーザーが渡した実際の SQL ステートメントを表します。このフィールドは、接続レコードまたは切断レコードを除くすべてのタイプのレコードに使用されます。この場合、値は null です。

AUXclass 値を持つイベントの場合、このフィールドには、イベントに関係するオブジェクトに関する補足情報が含まれます。

Aurora MySQL では、引用符などの文字の前には、エスケープ文字を表すバックスラッシュが付きます。

重要

各ステートメントの SQL テキスト全体が、機密データを含む監査ログに表示されます。ただし、Aurora が次の SQL ステートメントのようにコンテキストから判断できる場合、データベースユーザーのパスワードは編集されます。

mysql> SET PASSWORD = 'my-password';
databaseName 文字列 ユーザーが接続したデータベース。
dbProtocol 文字列 データベースプロトコル。現在、この値は常に Aurora MySQL の MySQL です。
dbUserName 文字列 クライアントが認証したデータベースユーザー。
endTime 文字列

SQL ステートメントの実行が終了した時刻。これは、協定世界時 (UTC) 形式で表されます。このフィールドは、バージョン 1.2 のデータベースアクティビティレコードにのみ存在します。

SQL ステートメントの実行時間を計算するには、endTime - startTime を使用します。startTime フィールドも参照してください。

errorMessage 文字列

エラーがあった場合、このフィールドには DB サーバーによって生成されるはずのエラーメッセージが表示されます。エラーにならなかった通常のステートメントの場合、errorMessage の値は null です。このフィールドは、バージョン 1.1 のデータベースアクティビティレコードでのみ使用されます。

エラーは、重大度が ERROR 以上の、クライアントで表示される MySQL エラーログイベントを生成するアクティビティとして定義されます。詳細については、MySQL リファレンスマニュアルの「エラーログ」を参照してください。たとえば、構文エラーやクエリのキャンセルは、エラーメッセージを生成します。

バックグラウンドの checkpointer プロセスエラーなどの内部の MySQL サーバーエラーは、エラーメッセージを生成しません。ただし、ログの重大度レベルの設定に関係なく、このようなイベントのレコードは引き続き出力されます。これにより、攻撃者がログをオフにして検出を回避することを防ぎます。

exitCode フィールドも参照してください。

exitCode int セッション終了レコードに使用される値。clean exit では、終了コードが含まれます。エラーシナリオによっては、終了コードが常に得られるとは限りません。このような場合、この値はゼロになるか、空白になる可能性があります。
logTime 文字列 監査コードパスに記録されているタイムスタンプ。これは、協定世界時 (UTC) 形式で表されます。ステートメントの期間を計算する最も正確な方法については、startTime および endTime フィールドを参照してください。
netProtocol 文字列 ネットワーク通信プロトコル。現在、この値は常に Aurora MySQL の TCP です。
objectName 文字列 データベースオブジェクトの名前 (SQL ステートメントを使用している場合)。このフィールドは、SQL ステートメントがデータベースオブジェクトに対して機能する場合にのみ使用されます。SQL ステートメントがオブジェクトに対して機能していない場合、この値は空白になります。オブジェクトの完全修飾名を作成するには、databaseNameobjectName を結合します。クエリに複数のオブジェクトが含まれる場合、このフィールドはカンマで区切られた名前のリストにすることができます。
objectType 文字列

テーブル、インデックスなどのデータベースオブジェクトタイプ。このフィールドは、SQL ステートメントがデータベースオブジェクトに対して機能する場合にのみ使用されます。SQL ステートメントがオブジェクトに対して機能していない場合、この値は null です。

Aurora MySQL の有効な値には次のようなものがあります。

  • INDEX

  • TABLE

  • UNKNOWN

paramList 文字列 このフィールドは Aurora MySQL には使用されず、常に null です。
pid int クライアント接続を処理するために割り当てられているバックエンドプロセスのプロセス ID。データベースサーバー再起動すると、pid が変更され、statementId フィールドのカウンターが最初からやり直されます。
remoteHost 文字列 SQL ステートメントを発行したクライアントの IP アドレスまたはホスト名。Aurora MySQL では、どちらが使用されるかは、データベースの skip_name_resolve パラメータ設定によって異なります。この値 localhost は、rdsadmin スペシャルユーザーからのアクティビティを示します。
remotePort 文字列 クライアントのポート番号。
rowCount int SQL ステートメントによって影響を受けた、または取得されたテーブルの行数。このフィールドは、データ操作言語 (DML) ステートメントである SQL ステートメントでのみ使用されます。SQL ステートメントが DML ステートメントではない場合、この値は null です。
serverHost 文字列 データベースサーバーインスタンス識別子。この値は、Aurora MySQL と Aurora PostgreSQL とでは異なって表現されます。Aurora PostgreSQL は識別子の代わりに IP アドレスを使用します。
serverType 文字列 データベースサーバーのタイプ (例: MySQL)。
serverVersion 文字列 データベースサーバーのバージョン。現在、この値は常に Aurora MySQL の MySQL 5.7.12 です。
serviceName 文字列 サービスの名前。現在、この値は常に Aurora MySQL の Amazon Aurora MySQL です。
sessionId int 一意の疑似セッション識別子。
startTime 文字列

SQL ステートメントの実行が開始された時刻。これは、協定世界時 (UTC) 形式で表されます。このフィールドは、バージョン 1.1 のデータベースアクティビティレコードでのみ使用されます。

SQL ステートメントの実行時間を計算するには、endTime - startTime を使用します。endTime フィールドも参照してください。

statementId int クライアントの SQL ステートメントの識別子。カウンターは、クライアントによって SQL ステートメントが入力される度に増加します。DB インスタンスが再起動されると、カウンターがリセットされます。
substatementId int SQL サブステートメントの識別子。この値は、クラス MAIN を持つイベントの場合は 1、クラス AUX を持つイベントの場合は 2 です。statementId フィールドを使用して、同じステートメントによって生成されたすべてのイベントを識別します。
transactionId 文字列 トランザクションの識別子。このフィールドは、バージョン 1.2 のデータベースアクティビティレコードでのみ使用されます。
type 文字列 イベントタイプ。有効な値は record または heartbeat です。

AWS SDK を使用したアクティビティストリームの処理

アクティビティストリームをプログラムで処理するには、AWS SDK を使用します。Kinesis データストリームを処理する方法で完全に機能する Java および Python の例を以下に示します。

Java
import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.net.InetAddress; import java.nio.ByteBuffer; import java.nio.charset.StandardCharsets; import java.security.NoSuchAlgorithmException; import java.security.NoSuchProviderException; import java.security.Security; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.UUID; import java.util.zip.GZIPInputStream; import javax.crypto.Cipher; import javax.crypto.NoSuchPaddingException; import javax.crypto.spec.SecretKeySpec; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; import com.amazonaws.encryptionsdk.AwsCrypto; import com.amazonaws.encryptionsdk.CryptoInputStream; import com.amazonaws.encryptionsdk.jce.JceMasterKey; import com.amazonaws.services.kinesis.clientlibrary.exceptions.InvalidStateException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ShutdownException; import com.amazonaws.services.kinesis.clientlibrary.exceptions.ThrottlingException; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessor; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorCheckpointer; import com.amazonaws.services.kinesis.clientlibrary.interfaces.IRecordProcessorFactory; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.InitialPositionInStream; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.KinesisClientLibConfiguration; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.ShutdownReason; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker; import com.amazonaws.services.kinesis.clientlibrary.lib.worker.Worker.Builder; import com.amazonaws.services.kinesis.model.Record; import com.amazonaws.services.kms.AWSKMS; import com.amazonaws.services.kms.AWSKMSClientBuilder; import com.amazonaws.services.kms.model.DecryptRequest; import com.amazonaws.services.kms.model.DecryptResult; import com.amazonaws.util.Base64; import com.amazonaws.util.IOUtils; import com.google.gson.Gson; import com.google.gson.GsonBuilder; import com.google.gson.annotations.SerializedName; import org.bouncycastle.jce.provider.BouncyCastleProvider; public class DemoConsumer { private static final String STREAM_NAME = "aws-rds-das-[cluster-external-resource-id]"; private static final String APPLICATION_NAME = "AnyApplication"; //unique application name for dynamo table generation that holds kinesis shard tracking private static final String AWS_ACCESS_KEY = "[AWS_ACCESS_KEY_TO_ACCESS_KINESIS]"; private static final String AWS_SECRET_KEY = "[AWS_SECRET_KEY_TO_ACCESS_KINESIS]"; private static final String DBC_RESOURCE_ID = "[cluster-external-resource-id]"; private static final String REGION_NAME = "[region-name]"; //us-east-1, us-east-2... private static final BasicAWSCredentials CREDENTIALS = new BasicAWSCredentials(AWS_ACCESS_KEY, AWS_SECRET_KEY); private static final AWSStaticCredentialsProvider CREDENTIALS_PROVIDER = new AWSStaticCredentialsProvider(CREDENTIALS); private static final AwsCrypto CRYPTO = new AwsCrypto(); private static final AWSKMS KMS = AWSKMSClientBuilder.standard() .withRegion(REGION_NAME) .withCredentials(CREDENTIALS_PROVIDER).build(); class Activity { String type; String version; String databaseActivityEvents; String key; } class ActivityEvent { @SerializedName("class") String _class; String clientApplication; String command; String commandText; String databaseName; String dbProtocol; String dbUserName; String endTime; String errorMessage; String exitCode; String logTime; String netProtocol; String objectName; String objectType; List<String> paramList; String pid; String remoteHost; String remotePort; String rowCount; String serverHost; String serverType; String serverVersion; String serviceName; String sessionId; String startTime; String statementId; String substatementId; String transactionId; String type; } class ActivityRecords { String type; String clusterId; String instanceId; List<ActivityEvent> databaseActivityEventList; } static class RecordProcessorFactory implements IRecordProcessorFactory { @Override public IRecordProcessor createProcessor() { return new RecordProcessor(); } } static class RecordProcessor implements IRecordProcessor { private static final long BACKOFF_TIME_IN_MILLIS = 3000L; private static final int PROCESSING_RETRIES_MAX = 10; private static final long CHECKPOINT_INTERVAL_MILLIS = 60000L; private static final Gson GSON = new GsonBuilder().serializeNulls().create(); private static final Cipher CIPHER; static { Security.insertProviderAt(new BouncyCastleProvider(), 1); try { CIPHER = Cipher.getInstance("AES/GCM/NoPadding", "BC"); } catch (NoSuchAlgorithmException | NoSuchPaddingException | NoSuchProviderException e) { throw new ExceptionInInitializerError(e); } } private long nextCheckpointTimeInMillis; @Override public void initialize(String shardId) { } @Override public void processRecords(final List<Record> records, final IRecordProcessorCheckpointer checkpointer) { for (final Record record : records) { processSingleBlob(record.getData()); } if (System.currentTimeMillis() > nextCheckpointTimeInMillis) { checkpoint(checkpointer); nextCheckpointTimeInMillis = System.currentTimeMillis() + CHECKPOINT_INTERVAL_MILLIS; } } @Override public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason) { if (reason == ShutdownReason.TERMINATE) { checkpoint(checkpointer); } } private void processSingleBlob(final ByteBuffer bytes) { try { // JSON $Activity final Activity activity = GSON.fromJson(new String(bytes.array(), StandardCharsets.UTF_8), Activity.class); // Base64.Decode final byte[] decoded = Base64.decode(activity.databaseActivityEvents); final byte[] decodedDataKey = Base64.decode(activity.key); Map<String, String> context = new HashMap<>(); context.put("aws:rds:dbc-id", DBC_RESOURCE_ID); // Decrypt final DecryptRequest decryptRequest = new DecryptRequest() .withCiphertextBlob(ByteBuffer.wrap(decodedDataKey)).withEncryptionContext(context); final DecryptResult decryptResult = KMS.decrypt(decryptRequest); final byte[] decrypted = decrypt(decoded, getByteArray(decryptResult.getPlaintext())); // GZip Decompress final byte[] decompressed = decompress(decrypted); // JSON $ActivityRecords final ActivityRecords activityRecords = GSON.fromJson(new String(decompressed, StandardCharsets.UTF_8), ActivityRecords.class); // Iterate throught $ActivityEvents for (final ActivityEvent event : activityRecords.databaseActivityEventList) { System.out.println(GSON.toJson(event)); } } catch (Exception e) { // Handle error. e.printStackTrace(); } } private static byte[] decompress(final byte[] src) throws IOException { ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(src); GZIPInputStream gzipInputStream = new GZIPInputStream(byteArrayInputStream); return IOUtils.toByteArray(gzipInputStream); } private void checkpoint(IRecordProcessorCheckpointer checkpointer) { for (int i = 0; i < PROCESSING_RETRIES_MAX; i++) { try { checkpointer.checkpoint(); break; } catch (ShutdownException se) { // Ignore checkpoint if the processor instance has been shutdown (fail over). System.out.println("Caught shutdown exception, skipping checkpoint." + se); break; } catch (ThrottlingException e) { // Backoff and re-attempt checkpoint upon transient failures if (i >= (PROCESSING_RETRIES_MAX - 1)) { System.out.println("Checkpoint failed after " + (i + 1) + "attempts." + e); break; } else { System.out.println("Transient issue when checkpointing - attempt " + (i + 1) + " of " + PROCESSING_RETRIES_MAX + e); } } catch (InvalidStateException e) { // This indicates an issue with the DynamoDB table (check for table, provisioned IOPS). System.out.println("Cannot save checkpoint to the DynamoDB table used by the Amazon Kinesis Client Library." + e); break; } try { Thread.sleep(BACKOFF_TIME_IN_MILLIS); } catch (InterruptedException e) { System.out.println("Interrupted sleep" + e); } } } } private static byte[] decrypt(final byte[] decoded, final byte[] decodedDataKey) throws IOException { // Create a JCE master key provider using the random key and an AES-GCM encryption algorithm final JceMasterKey masterKey = JceMasterKey.getInstance(new SecretKeySpec(decodedDataKey, "AES"), "BC", "DataKey", "AES/GCM/NoPadding"); try (final CryptoInputStream<JceMasterKey> decryptingStream = CRYPTO.createDecryptingStream(masterKey, new ByteArrayInputStream(decoded)); final ByteArrayOutputStream out = new ByteArrayOutputStream()) { IOUtils.copy(decryptingStream, out); return out.toByteArray(); } } public static void main(String[] args) throws Exception { final String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID(); final KinesisClientLibConfiguration kinesisClientLibConfiguration = new KinesisClientLibConfiguration(APPLICATION_NAME, STREAM_NAME, CREDENTIALS_PROVIDER, workerId); kinesisClientLibConfiguration.withInitialPositionInStream(InitialPositionInStream.LATEST); kinesisClientLibConfiguration.withRegionName(REGION_NAME); final Worker worker = new Builder() .recordProcessorFactory(new RecordProcessorFactory()) .config(kinesisClientLibConfiguration) .build(); System.out.printf("Running %s to process stream %s as worker %s...\n", APPLICATION_NAME, STREAM_NAME, workerId); try { worker.run(); } catch (Throwable t) { System.err.println("Caught throwable while processing data."); t.printStackTrace(); System.exit(1); } System.exit(0); } private static byte[] getByteArray(final ByteBuffer b) { byte[] byteArray = new byte[b.remaining()]; b.get(byteArray); return byteArray; } }
Python
import base64 import json import zlib import aws_encryption_sdk from aws_encryption_sdk import CommitmentPolicy from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType import boto3 REGION_NAME = '<region>' # us-east-1 RESOURCE_ID = '<external-resource-id>' # cluster-ABCD123456 STREAM_NAME = 'aws-rds-das-' + RESOURCE_ID # aws-rds-das-cluster-ABCD123456 enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_ALLOW_DECRYPT) class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" def __new__(cls, *args, **kwargs): obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, plain_key): RawMasterKeyProvider.__init__(self) self.wrapping_key = WrappingKey(wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC) def _get_raw_key(self, key_id): return self.wrapping_key def decrypt_payload(payload, data_key): my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") decrypted_plaintext, header = enc_client.decrypt( source=payload, materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(master_key_provider=my_key_provider)) return decrypted_plaintext def decrypt_decompress(payload, key): decrypted = decrypt_payload(payload, key) return zlib.decompress(decrypted, zlib.MAX_WBITS + 16) def main(): session = boto3.session.Session() kms = session.client('kms', region_name=REGION_NAME) kinesis = session.client('kinesis', region_name=REGION_NAME) response = kinesis.describe_stream(StreamName=STREAM_NAME) shard_iters = [] for shard in response['StreamDescription']['Shards']: shard_iter_response = kinesis.get_shard_iterator(StreamName=STREAM_NAME, ShardId=shard['ShardId'], ShardIteratorType='LATEST') shard_iters.append(shard_iter_response['ShardIterator']) while len(shard_iters) > 0: next_shard_iters = [] for shard_iter in shard_iters: response = kinesis.get_records(ShardIterator=shard_iter, Limit=10000) for record in response['Records']: record_data = record['Data'] record_data = json.loads(record_data) payload_decoded = base64.b64decode(record_data['databaseActivityEvents']) data_key_decoded = base64.b64decode(record_data['key']) data_key_decrypt_result = kms.decrypt(CiphertextBlob=data_key_decoded, EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID}) print decrypt_decompress((payload_decoded, data_key_decrypt_result['Plaintext'])) if 'NextShardIterator' in response: next_shard_iters.append(response['NextShardIterator']) shard_iters = next_shard_iters if __name__ == '__main__': main()

データベースアクティビティストリームへのアクセスの管理

データベースアクティビティストリームに対する適切な AWS Identity and Access Management (IAM) ロールの権限が付与されているユーザーは、DB クラスターのアクティビティストリーム設定を作成、開始、停止、および変更できます。これらのアクションはストリームの監査ログに含まれています。コンプライアンスのベストプラクティスとして、これらの権限は DBA に付与しないことをお勧めします。

データベースアクティビティストリームへのアクセスを設定するには、IAM ポリシーを使用します。Aurora 認証の詳細については、Amazon Aurora での Identity and Access Management を参照してください。IAM ポリシーの作成の詳細については、IAM データベースアクセス用の IAM ポリシーの作成と使用 を参照してください。

例 データベースアクティビティストリームの設定を許可するポリシー

アクティビティストリームを変更するためのきめ細かいアクセスをユーザーに付与するには、IAM ポリシーでサービス固有のオペレーションコンテキストキー rds:StartActivityStreamrds:StopActivityStream および を使用します。次の IAM ポリシーの例では、ユーザーまたはロールがアクティビティストリームを設定することを許可します。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"ConfigureActivityStreams", "Effect":"Allow", "Action": [ "rds:StartActivityStream", "rds:StopActivityStream" ], "Resource":"*", } ] }

例 データベースアクティビティストリームの開始を許可するポリシー

次の IAM ポリシーの例では、ユーザーまたはロールがアクティビティストリームを開始することを許可します。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"AllowStartActivityStreams", "Effect":"Allow", "Action":"rds:StartActivityStream", "Resource":"*" } ] }

例 データベースアクティビティストリームの停止を許可するポリシー

次の IAM ポリシーの例では、ユーザーまたはロールがアクティビティストリームを停止することを許可します。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"AllowStopActivityStreams", "Effect":"Allow", "Action":"rds:StopActivityStream", "Resource":"*" } ] }

例 データベースアクティビティストリームの開始を拒否するポリシー

次の IAM ポリシーの例では、ユーザーまたはロールによるアクティビティストリームの開始が阻止されます。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"DenyStartActivityStreams", "Effect":"Deny", "Action":"rds:StartActivityStream", "Resource":"*" } ] }

例 データベースアクティビティストリーム停止を拒否するポリシー

次の IAM ポリシーの例では、ユーザーまたはロールによるアクティビティストリームの停止が阻止されます。

{ "Version":"2012-10-17", "Statement":[ { "Sid":"DenyStopActivityStreams", "Effect":"Deny", "Action":"rds:StopActivityStream", "Resource":"*" } ] }