Abilitazione della crittografia in transito su un cluster Redis OSS progettato autonomamente utilizzando Python - Amazon ElastiCache (sistema operativo Redis)

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Abilitazione della crittografia in transito su un cluster Redis OSS progettato autonomamente utilizzando Python

La seguente guida mostrerà come abilitare la crittografia in transito su un cluster Redis OSS 7.0 originariamente creato con la crittografia in transito disabilitata. I client TCP e TLS continueranno a comunicare con il cluster durante questo processo senza tempi di inattività.

Boto3 otterrà le credenziali necessarie (aws_access_key_id, aws_secret_access_key e aws_session_token) dalle variabili di ambiente. Tali credenziali verranno incollate in anticipo nello stesso terminale bash in cui verrà eseguito python3 per elaborare il codice Python mostrato in questa guida. Il codice nell'esempio seguente è stato elaborato da un'istanza EC2 lanciata nello stesso VPC che verrà utilizzato per creare ElastiCache il cluster Redis OSS al suo interno.

Nota
  • Gli esempi seguenti utilizzano l'SDK boto3 per le operazioni di ElastiCache gestione (creazione di cluster o utenti) e redis-py/ per la gestione dei dati. redis-py-cluster

  • È necessario utilizzare almeno la versione boto3 (=~) 1.26.39 per utilizzare la migrazione TLS online con l'API di modifica del cluster.

  • ElastiCache supporta la migrazione TLS online solo per i cluster Redis OSS con versione 7.0 o successiva. Quindi, se disponi di un cluster che esegue una versione Redis OSS precedente alla 7.0, dovrai aggiornare la versione Redis OSS del cluster. Per ulteriori informazioni sulle differenze di versione, consultare Differenze di compatibilità e comportamento delle versioni principali.

Definisci le costanti di stringa che avvieranno il Redis OSS Cluster ElastiCache

Per prima cosa, definiamo alcune semplici costanti di stringa Python che conterranno i nomi delle AWS entità necessarie per creare il ElastiCache cluster come security-groupCache Subnet group, e a. default parameter group Tutte queste AWS entità devono essere create in anticipo nel tuo AWS account nella regione che intendi utilizzare.

#Constants definitions SECURITY_GROUP = "sg-0492aa0a29c558427" CLUSTER_DESCRIPTION = "This cluster has been launched as part of the online TLS migration user guide" EC_SUBNET_GROUP = "client-testing" DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED = "default.redis7.cluster.on"

Definizione delle classi per la configurazione del cluster

Ora, definiamo alcune semplici classi Python che rappresenteranno una configurazione di un cluster, che conterranno i metadati sul cluster come la versione Redis OSS, il tipo di istanza e se la crittografia in transito (TLS) è abilitata o disabilitata.

#Class definitions class Config: def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, ): self.instance_type = instance_type self.version = version self.multi_az = multi_az self.TLS = TLS self.name = name or f"tls-test" def create_base_launch_request(self): return { "ReplicationGroupId": self.name, "TransitEncryptionEnabled": self.TLS, "MultiAZEnabled": self.multi_az, "CacheNodeType": self.instance_type, "Engine": "redis", "EngineVersion": self.version, "CacheSubnetGroupName": EC_SUBNET_GROUP , "CacheParameterGroupName": DEFAULT_PARAMETER_GROUP_REDIS_7_CLUSTER_MODE_ENABLED , "ReplicationGroupDescription": CLUSTER_DESCRIPTION, "SecurityGroupIds": [SECURITY_GROUP], } class ConfigCME(Config): def __init__( self, instance_type: str = "cache.t4g.small", version: str = "7.0", multi_az: bool = True, TLS: bool = True, name: str = None, num_shards: int = 2, num_replicas_per_shard: int = 1, ): super().__init__(instance_type, version, multi_az, TLS, name) self.num_shards = num_shards self.num_replicas_per_shard = num_replicas_per_shard def create_launch_request(self) -> dict: launch_request = self.create_base_launch_request() launch_request["NumNodeGroups"] = self.num_shards launch_request["ReplicasPerNodeGroup"] = self.num_replicas_per_shard return launch_request

Definizione di una classe che rappresenterà il cluster stesso

