Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utiliser un pipeline d' OpenSearch ingestion avec Amazon Kinesis Data Streams
Utilisez un pipeline d' OpenSearch ingestion avec Amazon Kinesis Data Streams pour ingérer les données d'enregistrements de flux provenant de plusieurs flux vers des domaines et des OpenSearch collections Amazon Service. Le pipeline OpenSearch d'ingestion intègre l'infrastructure d'ingestion de streaming afin de fournir un moyen à grande échelle et à faible latence d'ingérer en continu les enregistrements de flux provenant de Kinesis.
Rubriques
Amazon Kinesis Data Streams en tant que source
Avec la procédure suivante, vous allez apprendre à configurer un pipeline d' OpenSearch ingestion qui utilise Amazon Kinesis Data Streams comme source de données. Cette section couvre les prérequis nécessaires, tels que la création d'un domaine de OpenSearch service ou d'une collection OpenSearch sans serveur, et décrit les étapes de configuration du rôle de pipeline et de création du pipeline.
Prérequis
Pour configurer votre pipeline, vous avez besoin d'un ou de plusieurs Kinesis Data Streams actifs. Ces flux doivent recevoir des enregistrements ou être prêts à recevoir des enregistrements provenant d'autres sources. Pour plus d'informations, voir Présentation de OpenSearch l'ingestion.
Pour configurer votre pipeline
-
Création d'un domaine OpenSearch de service ou d'une OpenSearch collection sans serveur
Pour créer un domaine ou une collection, consultez Getting started with OpenSearch Ingestion.
Pour créer un rôle IAM doté des autorisations appropriées pour accéder aux données d'écriture de la collection ou du domaine, consultez la section Politiques basées sur les ressources.
-
Configurer le rôle de pipeline avec des autorisations
Configurez le rôle de pipeline que vous souhaitez utiliser dans la configuration de votre pipeline et ajoutez-y les autorisations suivantes. Remplacez
placeholder values
par vos propres informations.Si le chiffrement côté serveur est activé sur les flux, la AWS KMS politique suivante permet de déchiffrer les enregistrements. Remplacez
placeholder values
par vos propres informations.Pour qu'un pipeline puisse écrire des données dans un domaine, celui-ci doit disposer d'une politique d'accès au niveau du domaine qui autorise le rôle de pipeline sts_role_arn à y accéder.
L'exemple suivant est une politique d'accès au domaine qui permet au rôle de pipeline créé à l'étape précédente (
pipeline-role
) d'écrire des données dans leingestion-domain
domaine. Remplacezplaceholder values
par vos propres informations.{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::
your-account-id
:role/pipeline-role
" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:Région AWS
:account-id
:domain/domain-name
/*" } ] } -
Création du pipeline
Configurez un pipeline d' OpenSearch ingestion en spécifiant K inesis-data-streams comme source. Vous pouvez trouver un plan prêt à l'emploi disponible sur la console OpenSearch d'ingestion pour créer un tel pipeline. (Facultatif) Pour créer le pipeline à l'aide du AWS CLI, vous pouvez utiliser un plan nommé «
AWS-KinesisDataStreamsPipeline
». Remplacezplaceholder values
par vos propres informations.version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "
stream name
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the Région AWS of the Data Stream. region: "us-east-1
" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the Région AWS of the domain. region: "us-east-1
" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name
" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1
" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
"Options de configuration
Pour les options de configuration de Kinesis, consultez la section Options de configuration
dans la OpenSearchdocumentation. Attributs de métadonnées disponibles
-
stream_name — Nom du Kinesis Data Streams à partir duquel l'enregistrement a été ingéré
-
partition_key — Clé de partition de l'enregistrement Kinesis Data Streams en cours d'ingestion
-
sequence_number — Numéro de séquence de l'enregistrement Kinesis Data Streams en cours d'ingestion
-
sub_sequence_number — Numéro de sous-séquence de l'enregistrement Kinesis Data Streams en cours d'ingestion
-
-
(Facultatif) Configurer les unités de calcul recommandées (OCUs) pour le pipeline Kinesis Data Streams
Un pipeline source OpenSearch Kinesis Data Streams peut également être configuré pour ingérer des enregistrements de flux provenant de plusieurs flux. Pour accélérer l'ingestion, nous vous recommandons d'ajouter une unité de calcul supplémentaire par nouveau flux ajouté.
Cohérence des données
OpenSearch L'ingestion prend en charge end-to-end la reconnaissance afin de garantir la durabilité des données. Lorsque le pipeline lit des enregistrements de flux depuis Kinesis, il répartit de manière dynamique le travail de lecture des enregistrements de flux en fonction des partitions associées aux flux. Le pipeline vérifiera automatiquement les flux lorsqu'il recevra un accusé de réception après avoir ingéré tous les enregistrements du OpenSearch domaine ou de la collection. Cela permettra d'éviter le double traitement des enregistrements de flux.
Pour créer l'index en fonction du nom du flux, définissez l'index dans la section du récepteur d'opensearch comme « index_$ {getMetadata (\" stream_name \ »)} ».
Compte croisé Amazon Kinesis Data Streams en tant que source
Vous pouvez accorder l'accès à plusieurs comptes avec Amazon Kinesis Data Streams afin OpenSearch que les pipelines d'ingestion puissent accéder à Kinesis Data Streams depuis un autre compte en tant que source. Procédez comme suit pour activer l'accès entre comptes :
Configuration de l'accès entre comptes
-
Définissez la politique en matière de ressources dans le compte qui possède le flux Kinesis
Remplacez
placeholder values
par vos propres informations. -
(Facultatif) Configurer la politique relative aux consommateurs et aux ressources destinées aux consommateurs
Cette étape est facultative et ne sera requise que si vous prévoyez d'utiliser la stratégie Enhanced Fanout Consumer pour lire les enregistrements de flux. Pour plus d'informations, voir Développer des consommateurs de fans optimisés avec un débit dédié.
-
Configurer le consommateur
Pour réutiliser un client existant, vous pouvez ignorer cette étape. Pour plus d'informations, consultez RegisterStreamConsumerle manuel Amazon Kinesis Data Streams API Reference.
Dans l'exemple de commande CLI suivant, remplacez le
placeholder values
par vos propres informations.Exemple de commande CLI :
aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:
Région AWS
:account-id
:stream/stream-name
" \ --consumer-nameconsumer-name
-
Configurer une politique de ressources pour les consommateurs
Dans la déclaration suivante, remplacez le
placeholder values
par vos propres informations.
-
-
Configuration du pipeline
Pour l'ingestion entre comptes, ajoutez les attributs suivants ci-dessous
kinesis_data_streams
pour chaque flux :-
stream_arn
- l'ARN du flux appartenant au compte sur lequel le flux existe -
consumer_arn
- il s'agit d'un attribut facultatif qui doit être spécifié si la stratégie fanout consumer améliorée par défaut est choisie. Spécifiez l'ARN réel du consommateur pour ce champ. Remplacezplaceholder values
par vos propres informations.
version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:
region
:stream-account-id
:stream/stream-name
" consumer_arn: "consumer arn
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region
:stream-account-id
:stream/stream-name
" consumer_arn: "consumer arn
" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333
:role/Example-Role
" # Provide the Région AWS of the domain. region: "us-east-1
" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the Région AWS of the domain. region: "us-east-1
" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name:network-policy-name
# Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name
" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq
" # Provide the Région AWS of the bucket. region: "us-east-1
" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" -
-
Rôle du pipeline OSI : Kinesis Data Streams
-
Stratégie IAM
Ajoutez la politique suivante au rôle de pipeline. Remplacez
placeholder values
par vos propres informations. -
Stratégie d’approbation
Pour ingérer les données du compte de flux, vous devez établir une relation de confiance entre le rôle d'ingestion du pipeline et le compte du flux. Ajoutez ce qui suit au rôle de pipeline. Remplacez
placeholder values
par vos propres informations.
-