Python を使用して独自設計型 Redis OSS クラスターで転送時の暗号化を有効にする - Amazon ElastiCache (Redis OSS)

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Python を使用して独自設計型 Redis OSS クラスターで転送時の暗号化を有効にする

次のガイドでは、転送時の暗号化を無効にして最初に作成された Redis OSS 7.0 クラスターで転送時の暗号化を有効にする方法を示します。TCP クライアントと TLS クライアントは、このプロセス中もダウンタイムなしでクラスターとの通信を継続します。

Boto3 は環境変数から必要な認証情報 (aws_access_key_idaws_secret_access_key、および aws_session_token) を取得します。これらの認証情報は、このガイドに示されている Python コードを処理するために python3 を実行するのと同じ bash ターミナルにあらかじめ貼り付けておきます。以下の例のコードは、Redis OSS クラスターの作成に使用されるのと同じ VPC ElastiCache で起動された EC2 インスタンスから処理されました。

注記
  • 次の例では、 ElastiCache 管理オペレーション (クラスターまたはユーザー作成) に boto3 SDK を使用し、データ処理redis-py-cluster に redis-py/ を使用します。

  • クラスター変更 API でオンライン TLS 移行を使用するには、boto3 バージョン (=~) 1.26.39 以上を使用する必要があります。

  • ElastiCache は、バージョン 7.0 以降の Redis OSS クラスターに対してのみオンライン TLS 移行をサポートします。そのため、7.0 より前のバージョンの Redis OSS を実行しているクラスターがある場合は、クラスターの Redis OSS バージョンをアップグレードする必要があります。バージョンの違いの詳細については、「メジャーバージョンの動作と互換性の違い」を参照してください。

Redis OSS ElastiCache クラスターを起動する文字列定数を定義する

まず、、、 Cache Subnet groupなどsecurity-group、 ElastiCache クラスターの作成に必要な AWS エンティティの名前を保持するシンプルな Python 文字列定数を定義しましょうdefault parameter group。これらの AWS エンティティはすべて、使用するリージョンの AWS アカウントで事前に作成する必要があります。

#Constants definitions SECURITY_GROUP = "sg-0492aa0a29c558427" CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide" EC_SUBNET_GROUP = "client-testing" DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"

クラスター構成用のクラスを定義する

次に、クラスターの設定を表すシンプルな Python クラスをいくつか定義します。このクラスには、Redis OSS バージョン、インスタンスタイプ、転送時の暗号化 (TLS) が有効か無効かなど、クラスターに関するメタデータが格納されます。

#Class definitions class Config: def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, ): self.instance_type = instance_type self.version = version self.multi_az = multi_az self.TLS = TLS self.name = name or f"tls-test" def create_base_launch_request(self): return { "ReplicationGroupId": self.name, "TransitEncryptionEnabled": self.TLS, "MultiAZEnabled": self.multi_az, "CacheNodeType": self.instance_type, "Engine": "redis", "EngineVersion": self.version, "CacheSubnetGroupName": EC_SUBNET_GROUP , "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED , "ReplicationGroupDescription": CLUSTER_DESCRIPTION, "SecurityGroupIds": [SECURITY_GROUP], } class ConfigCME(Config): def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, num_shards: int = 2, num_replicas_per_shard: int = 1, ): super().__init__(instance_type, version, multi_az, TLS, name) self.num_shards = num_shards self.num_replicas_per_shard = num_replicas_per_shard def create_launch_request(self) -> dict: launch_request = self.create_base_launch_request() launch_request["NumNodeGroups"] = self.num_shards launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard return launch_request

クラスター自体を表すクラスを定義する

次に、Redis OSS クラスター自体を表すシンプルな ElastiCache Python クラスをいくつか定義します。このクラスには、クラスターの作成や API のクエリなどの ElastiCache 管理オペレーション用の boto3 クライアントを保持するクライアントフィールドがあります ElastiCache。

import botocore.config import boto3 # Create boto3 client def init_client(region: str = "us-east-1"): config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"}) init_request = dict() init_request["config"] = config init_request["service_name"] = "elasticache" init_request["region_name"] = region return boto3.client(**init_request) class ElastiCacheClusterBase: def __init__(self, name: str): self.name = name self.elasticache_client = init_client() def get_first_replication_group(self): return self.elasticache_client.describe_replication_groups( ReplicationGroupId=self.name )["ReplicationGroups"][0] def get_status(self) -> str: return self.get_first_replication_group()["Status"] def get_transit_encryption_enabled(self) -> bool: return self.get_first_replication_group()["TransitEncryptionEnabled"] def is_available(self) -> bool: return self.get_status() == "available" def is_modifying(self) -> bool: return self.get_status() == "modifying" def wait_for_available(self): while True: if self.is_available(): break else: time.sleep(5) def wait_for_modifying(self): while True: if self.is_modifying(): break else: time.sleep(5) def delete_cluster(self) -> bool: self.elasticache_client.delete_replication_group( ReplicationGroupId=self.name, RetainPrimaryCluster=False ) def modify_transit_encryption_mode(self, new_transit_encryption_mode: str): # generate api call to migrate the cluster to TLS preffered or to TLS required self.elasticache_client.modify_replication_group( ReplicationGroupId=self.name, TransitEncryptionMode=new_transit_encryption_mode, TransitEncryptionEnabled=True, ApplyImmediately=True, ) self.wait_for_modifying() class ElastiCacheClusterCME(ElastiCacheClusterBase): def __init__(self, name: str): super().__init__(name) @classmethod def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME: config = config or ConfigCME() print(config) new_cluster = ElastiCacheClusterCME(config.name) launch_request = config.create_launch_request() new_cluster.elasticache_client.create_replication_group(**launch_request) new_cluster.wait_for_available() return new_cluster def get_configuration_endpoint(self) -> str: return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"] #Since the code can throw exceptions, we define this class to make the code more readable and #so we won't forget to delete the cluster class ElastiCacheCMEManager: def __init__(self, config: ConfigCME = None): self.config = config or ConfigCME() def __enter__(self) -> ElastiCacheClusterCME: self.cluster = ElastiCacheClusterCME.launch(self.config) return self.cluster def __exit__(self, exc_type, exc_val, exc_tb): self.cluster.delete_cluster()

