Connexions Kinesis - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Connexions Kinesis

Vous pouvez lire et écrire dans Amazon Kinesis Data Streams à l'aide d'informations stockées dans une table de catalogue de données ou en fournissant des informations permettant d'accéder directement au flux de données. Vous pouvez lire les informations de Kinesis dans un Spark DataFrame, puis les convertir en Glue AWS . DynamicFrame Vous pouvez DynamicFrames écrire dans Kinesis au format JSON. Si vous accédez directement au flux de données, utilisez ces options pour fournir des informations sur la façon d'accéder au flux de données.

Si vous utilisez getCatalogSource ou create_data_frame_from_catalog pour consommer des enregistrements à partir d'une source de streaming Kinesis, la tâche dispose des informations relatives à la base de données Data Catalog et au nom de la table, et peut les utiliser pour obtenir certains paramètres de base pour la lecture à partir de la source de streaming Kinesis. Si vous utilisez getSource, getSourceWithFormat, createDataFrameFromOptions ou create_data_frame_from_options, vous devez spécifier ces paramètres de base à l'aide des options de connexion décrites ici.

Vous pouvez spécifier les options de connexion pour Kinesis à l'aide des arguments suivants pour les méthodes spécifiées dans la classe GlueContext.

  • Scala

    • connectionOptions : utiliser avec getSource, createDataFrameFromOptions, getSink

    • additionalOptions : utiliser avec getCatalogSource, getCatalogSink

    • options : utiliser avec getSourceWithFormat, getSinkWithFormat

  • Python

    • connection_options : utiliser avec create_data_frame_from_options, write_dynamic_frame_from_options

    • additional_options : utiliser avec create_data_frame_from_catalog, write_dynamic_frame_from_catalog

    • options : utiliser avec getSource, getSink

Pour les remarques et les restrictions concernant les tâches ETL de streaming, consultez Restrictions et notes sur ETL en streaming.

Configurer Kinesis

Pour vous connecter à un flux de données Kinesis dans le cadre d'une tâche AWS Glue Spark, vous aurez besoin de certaines conditions préalables :

  • En cas de lecture, la tâche AWS Glue doit disposer d'autorisations IAM de niveau d'accès en lecture pour le flux de données Kinesis.

  • En cas d'écriture, la tâche AWS Glue doit disposer d'autorisations IAM de niveau d'accès Write au flux de données Kinesis.