Ora, definiamo alcune semplici classi Python che rappresenteranno lo stesso ElastiCache Redis OSS Cluster. Questa classe avrà un campo client che conterrà un client boto3 per operazioni di ElastiCache gestione come la creazione del cluster e l'interrogazione dell'API. ElastiCache

import botocore.config import boto3 # Create boto3 client def init_client(region: str = "us-east-1"): config = botocore.config.Config(retries={"max_attempts": 10, "mode": "standard"}) init_request = dict() init_request["config"] = config init_request["service_name"] = "elasticache" init_request["region_name"] = region return boto3.client(**init_request) class ElastiCacheClusterBase: def __init__(self, name: str): self.name = name self.elasticache_client = init_client() def get_first_replication_group(self): return self.elasticache_client.describe_replication_groups( ReplicationGroupId=self.name )["ReplicationGroups"][0] def get_status(self) -> str: return self.get_first_replication_group()["Status"] def get_transit_encryption_enabled(self) -> bool: return self.get_first_replication_group()["TransitEncryptionEnabled"] def is_available(self) -> bool: return self.get_status() == "available" def is_modifying(self) -> bool: return self.get_status() == "modifying" def wait_for_available(self): while True: if self.is_available(): break else: time.sleep(5) def wait_for_modifying(self): while True: if self.is_modifying(): break else: time.sleep(5) def delete_cluster(self) -> bool: self.elasticache_client.delete_replication_group( ReplicationGroupId=self.name, RetainPrimaryCluster=False ) def modify_transit_encryption_mode(self, new_transit_encryption_mode: str): # generate api call to migrate the cluster to TLS preffered or to TLS required self.elasticache_client.modify_replication_group( ReplicationGroupId=self.name, TransitEncryptionMode=new_transit_encryption_mode, TransitEncryptionEnabled=True, ApplyImmediately=True, ) self.wait_for_modifying() class ElastiCacheClusterCME(ElastiCacheClusterBase): def __init__(self, name: str): super().__init__(name) @classmethod def launch(cls, config: ConfigCME = None) -> ElastiCacheClusterCME: config = config or ConfigCME() print(config) new_cluster = ElastiCacheClusterCME(config.name) launch_request = config.create_launch_request() new_cluster.elasticache_client.create_replication_group(**launch_request) new_cluster.wait_for_available() return new_cluster def get_configuration_endpoint(self) -> str: return self.get_first_replication_group()["ConfigurationEndpoint"]["Address"] #Since the code can throw exceptions, we define this class to make the code more readable and #so we won't forget to delete the cluster class ElastiCacheCMEManager: def __init__(self, config: ConfigCME = None): self.config = config or ConfigCME() def __enter__(self) -> ElastiCacheClusterCME: self.cluster = ElastiCacheClusterCME.launch(self.config) return self.cluster def __exit__(self, exc_type, exc_val, exc_tb): self.cluster.delete_cluster()

(Facoltativo) Crea una classe wrapper per la connessione del client demo al cluster Redis OSS

Ora, creiamo una classe wrapper per il client redis-py-cluster. Questa classe wrapper supporta la precompilazione del cluster con alcune chiavi e l'esecuzione di comandi get ripetuti in modo casuale.

Nota

Questo è un passaggio facoltativo ma semplifica il codice della funzione principale che verrà fornito in un passaggio successivo.