(オプション) Redis OSS クラスターへのクライアント接続をデモするラッパークラスを作成する

次に、redis-py-cluster クライアント用のラッパークラスを作成しましょう。このラッパークラスは、クラスターにいくつかのキーをあらかじめ入力してから、ランダムに繰り返し get コマンドを実行することをサポートします。

注記

これはオプションのステップですが、後のステップに含まれる main 関数のコードが簡略化されます。

import redis improt random from time import perf_counter_ns, time class DowntimeTestClient: def __init__(self, client): self.client = client # num of keys prefilled self.prefilled = 0 # percent of get above prefilled self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled # total downtime in nano seconds self.downtime_ns = 0 # num of success and fail operations self.success_ops = 0 self.fail_ops = 0 self.connection_errors = 0 self.timeout_errors = 0 def replace_client(self, client): self.client = client def prefill_data(self, timelimit_sec=60): end_time = time() + timelimit_sec while time() < end_time: self.client.set(self.prefilled, self.prefilled) self.prefilled += 1 # unsuccesful operations throw exceptions def _exec(self, func): try: start_ns = perf_counter_ns() func() self.success_ops += 1 elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6 # upon succesful execution of func # reset random_key to None so that the next command # will use a new random key self.random_key = None except Exception as e: elapsed_ns = perf_counter_ns() - start_ns self.downtime_ns += elapsed_ns # in case of failure- increment the relevant counters so that we will keep track # of how many connection issues we had while trying to communicate with # the cluster. self.fail_ops += 1 if e.__class__ is redis.exceptions.ConnectionError: self.connection_errors += 1 if e.__class__ is redis.exceptions.TimeoutError: self.timeout_errors += 1 def _repeat_exec(self, func, seconds): end_time = time() + seconds while time() < end_time: self._exec(func) def _new_random_key_if_needed(self, percent_above_prefilled): if self.random_key is None: max = int((self.prefilled * (100 + percent_above_prefilled)) / 100) return random.randint(0, max) return self.random_key def _random_get(self): key = self._new_random_key_if_needed(self.percent_get_above_prefilled) result = self.client.get(key) # we know the key was set for sure only in the case key < self.prefilled if key < self.prefilled: assert result.decode("UTF-8") == str(key) def repeat_get(self, seconds=60): self._repeat_exec(self._random_get, seconds) def get_downtime_ms(self) -> int: return self.downtime_ns // 10 ** 6 def do_get_until(self, cond_check): while not cond_check(): self.repeat_get() # do one more get cycle once condition is met self.repeat_get()

転送中の暗号化設定を変更するプロセスをデモする main 関数を作成する

それでは、次の処理を行う main 関数を定義しましょう。

  1. boto3 ElastiCache client を使用してクラスターを作成します。

  2. TLS を使用しないクリア TCP 接続でクラスターに接続する redis-py-cluster クライアントを初期化します。

  3. redis-py-cluster クライアントはクラスターにいくつかのデータを事前入力します。

  4. boto3 クライアントは、TLS なしから TLS 優先への TLS 移行をトリガーします。

  5. クラスターが TLS Preferred に移行されている間、redis-py-cluster TCP クライアントは、移行が完了するまでクラスターに繰り返し get オペレーションを送信します。

  6. TLS Preferred への移行が完了したら、クラスターが転送中の暗号化をサポートしていることを確認します。その後、TLS を使用してクラスターに接続する redis-py-cluster クライアントを作成します。

  7. 新しい TLS クライアントと古い TCP クライアントを使用して、いくつかの get コマンドを送信します。

  8. boto3 クライアントは、TLS Preferred から TLS 必須への TLS 移行をトリガーします。

  9. クラスターの TLS への移行中、 redis-py-clusterTLS クライアントは移行が完了するまでクラスターに繰り返しgetオペレーションを送信します。

import redis def init_cluster_client( cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient: # we must use for the host name the cluster configuration endpoint. redis_client = redis.RedisCluster( host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1 ) test_client = DowntimeTestClient(redis_client) if prefill_data: test_client.prefill_data() return test_client if __name__ == '__main__': config = ConfigCME(TLS=False, instance_type="cache.m5.large") with ElastiCacheCMEManager(config) as cluster: # create a client that will connect to the cluster with clear tcp connection test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False) # migrate the cluster to TLS Preferred cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred") # do repeated get commands until the cluster finishes the migration to TLS Preferred test_client_tcp.do_get_until(cluster.is_available) # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS assert cluster.get_transit_encryption_enabled() == True # create a client that will connect to the cluster with TLS connection. # we must first make sure that the cluster indeed supports TLS test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True) # by doing get commands with the tcp client for 60 more seconds # we can verify that the existing tcp connection to the cluster still works test_client_tcp.repeat_get(seconds=60) # do get commands with the new TLS client for 60 more seconds test_client_tcp.repeat_get(seconds=60) # migrate the cluster to TLS required cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required") # from this point the tcp clients will be disconnected and we must not use them anymore. # do get commands with the TLS client until the cluster finishes migartion to TLS required mode. test_client_tls.do_get_until(cluster.is_available)