Dans certains cas, vous devrez configurer des prérequis supplémentaires :

  • Si votre tâche AWS Glue est configurée avec des connexions réseau supplémentaires (généralement pour se connecter à d'autres ensembles de données) et que l'une de ces connexions fournit des options de réseau Amazon VPC, cela indiquera à votre tâche de communiquer via Amazon VPC. Dans ce cas, vous devrez également configurer votre flux de données Kinesis pour qu'il communique via Amazon VPC. Vous pouvez le faire en créant un point de terminaison d'un VPC d'interface entre votre Amazon VPC et votre flux de données Kinesis. Pour plus d'informations, consultez Using Kinesis Data Streams with Interface VPC Endpoints.

  • Lorsque vous spécifiez Amazon Kinesis Data Streams dans un autre compte, vous devez configurer les rôles et les stratégies pour autoriser l'accès intercompte. Pour de plus amples informations, veuillez consulter la rubrique Exemple : Lire à partir d'un flux Kinesis dans un autre compte.

Pour plus d'informations sur les prérequis de la tâche ETL de streaming, consultez Tâches ETL en streaming dans AWS Glue.

Exemple : lecture à partir de flux Kinesis

Exemple : lecture à partir de flux Kinesis

Utilisez conjointement avec forEachBatch.

Exemple pour la source de streaming Amazon Kinesis :

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Exemple : écriture dans Kinesis Streams

Exemple : lecture à partir de flux Kinesis

Utilisez conjointement avec forEachBatch.

Exemple pour la source de streaming Amazon Kinesis :

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Référence des options de connexion Kinesis

Désigne des options de connexion à Amazon Kinesis Data Streams.

Utilisez les options de connexion suivantes pour les sources de données en streaming Kinesis :

  • "streamARN" (Obligatoire) Utilisé pour la lecture/l'écriture. ARN du flux de données Kinesis.

  • "classification" (Obligatoire pour la lecture) Utilisé pour la lecture. Format de fichier utilisé par les données de l'enregistrement. Obligatoire, sauf s’il est fourni par le catalogue de données.

  • "streamName" : (facultatif) utilisé pour la lecture. Nom du flux de données Kinesis à partir duquel lire. Utilisé avec endpointUrl.

  • "endpointUrl" : (facultatif) utilisé pour la lecture. Par défaut : "https://kinesis.us-east-1.amazonaws.com". AWS Point de terminaison du flux Kinesis. Vous n'avez pas besoin de modifier ce paramètre, sauf si vous vous connectez à une région spéciale.

  • "partitionKey" : (facultatif) utilisé pour l'écriture. Clé de partition Kinesis utilisée lors de la production d'enregistrements.

  • "delimiter" : (facultatif) utilisé pour la lecture. Séparateur de valeurs utilisé lorsque classification est CSV. La valeur par défaut est « , ».

  • "startingPosition" : (facultatif) utilisé pour la lecture. La position de départ dans le flux de données Kinesis à partir duquel lire les données. Les valeurs possibles sont "latest", "trim_horizon", "earliest", ou une chaîne d'horodatage au format UTC dans le modèle yyyy-mm-ddTHH:MM:SSZ (où Z représente un décalage de fuseau horaire UTC avec un +/-. Par exemple : « 2023-04-04T08:00:00-04:00 »). La valeur par défaut est "latest". Remarque : la chaîne d'horodatage au format UTC pour n'"startingPosition"est prise en charge que pour la version 4.0 ou ultérieure de AWS Glue.

  • "failOnDataLoss" : (facultatif) échec de la tâche si une partition active est manquante ou a expiré. La valeur par défaut est "false".

  • "awsSTSRoleARN" : (facultatif) utilisé pour la lecture/l'écriture. Le nom de ressource Amazon (ARN) du rôle à assumer en utilisant AWS Security Token Service (AWS STS). Ce rôle doit disposer des autorisations nécessaires pour décrire ou lire des registres pour le flux de données Kinesis. Vous devez utiliser ce paramètre lorsque vous accédez à un flux de données dans un autre compte. Utilisez conjointement avec "awsSTSSessionName".

  • "awsSTSSessionName" : (facultatif) utilisé pour la lecture/l'écriture. Un identifiant de la séance assumant le rôle à l'aide d' AWS STS. Vous devez utiliser ce paramètre lorsque vous accédez à un flux de données dans un autre compte. Utilisez conjointement avec "awsSTSRoleARN".

  • "awsSTSEndpoint": (Facultatif) Le AWS STS point de terminaison à utiliser lors de la connexion à Kinesis avec un rôle assumé. Cela permet d'utiliser le point de AWS STS terminaison régional dans un VPC, ce qui n'est pas possible avec le point de terminaison global par défaut.

  • "maxFetchTimeInMs" : (facultatif) utilisé pour la lecture. Durée maximale pendant laquelle l'exécuteur de tâches lit les enregistrements du lot en cours à partir du flux de données Kinesis, spécifiée en millisecondes (ms). Plusieurs appels GetRecords d'API peuvent être effectués pendant cette période. La valeur par défaut est 1000.

  • "maxFetchRecordsPerShard" : (facultatif) utilisé pour la lecture. Le nombre maximum d'enregistrements à récupérer par partition dans le flux de données Kinesis par microbatch. Remarque : le client peut dépasser cette limite si la tâche de streaming a déjà lu des enregistrements supplémentaires provenant de Kinesis (lors du même appel get-records). Si elle maxFetchRecordsPerShard doit être stricte, elle doit être un multiple demaxRecordPerRead. La valeur par défaut est 100000.

  • "maxRecordPerRead" : (facultatif) utilisé pour la lecture. Nombre maximal d'enregistrements à extraire du flux de données Kinesis dans chaque opération getRecords. La valeur par défaut est 10000.

  • "addIdleTimeBetweenReads" : (facultatif) utilisé pour la lecture. Ajoute un délai entre deux opérations getRecords consécutives. La valeur par défaut est "False". Cette option n’est configurable que pour Glue version 2.0 et ultérieure.

  • "idleTimeBetweenReadsInMs" : (facultatif) utilisé pour la lecture. Délai minimum entre deux opérations getRecords, en ms. La valeur par défaut est 1000. Cette option n’est configurable que pour Glue version 2.0 et ultérieure.

  • "describeShardInterval" : (facultatif) utilisé pour la lecture. Intervalle de temps minimum entre deux appels d'API ListShards pour que votre script envisage le repartitionnement. Pour plus d'informations, consultez Politiques de repartitionnement dans le Guide du développeur Amazon Kinesis Data Streams. La valeur par défaut est 1s.

  • "numRetries" : (facultatif) utilisé pour la lecture. Le nombre maximal de nouvelles tentatives pour les demandes d'API Kinesis Data Streams. La valeur par défaut est 3.

  • "retryIntervalMs" : (facultatif) utilisé pour la lecture. Le délai de réflexion (spécifié en ms) avant de réessayer l'appel d'API Kinesis Data Streams. La valeur par défaut est 1000.

  • "maxRetryIntervalMs" : (facultatif) utilisé pour la lecture. Le délai d'attente maximal (spécifié en ms) entre deux tentatives d'appel d'API Kinesis Data Streams. La valeur par défaut est 10000.

  • "avoidEmptyBatches" : (facultatif) utilisé pour la lecture. Évite de créer une tâche de micro-lot vide en vérifiant les données non lues dans le flux de données Kinesis avant le démarrage du lot. La valeur par défaut est "False".

  • "schema" : (obligatoire lorsque inferSchema est défini sur false) utilisé pour la lecture. Schéma à utiliser pour traiter la charge utile. Si la classification est avro, le schéma fourni doit être au format de schéma Avro. Si la classification n'est pas avro, le schéma fourni doit être au format de schéma DDL.

    Voici quelques exemples de schémas.

    Example in DDL schema format
    `column1` INT, `column2` STRING , `column3` FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema" : (facultatif) utilisé pour la lecture. La valeur par défaut est « false ». S’il est défini sur « true », le schéma sera détecté lors de l’exécution à partir de la charge utile dans foreachbatch.

  • "avroSchema" : (obsolète) utilisé pour la lecture. Paramètre utilisé pour spécifier un schéma de données Avro lorsque le format Avro est utilisé. Ce paramètre est désormais obsolète. Utilisez le paramètre schema.

  • "addRecordTimestamp" : (facultatif) utilisé pour la lecture. Lorsque cette option est définie sur « true », la sortie de données contient une colonne supplémentaire nommée « __src_timestamp » qui indique l'heure à laquelle l'enregistrement correspondant est reçu par le flux. La valeur par défaut est « false ». Cette option est prise en charge dans AWS Glue version 4.0 ou ultérieure.

  • "emitConsumerLagMetrics" : (facultatif) utilisé pour la lecture. Lorsque l'option est définie sur « vrai », pour chaque lot, elle émet les métriques correspondant à la durée comprise entre le plus ancien enregistrement reçu par le flux et l'heure AWS Glue à laquelle il arrive CloudWatch. Le nom de la métrique est « glue.driver.streaming ». maxConsumerLagInMs». La valeur par défaut est « false ». Cette option est prise en charge dans AWS Glue version 4.0 ou ultérieure.

  • "fanoutConsumerARN" : (facultatif) utilisé pour la lecture. ARN d'un consommateur de flux Kinesis pour le flux spécifié dans streamARN. Utilisé pour activer le mode diffusion améliorée pour votre connexion Kinesis. Pour plus d'informations sur la consommation d'un flux Kinesis avec diffusion améliorée, consultez Utilisation de la diffusion améliorée dans les tâches de streaming Kinesis.

  • "recordMaxBufferedTime" : (facultatif) utilisé pour l'écriture. Par défaut : 1 000 (ms). Durée maximale pendant laquelle un enregistrement est mis en mémoire tampon en attendant d'être écrit.

  • "aggregationEnabled" : (facultatif) utilisé pour l'écriture. Valeur par défaut : vraie. Spécifie si les enregistrements doivent être agrégés avant de les envoyer à Kinesis.

  • "aggregationMaxSize" : (facultatif) utilisé pour l'écriture. Par défaut : 51 200 (octets). Si un enregistrement est supérieur à cette limite, il contourne l'agrégateur. Remarque Kinesis impose une limite de 50 Ko à la taille des enregistrements. Si vous définissez ce paramètre au-delà de 50 Ko, les enregistrements surdimensionnés seront rejetés par Kinesis.

  • "aggregationMaxCount" : (facultatif) utilisé pour l'écriture. Par défaut : 4294967295. Nombre maximum de résultats à regrouper dans un enregistrement agrégé.

  • "producerRateLimit" : (facultatif) utilisé pour l'écriture. Par défaut : 150 (%). Limite le débit par partition envoyée par un seul producteur (votre tâche, par exemple), sous forme de pourcentage de la limite du backend.

  • "collectionMaxCount" : (facultatif) utilisé pour l'écriture. Par défaut : 500. Nombre maximum d'articles à inclure dans une PutRecords demande.

  • "collectionMaxSize" : (facultatif) utilisé pour l'écriture. Par défaut : 5242880 (octets). Quantité maximale de données à envoyer avec une PutRecords demande.