Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Verwenden Sie eine OpenSearch Ingestion-Pipeline mit Amazon Kinesis Data Streams
Verwenden Sie eine OpenSearch Ingestion-Pipeline mit Amazon Kinesis Data Streams, um Stream-Datensatzdaten aus mehreren Streams in Amazon OpenSearch Service-Domains und -Sammlungen aufzunehmen. Die OpenSearch Ingestion-Pipeline umfasst die Streaming-Ingestion-Infrastruktur, um eine hochskalierbare Methode mit geringer Latenz für die kontinuierliche Aufnahme von Stream-Datensätzen von Kinesis bereitzustellen.
Themen
Amazon Kinesis Data Streams als Quelle
Mit dem folgenden Verfahren erfahren Sie, wie Sie eine OpenSearch Ingestion-Pipeline einrichten, die Amazon Kinesis Data Streams als Datenquelle verwendet. In diesem Abschnitt werden die erforderlichen Voraussetzungen behandelt, z. B. das Erstellen einer OpenSearch Service-Domain oder einer OpenSearch Serverless Collection. Außerdem werden die Schritte zur Konfiguration der Pipeline-Rolle und zum Erstellen der Pipeline beschrieben.
Voraussetzungen
Um Ihre Pipeline einzurichten, benötigen Sie einen oder mehrere aktive Kinesis Data Streams. Diese Streams müssen entweder Datensätze empfangen oder bereit sein, Datensätze aus anderen Quellen zu empfangen. Weitere Informationen finden Sie unter Überblick über die OpenSearch Aufnahme.
Um Ihre Pipeline einzurichten
-
Erstellen Sie eine OpenSearch Dienstdomäne oder eine OpenSearch serverlose Sammlung
Informationen zum Erstellen einer Domain oder einer Sammlung finden Sie unter Erste Schritte mit OpenSearch Ingestion.
-
Konfigurieren Sie die Pipeline-Rolle mit Berechtigungen
Richten Sie die Pipeline-Rolle ein, die Sie in Ihrer Pipeline-Konfiguration verwenden möchten, und fügen Sie ihr die folgenden Berechtigungen hinzu. Ersetzen Sie
placeholder values
durch Ihre Informationen.Wenn die serverseitige Verschlüsselung für die Streams aktiviert ist, ermöglicht die folgende AWS KMS Richtlinie das Entschlüsseln der Datensätze. Ersetzen Sie
placeholder values
durch Ihre Informationen.Damit eine Pipeline Daten in eine Domäne schreiben kann, muss die Domäne über eine Zugriffsrichtlinie auf Domänenebene verfügen, die der Pipeline-Rolle sts_role_arn den Zugriff darauf ermöglicht.
Das folgende Beispiel ist eine Domänenzugriffsrichtlinie, die es der im vorherigen Schritt (
pipeline-role
) erstellten Pipeline-Rolle ermöglicht, Daten in die Domäne zu schreiben.ingestion-domain
Ersetzen Sieplaceholder values
durch Ihre Informationen.{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::
your-account-id
:role/pipeline-role
" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:AWS-Region
:account-id
:domain/domain-name
/*" } ] } -
Erstellen Sie die Pipeline
Konfigurieren Sie eine OpenSearch Ingestion-Pipeline mit K inesis-data-streams als Quelle. In der OpenSearch Ingestion Console finden Sie einen vorgefertigten Blueprint zum Erstellen einer solchen Pipeline. (Optional) Um die Pipeline mithilfe von zu erstellen AWS CLI, können Sie einen Blueprint mit dem Namen "“ verwenden.
AWS-KinesisDataStreamsPipeline
Ersetzen Sieplaceholder values
durch Ihre Informationen.version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "
stream name
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the AWS-Region of the Data Stream. region: "us-east-1
" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the AWS-Region of the domain. region: "us-east-1
" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name
" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1
" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
"Konfigurationsoptionen
Informationen zu den Kinesis-Konfigurationsoptionen finden Sie in der OpenSearchDokumentation unter Konfigurationsoptionen
. Verfügbare Metadatenattribute
-
stream_name — Name der Kinesis Data Streams, aus denen der Datensatz aufgenommen wurde
-
partition_key — Partitionsschlüssel des Kinesis Data Streams Streams-Datensatzes, der aufgenommen wird
-
sequence_number — Sequenznummer des Kinesis Data Streams Streams-Datensatzes, der aufgenommen wird
-
sub_sequence_number — Untersequenznummer des Kinesis Data Streams Streams-Datensatzes, der aufgenommen wird
-
-
(Optional) Empfohlene Recheneinheiten (OCUs) für die Kinesis Data Streams Streams-Pipeline konfigurieren
Eine OpenSearch Kinesis Data Streams Streams-Quellpipeline kann auch so konfiguriert werden, dass Stream-Datensätze aus mehr als einem Stream aufgenommen werden. Für eine schnellere Aufnahme empfehlen wir, für jeden neu hinzugefügten Stream eine zusätzliche Recheneinheit hinzuzufügen.
Datenkonsistenz
OpenSearch Die Aufnahme unterstützt die end-to-end Bestätigung, um die Datenbeständigkeit zu gewährleisten. Wenn die Pipeline Stream-Datensätze aus Kinesis liest, verteilt sie die Arbeit beim Lesen von Stream-Datensätzen dynamisch auf der Grundlage der mit den Streams verknüpften Shards. Die Pipeline überprüft Streams automatisch, wenn sie nach der Aufnahme aller Datensätze in der Domain oder Sammlung eine Bestätigung erhält. OpenSearch Dadurch wird eine doppelte Verarbeitung von Stream-Datensätzen vermieden.
Um den Index auf der Grundlage des Stream-Namens zu erstellen, definieren Sie den Index im Opensearch-Senkbereich als „index_$ {getMetadata (\" stream_name\“)}“.
Amazon Kinesis Data Streams kontoübergreifend als Quelle
Sie können mit Amazon Kinesis Data Streams kontenübergreifenden Zugriff gewähren, sodass OpenSearch Ingestion-Pipelines auf Kinesis Data Streams in einem anderen Konto als Quelle zugreifen können. Gehen Sie wie folgt vor, um den kontoübergreifenden Zugriff zu aktivieren:
Konfigurieren Sie den kontoübergreifenden Zugriff
-
Legen Sie die Ressourcenrichtlinie für das Konto fest, das den Kinesis-Stream hat
Ersetzen Sie
placeholder values
durch Ihre Informationen. -
(Optional) Richten Sie eine Ressourcenrichtlinie für Verbraucher und Verbraucher ein
Dieser Schritt ist optional und nur erforderlich, wenn Sie beabsichtigen, die Enhanced Fanout Consumer-Strategie zum Lesen von Stream-Datensätzen zu verwenden. Weitere Informationen finden Sie unter Entwickeln erweiterter Fanout-Consumer mit dediziertem Durchsatz.
-
Consumer einrichten
Um einen vorhandenen Verbraucher wiederzuverwenden, können Sie diesen Schritt überspringen. Weitere Informationen finden Sie RegisterStreamConsumerin der Amazon Kinesis Data Streams API-Referenz.
Ersetzen Sie im folgenden CLI-Beispielbefehl die
placeholder values
durch Ihre eigenen Informationen.Beispiel für einen CLI-Befehl:
aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:
AWS-Region
:account-id
:stream/stream-name
" \ --consumer-nameconsumer-name
-
Richten Sie eine Richtlinie für Verbraucherressourcen ein
Ersetzen Sie in der folgenden Erklärung die
placeholder values
durch Ihre eigenen Informationen.
-
-
Konfiguration der Pipeline
Für die kontoübergreifende Erfassung fügen Sie unter
kinesis_data_streams
für jeden Stream die folgenden Attribute hinzu:-
stream_arn
— der ARN des Streams, der zu dem Konto gehört, in dem der Stream existiert -
consumer_arn
- Dies ist ein optionales Attribut und muss angegeben werden, wenn die standardmäßige erweiterte Fanout-Verbraucherstrategie gewählt wird. Geben Sie den tatsächlichen Consumer-ARN für dieses Feld an. Ersetzen Sieplaceholder values
durch Ihre Informationen.
version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:
region
:stream-account-id
:stream/stream-name
" consumer_arn: "consumer arn
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region
:stream-account-id
:stream/stream-name
" consumer_arn: "consumer arn
" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333
:role/Example-Role
" # Provide the AWS-Region of the domain. region: "us-east-1
" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the AWS-Region of the domain. region: "us-east-1
" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name:network-policy-name
# Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name
" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq
" # Provide the AWS-Region of the bucket. region: "us-east-1
" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" -
-
OSI-Pipeline-Rolle: Kinesis Data Streams
-
IAM-Richtlinie
Fügen Sie der Pipeline-Rolle die folgende Richtlinie hinzu. Ersetzen Sie
placeholder values
durch Ihre Informationen. -
Vertrauensrichtlinie
Um Daten aus dem Stream-Konto aufzunehmen, müssen Sie eine Vertrauensbeziehung zwischen der Pipeline-Aufnahmerolle und dem Stream-Konto einrichten. Fügen Sie der Pipeline-Rolle Folgendes hinzu. Ersetzen Sie
placeholder values
durch Ihre Informationen.
-