import redis improt random from time import perf_counter_ns, time class DowntimeTestClient: def __init__(self, client): self.client = client # num of keys prefilled self.prefilled = 0 # percent of get above prefilled self.percent_get_above_prefilled = 10 # nil result expected when get hit above prefilled # total downtime in nano seconds self.downtime_ns = 0 # num of success and fail operations self.success_ops = 0 self.fail_ops = 0 self.connection_errors = 0 self.timeout_errors = 0 def replace_client(self, client): self.client = client def prefill_data(self, timelimit_sec=60): end_time = time() + timelimit_sec while time() < end_time: self.client.set(self.prefilled, self.prefilled) self.prefilled += 1 # unsuccesful operations throw exceptions def _exec(self, func): try: start_ns = perf_counter_ns() func() self.success_ops += 1 elapsed_ms = (perf_counter_ns() - start_ns) // 10 ** 6 # upon succesful execution of func # reset random_key to None so that the next command # will use a new random key self.random_key = None except Exception as e: elapsed_ns = perf_counter_ns() - start_ns self.downtime_ns += elapsed_ns # in case of failure- increment the relevant counters so that we will keep track # of how many connection issues we had while trying to communicate with # the cluster. self.fail_ops += 1 if e.__class__ is redis.exceptions.ConnectionError: self.connection_errors += 1 if e.__class__ is redis.exceptions.TimeoutError: self.timeout_errors += 1 def _repeat_exec(self, func, seconds): end_time = time() + seconds while time() < end_time: self._exec(func) def _new_random_key_if_needed(self, percent_above_prefilled): if self.random_key is None: max = int((self.prefilled * (100 + percent_above_prefilled)) / 100) return random.randint(0, max) return self.random_key def _random_get(self): key = self._new_random_key_if_needed(self.percent_get_above_prefilled) result = self.client.get(key) # we know the key was set for sure only in the case key < self.prefilled if key < self.prefilled: assert result.decode("UTF-8") == str(key) def repeat_get(self, seconds=60): self._repeat_exec(self._random_get, seconds) def get_downtime_ms(self) -> int: return self.downtime_ns // 10 ** 6 def do_get_until(self, cond_check): while not cond_check(): self.repeat_get() # do one more get cycle once condition is met self.repeat_get()

Creazione della funzione principale che illustra il processo di modifica della configurazione della crittografia dei dati in transito

Ora, definiamo la funzione principale, che eseguirà le operazioni seguenti:

  1. Crea il cluster utilizzando il client boto3. ElastiCache

  2. Inizializzare il client redis-py-cluster che si connetterà al cluster con una connessione TCP chiara senza TLS.

  3. Il client redis-py-cluster precompila il cluster con alcuni dati.

  4. Il client boto3 attiverà la migrazione TLS da no-TLS a TLS preferred.

  5. Durante la migrazione del cluster a TLS Preferred, il client TCP redis-py-cluster invierà operazioni get ripetute al cluster finché la migrazione non è terminata.

  6. Al termine della migrazione a TLS Preferred, affermeremo che il cluster supporta la crittografia dei dati in transito. Successivamente, creeremo un client redis-py-cluster che si connetterà al cluster con TLS.

  7. Invieremo alcuni comandi get utilizzando il nuovo client TLS e il vecchio client TCP.

  8. Il client boto3 attiverà la migrazione TLS da TLS Preferred a TLS required.

  9. Durante la migrazione del cluster a TLS richiesto, il client redis-py-cluster TLS invierà get operazioni ripetute al cluster fino al termine della migrazione.

import redis def init_cluster_client( cluster: ElastiCacheClusterCME, prefill_data: bool, TLS: bool = True) -> DowntimeTestClient: # we must use for the host name the cluster configuration endpoint. redis_client = redis.RedisCluster( host=cluster.get_configuration_endpoint(), ssl=TLS, socket_timeout=0.25, socket_connect_timeout=0.1 ) test_client = DowntimeTestClient(redis_client) if prefill_data: test_client.prefill_data() return test_client if __name__ == '__main__': config = ConfigCME(TLS=False, instance_type="cache.m5.large") with ElastiCacheCMEManager(config) as cluster: # create a client that will connect to the cluster with clear tcp connection test_client_tcp = init_cluster_client(cluster, prefill_data=True, TLS=False) # migrate the cluster to TLS Preferred cluster.modify_transit_encryption_mode(new_transit_encryption_mode="preferred") # do repeated get commands until the cluster finishes the migration to TLS Preferred test_client_tcp.do_get_until(cluster.is_available) # verify that in transit encryption is enabled so that clients will be able to connect to the cluster with TLS assert cluster.get_transit_encryption_enabled() == True # create a client that will connect to the cluster with TLS connection. # we must first make sure that the cluster indeed supports TLS test_client_tls = init_cluster_client(cluster, prefill_data=True, TLS=True) # by doing get commands with the tcp client for 60 more seconds # we can verify that the existing tcp connection to the cluster still works test_client_tcp.repeat_get(seconds=60) # do get commands with the new TLS client for 60 more seconds test_client_tcp.repeat_get(seconds=60) # migrate the cluster to TLS required cluster.modify_transit_encryption_mode(new_transit_encryption_mode="required") # from this point the tcp clients will be disconnected and we must not use them anymore. # do get commands with the TLS client until the cluster finishes migartion to TLS required mode. test_client_tls.do_get_until(cluster.is_available)