Amazon Redshift との Aurora ゼロ ETL 統合の開始方法 - Amazon Aurora

Amazon Redshift との Aurora ゼロ ETL 統合の開始方法

Amazon Redshift とのゼロ ETL 統合を作成する前に、必要なパラメータとアクセス許可で Aurora DB クラスターと Amazon Redshift データウェアハウスを設定します。セットアップ時には、以下の手順を完了します。

これらのタスクが完了したら、Amazon Redshift との Amazon ゼロ ETL 統合の作成 に進みます。

AWS SDK を使用して、セットアッププロセスを自動化できます。詳細については、「AWS SDK を使用して統合をセットアップする (Aurora MySQL のみ)」を参照してください。

ステップ 1: カスタム DB クラスターのパラメータグループを作成する

Amazon Redshift との Aurora ゼロ ETL 統合には、レプリケーションを制御する DB クラスターパラメータに特定の値が必要です。具体的には、Aurora MySQL には拡張バイナリログ (aurora_enhanced_binlog) が必要であり、Aurora PostgreSQL には拡張論理レプリケーション (aurora.enhanced_logical_replication) が必要です。

バイナリロギングまたは論理レプリケーションを設定するには、まずカスタム DB クラスターパラメータグループを作成し、それをソース DB クラスターに関連付ける必要があります。

ソース DB エンジンに応じて、以下の設定でカスタム DB クラスターパラメータグループを作成します。カスタムパラメータグループを作成するには、「DB クラスターパラメータグループを使用する」を参照してください。

Aurora MySQL (aurora-mysql8.0 ファミリー):

  • aurora_enhanced_binlog=1

  • binlog_backup=0

  • binlog_format=ROW

  • binlog_replication_globaldb=0

  • binlog_row_image=full

  • binlog_row_metadata=full

さらに、binlog_transaction_compressionパラメータが ON設定されていないこと、および binlog_row_value_options パラメータが PARTIAL_JSON設定されていないことを確認してください。

Aurora MySQL 拡張バイナリログの詳細については、「拡張バイナリログ記録の設定」を参照してください。

Aurora PostgreSQL (aurora-postgresql15 ファミリー):

注記

Aurora PostgreSQL DB クラスターの場合、米国東部 (オハイオ) (us-east-2) AWS リージョン の Amazon RDS データベースプレビュー環境 内にカスタムパラメータグループを作成する必要があります。

  • rds.logical_replication=1

  • aurora.enhanced_logical_replication=1

  • aurora.logical_replication_backup=0

  • aurora.logical_replication_globaldb=0

拡張論理レプリケーション (aurora.enhanced_logical_replication) を有効にすると、REPLICA IDENTITY パラメータが自動的に FULL に設定されます。つまり、すべての列の値が先書きログ (WAL) に書き込まれます。これにより、ソース DB クラスターの IOPS が増加します。

ステップ 2: ソース DB クラスターを選択または作成する

カスタム DB クラスターのパラメータグループを作成したら、Aurora MySQL または Aurora PostgreSQL DB クラスターを選択または作成します。この のクラスターは、Amazon Redshift へのデータレプリケーションのソースになります。

このクラスターは、Aurora MySQL バージョン 3.05 (MySQL 8.0.32 互換) 以降、または Aurora PostgreSQL (PostgreSQL 15.4 およびゼロ ETL サポートと互換) を実行している必要があります。の DB クラスターの作成手順については、「Amazon Aurora DB クラスターの作成」を参照してください。

注記

米国東部 (オハイオ) (us-east-2) AWS リージョン の Amazon RDS データベースプレビュー環境 内に Aurora PostgreSQL DB クラスターを作成する必要があります。

[追加設定] で、デフォルトの DB クラスターパラメータグループを、前のステップで作成したカスタムパラメータグループに変更します。

注記

Aurora MySQL の場合、DB クラスターの作成後にパラメータグループをクラスターに関連付ける場合は、ゼロ ETL 統合を作成する前にクラスター内のプライマリ DB インスタンスを再起動して変更を適用する必要があります。手順については、Amazon Aurora DB クラスターまたは Amazon Aurora DB インスタンスの再起動 を参照してください。

Amazon Redshift との Aurora PostgreSQL ゼロ ETL 統合のプレビューリリースでは、クラスターの作成中にクラスターをカスタム DB クラスターパラメータグループに関連付ける必要があります。ソース DB クラスターの作成後は、このアクションを実行できません。作成していない場合は、クラスターを削除して再作成する必要があります。

ステップ 3: ターゲット Amazon Redshift データウェアハウスを作成する

ソース DB クラスターを作成した後、Amazon Redshift でターゲットのデータウェアハウスを作成して設定する必要があります。データウェアハウスは、以下の要件を満たしている必要があります。

  • プレビューで作成 (Aurora PostgreSQL ソースのみ)。Aurora MySQL ソースの場合、本番クラスターとワークグループを作成する必要があります。

    • プロビジョニングされたクラスターをプレビューで作成するには、プロビジョニングされたクラスターダッシュボードのバナーから [プレビュークラスターの作成] を選択します。詳細については、「プレビュークラスターの作成」を参照してください。

      クラスターを作成するときは、プレビュートラックpreview_2023 に設定します。

    • Redshift Serverless ワークグループをプレビューで作成するには、Serverless ダッシュボードのバナーから [プレビューワークグループの作成] を選択します。詳細については、「プレビューワークグループの作成」を参照してください。

  • RA3 ノードタイプ (ra3.xlplusra3.4xlargera3.16xlarge のいずれか)、または Redshift Serverless を使用している。

  • 暗号化されている (プロビジョニングされたクラスターを使用している場合) 詳細については、「Amazon Redshift データベースの暗号化」を参照してください。

データウェアハウスを作成する手順については、プロビジョニングされたクラスター用の「クラスターの作成」またはRedshift Serverless 用の「名前空間を使用したワークグループの作成」を参照してください。

データウェアハウスで大文字と小文字の区別を有効にします。

統合を正常に行うには、データウェアハウスで大文字と小文字を区別するパラメータ (enable_case_sensitive_identifier) を有効にする必要があります。デフォルトでは、プロビジョニング済みクラスターと Redshift Serverless ワークグループの大文字と小文字の区別は無効になっています。

大文字と小文字の区別を有効にするには、データウェアハウスのタイプに応じて以下の手順を実行します。

  • プロビジョニングされたクラスター — プロビジョニングされたクラスターで大文字と小文字の区別を有効にするには、enable_case_sensitive_identifier パラメータを有効にしたカスタムパラメータグループを作成します。次に、そのパラメータグループとクラスターを関連付けます。手順については、「コンソールを使用したパラメータグループの管理」または「AWS CLI を使用したパラメータ値の設定」を参照してください。

    注記

    クラスターにパラメータグループを関連付けたら、クラスターを再起動することを忘れないでください。

  • Serverless ワークグループ — Redshift Serverless ワークグループで大文字と小文字の区別を有効にするには、AWS CLI を使用する必要があります。Amazon Redshift コンソールは現在、Redshift Serverless パラメータ値の変更をサポートしていません。次の update-workgroup リクエストを送信します。

    aws redshift-serverless update-workgroup \ --workgroup-name target-workgroup \ --config-parameters parameterKey=enable_case_sensitive_identifier,parameterValue=true

    パラメータ値を変更した後、ワークグループを再起動する必要はありません。

データウェアハウスの認証を設定します。

データウェアハウスを作成したら、ソース Aurora DB クラスターを承認済みの統合ソースとして設定する必要があります。手順については、「Amazon Redshift データウェアハウスの認証を設定する」を参照してください。

AWS SDK を使用して統合をセットアップする (Aurora MySQL のみ)

各リソースを手動でセットアップするのではなく、次の Python スクリプトを実行して、必要なリソースを自動的にセットアップできます。このコード例では AWS SDK for Python (Boto3) を使用してソース Aurora MySQL DB クラスターとターゲット Amazon Redshift データウェアハウスを作成し、それぞれに必要なパラメータ値を指定します。次に、クラスターが使用可能になるまで待ってから、クラスター間にゼロ ETL 統合を作成します。設定する必要があるリソースに応じて、さまざまな関数をコメントアウトできます。

必要な従属関係をインストールには、次のコマンドを実行します。

pip install boto3 pip install time

スクリプト内で、オプションでソース、ターゲット、パラメータグループの名前を変更します。最後の関数は、リソースのセットアップ後に my-integration という名前の統合を作成します。

import boto3 import time # Build the client using the default credential configuration. # You can use the CLI and run 'aws configure' to set access key, secret # key, and default Region. rds = boto3.client('rds') redshift = boto3.client('redshift') sts = boto3.client('sts') source_cluster_name = 'my-source-cluster' # A name for the source cluster source_param_group_name = 'my-source-param-group' # A name for the source parameter group target_cluster_name = 'my-target-cluster' # A name for the target cluster target_param_group_name = 'my-target-param-group' # A name for the target parameter group def create_source_cluster(*args): """Creates a source Aurora MySQL DB cluster""" response = rds.create_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, DBParameterGroupFamily='aurora-mysql8.0', Description='For Aurora MySQL zero-ETL integrations' ) print('Created source parameter group: ' + response['DBClusterParameterGroup']['DBClusterParameterGroupName']) response = rds.modify_db_cluster_parameter_group( DBClusterParameterGroupName=source_param_group_name, Parameters=[ { 'ParameterName': 'aurora_enhanced_binlog', 'ParameterValue': '1', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_backup', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_format', 'ParameterValue': 'ROW', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_replication_globaldb', 'ParameterValue': '0', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_image', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' }, { 'ParameterName': 'binlog_row_metadata', 'ParameterValue': 'full', 'ApplyMethod': 'pending-reboot' } ] ) print('Modified source parameter group: ' + response['DBClusterParameterGroupName']) response = rds.create_db_cluster( DBClusterIdentifier=source_cluster_name, DBClusterParameterGroupName=source_param_group_name, Engine='aurora-mysql', EngineVersion='8.0.mysql_aurora.3.05.2', DatabaseName='myauroradb', MasterUsername='username', MasterUserPassword='Password01**' ) print('Creating source cluster: ' + response['DBCluster']['DBClusterIdentifier']) source_arn = (response['DBCluster']['DBClusterArn']) create_target_cluster(target_cluster_name, source_arn, target_param_group_name) response = rds.create_db_instance( DBInstanceClass='db.r6g.2xlarge', DBClusterIdentifier=source_cluster_name, DBInstanceIdentifier=source_cluster_name + '-instance', Engine='aurora-mysql' ) return(response) def create_target_cluster(target_cluster_name, source_arn, target_param_group_name): """Creates a target Redshift cluster""" response = redshift.create_cluster_parameter_group( ParameterGroupName=target_param_group_name, ParameterGroupFamily='redshift-1.0', Description='For Aurora MySQL zero-ETL integrations' ) print('Created target parameter group: ' + response['ClusterParameterGroup']['ParameterGroupName']) response = redshift.modify_cluster_parameter_group( ParameterGroupName=target_param_group_name, Parameters=[ { 'ParameterName': 'enable_case_sensitive_identifier', 'ParameterValue': 'true' } ] ) print('Modified target parameter group: ' + response['ParameterGroupName']) response = redshift.create_cluster( ClusterIdentifier=target_cluster_name, NodeType='ra3.4xlarge', NumberOfNodes=2, Encrypted=True, MasterUsername='username', MasterUserPassword='Password01**', ClusterParameterGroupName=target_param_group_name ) print('Creating target cluster: ' + response['Cluster']['ClusterIdentifier']) # Retrieve the target cluster ARN response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Retrieve the current user's account ID response = sts.get_caller_identity() account_id = response['Account'] # Create a resource policy specifying cluster ARN and account ID response = redshift.put_resource_policy( ResourceArn=target_arn, Policy=''' { \"Version\":\"2012-10-17\", \"Statement\":[ {\"Effect\":\"Allow\", \"Principal\":{ \"Service\":\"redshift.amazonaws.com\" }, \"Action\":[\"redshift:AuthorizeInboundIntegration\"], \"Condition\":{ \"StringEquals\":{ \"aws:SourceArn\":\"%s\"} } }, {\"Effect\":\"Allow\", \"Principal\":{ \"AWS\":\"arn:aws:iam::%s:root\"}, \"Action\":\"redshift:CreateInboundIntegration\"} ] } ''' % (source_arn, account_id) ) return(response) def wait_for_cluster_availability(*args): """Waits for both clusters to be available""" print('Waiting for clusters to be available...') response = rds.describe_db_clusters( DBClusterIdentifier=source_cluster_name ) source_status = response['DBClusters'][0]['Status'] source_arn = response['DBClusters'][0]['DBClusterArn'] response = rds.describe_db_instances( DBInstanceIdentifier=source_cluster_name + '-instance' ) source_instance_status = response['DBInstances'][0]['DBInstanceStatus'] response = redshift.describe_clusters( ClusterIdentifier=target_cluster_name ) target_status = response['Clusters'][0]['ClusterStatus'] target_arn = response['Clusters'][0]['ClusterNamespaceArn'] # Every 60 seconds, check whether the clusters are available. if source_status != 'available' or target_status != 'available' or source_instance_status != 'available': time.sleep(60) response = wait_for_cluster_availability( source_cluster_name, target_cluster_name) else: print('Clusters available. Ready to create zero-ETL integration.') create_integration(source_arn, target_arn) return def create_integration(source_arn, target_arn): """Creates a zero-ETL integration using the source and target clusters""" response = rds.create_integration( SourceArn=source_arn, TargetArn=target_arn, IntegrationName='my-integration' ) print('Creating integration: ' + response['IntegrationName']) def main(): """main function""" create_source_cluster(source_cluster_name, source_param_group_name) wait_for_cluster_availability(source_cluster_name, target_cluster_name) if __name__ == "__main__": main()

次のステップ

ソース Aurora DB クラスターと Amazon Redshift ターゲットデータウェアハウスを作成したので、ゼロ ETL 統合を作成してデータのレプリケーションを開始できます。手順については、Amazon Redshift との Amazon ゼロ ETL 統合の作成 を参照してください。