Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Menggunakan pipeline OpenSearch Ingestion dengan Amazon Kinesis Data Streams
Gunakan pipeline OpenSearch Ingestion dengan Amazon Kinesis Data Streams untuk menyerap data rekaman streaming dari beberapa aliran ke domain dan koleksi Layanan Amazon. OpenSearch Pipa OpenSearch Ingestion menggabungkan infrastruktur konsumsi streaming untuk menyediakan cara latensi rendah skala tinggi untuk terus menelan catatan aliran dari Kinesis.
Topik
Amazon Kinesis Data Streams sebagai sumber
Dengan prosedur berikut, Anda akan mempelajari cara menyiapkan pipeline OpenSearch Ingestion yang menggunakan Amazon Kinesis Data Streams sebagai sumber data. Bagian ini mencakup prasyarat yang diperlukan, seperti membuat domain OpenSearch Layanan atau Koleksi OpenSearch Tanpa Server, dan menelusuri langkah-langkah untuk mengonfigurasi peran pipeline dan membuat pipeline.
Prasyarat
Untuk menyiapkan pipeline, Anda memerlukan satu atau beberapa Kinesis Data Streams yang aktif. Aliran ini harus menerima catatan atau siap menerima catatan dari sumber lain. Untuk informasi lebih lanjut, lihat Ikhtisar OpenSearch Tertelan.
Untuk mengatur pipeline Anda
-
Membuat domain OpenSearch Layanan atau koleksi OpenSearch Tanpa Server
Untuk membuat domain atau koleksi, lihat Memulai dengan OpenSearch Ingestion.
Untuk membuat peran IAM dengan izin yang benar untuk mengakses data tulis ke koleksi atau domain, lihat Kebijakan berbasis sumber daya.
-
Konfigurasikan peran pipeline dengan izin
Siapkan peran pipeline yang ingin Anda gunakan dalam konfigurasi pipeline dan tambahkan izin berikut ke dalamnya. Ganti
placeholder values
dengan informasi Anda sendiri.Jika enkripsi sisi server diaktifkan pada aliran, AWS KMS kebijakan berikut memungkinkan untuk mendekripsi catatan. Ganti
placeholder values
dengan informasi Anda sendiri.Agar pipeline dapat menulis data ke domain, domain harus memiliki kebijakan akses tingkat domain yang memungkinkan peran pipeline sts_role_arn untuk mengaksesnya.
Contoh berikut adalah kebijakan akses domain yang memungkinkan peran pipeline yang dibuat pada langkah sebelumnya (
pipeline-role
), untuk menulis data keingestion-domain
domain. Gantiplaceholder values
dengan informasi Anda sendiri.{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::
your-account-id
:role/pipeline-role
" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:Wilayah AWS
:account-id
:domain/domain-name
/*" } ] } -
Buat pipa
Konfigurasikan pipeline OpenSearch Ingestion yang menentukan K inesis-data-streams sebagai sumbernya. Anda dapat menemukan cetak biru siap pakai yang tersedia di Konsol OpenSearch Ingestion untuk membuat pipeline semacam itu. (Opsional) Untuk membuat pipeline menggunakan AWS CLI, Anda dapat menggunakan cetak biru bernama "”.
AWS-KinesisDataStreamsPipeline
Gantiplaceholder values
dengan informasi Anda sendiri.version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: # Based on whether kinesis records are aggregated or not, you could choose json, newline or ndjson codec for processing the records. # JSON codec supports parsing nested CloudWatch Events into individual log entries that will be written as documents into OpenSearch. # json: # key_name: "logEvents" # These keys contain the metadata sent by CloudWatch Subscription Filters # in addition to the individual log events: # include_keys: [ 'owner', 'logGroup', 'logStream' ] newline: streams: - stream_name: "
stream name
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" - stream_name: "stream name
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" # Compression will always be gzip for CloudWatch, but will vary for other sources: # compression: "gzip" # buffer_timeout: "1s" # records_to_accumulate: 100 # Change the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Amazon Kinesis Data Streams. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the Wilayah AWS of the Data Stream. region: "us-east-1
" sink: - opensearch: # Provide an Amazon OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
" ] index: "index_${getMetadata(\"stream_name\")}" # Ensure adding unique document id as a combination of the metadata attributes available. document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the Wilayah AWS of the domain. region: "us-east-1
" # Enable the 'serverless' flag if the sink is an Amazon OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name: "network-policy-name" # Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an S3 bucket bucket: "your-dlq-bucket-name
" # Provide a key path prefix for the failed requests # key_path_prefix: "kinesis-pipeline/logs/dlq" # Provide the region of the bucket. region: "us-east-1
" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
"Opsi konfigurasi
Untuk opsi konfigurasi Kinesis, lihat Opsi konfigurasi
dalam dokumentasi. OpenSearch Atribut metadata yang tersedia
-
stream_name — Nama Kinesis Data Streams dari tempat rekaman telah dicerna
-
partition_key — Kunci partisi dari catatan Kinesis Data Streams yang sedang dicerna
-
sequence_number — Nomor urutan catatan Kinesis Data Streams yang sedang dicerna
-
sub_sequence_number — Sub nomor urutan catatan Kinesis Data Streams yang sedang dicerna
-
-
(Opsional) Konfigurasikan unit komputasi yang direkomendasikan (OCUs) untuk pipeline Kinesis Data Streams
Pipeline sumber OpenSearch Kinesis Data Streams juga dapat dikonfigurasi untuk menyerap rekaman aliran dari lebih dari satu aliran. Untuk konsumsi yang lebih cepat, kami sarankan Anda menambahkan unit komputasi tambahan per aliran baru yang ditambahkan.
Konsistensi data
OpenSearch Ingestion mendukung end-to-end pengakuan untuk memastikan daya tahan data. Ketika pipa membaca catatan aliran dari Kinesis, ia secara dinamis mendistribusikan pekerjaan membaca catatan aliran berdasarkan pecahan yang terkait dengan aliran. Pipeline akan secara otomatis memeriksa aliran pos ketika menerima pengakuan setelah menelan semua catatan dalam domain atau koleksi. OpenSearch Ini akan menghindari pemrosesan duplikat catatan aliran.
Untuk membuat indeks berdasarkan nama aliran, tentukan indeks di bagian sink opensearch sebagai “index_$ {getMetadata (\" stream_name\”)}”.
Amazon Kinesis Data Streams lintas akun sebagai sumber
Anda dapat memberikan akses di seluruh akun dengan Amazon Kinesis Data Streams OpenSearch sehingga pipeline Ingestion dapat mengakses Kinesis Data Streams di akun lain sebagai sumber. Selesaikan langkah-langkah berikut untuk mengaktifkan akses lintas akun:
Konfigurasikan akses lintas akun
-
Tetapkan kebijakan sumber daya di akun yang memiliki aliran Kinesis
Ganti
placeholder values
dengan informasi Anda sendiri. -
(Opsional) Pengaturan Kebijakan Sumber Daya Konsumen dan Konsumen
Ini adalah langkah opsional dan hanya akan diperlukan jika Anda berencana untuk menggunakan strategi Konsumen Fanout yang Ditingkatkan untuk membaca catatan aliran. Untuk informasi selengkapnya, lihat Mengembangkan konsumen fan-out yang disempurnakan dengan throughput khusus.
-
Pengaturan konsumen
Untuk menggunakan kembali konsumen yang sudah ada, Anda dapat melewati langkah ini. Untuk informasi selengkapnya, lihat RegisterStreamConsumerdi Referensi API Amazon Kinesis Data Streams.
Dalam contoh perintah CLI berikut, ganti
placeholder values
dengan informasi Anda sendiri.contoh : Contoh perintah CLI
aws kinesis register-stream-consumer \ --stream-arn "arn:aws:kinesis:
Wilayah AWS
:account-id
:stream/stream-name
" \ --consumer-nameconsumer-name
-
Pengaturan Kebijakan Sumber Daya Konsumen
Dalam pernyataan berikut, ganti
placeholder values
dengan informasi Anda sendiri.
-
-
Konfigurasi Pipa
Untuk konsumsi lintas akun, tambahkan atribut berikut di bawah
kinesis_data_streams
untuk setiap aliran:-
stream_arn
- arn aliran milik akun tempat aliran ada -
consumer_arn
- ini adalah atribut opsional dan harus ditentukan jika strategi konsumen fanout default yang ditingkatkan dipilih. Tentukan arn konsumen aktual untuk bidang ini. Gantiplaceholder values
dengan informasi Anda sendiri.
version: "2" kinesis-pipeline: source: kinesis_data_streams: acknowledgments: true codec: newline: streams: - stream_arn: "arn:aws:kinesis:
region
:stream-account-id
:stream/stream-name
" consumer_arn: "consumer arn
" # Enable this if ingestion should start from the start of the stream. # initial_position: "EARLIEST" # checkpoint_interval: "PT5M" - stream_arn: "arn:aws:kinesis:region
:stream-account-id
:stream/stream-name
" consumer_arn: "consumer arn
" # initial_position: "EARLIEST" # buffer_timeout: "1s" # records_to_accumulate: 100 # Enable the consumer strategy to "polling". Default consumer strategy will use enhanced "fan-out" supported by KDS. # consumer_strategy: "polling" # if consumer strategy is set to "polling", enable the polling config below. # polling: # max_polling_records: 100 # idle_time_between_reads: "250ms" aws: # Provide the Role ARN with access to Kinesis. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333
:role/Example-Role
" # Provide the Wilayah AWS of the domain. region: "us-east-1
" sink: - opensearch: # Provide an OpenSearch Serverless domain endpoint hosts: [ "https://search-mydomain-1a2a3a4a5a6a7a8a9a0a9a8a7a.us-east-1.es.amazonaws.com
" ] index: "index_${getMetadata(\"stream_name\")}" # Mapping for documentid based on partition key, shard sequence number and subsequence number metadata attributes document_id: "${getMetadata(\"partition_key\")}_${getMetadata(\"sequence_number\")}_${getMetadata(\"sub_sequence_number\")}" aws: # Provide a Role ARN with access to the domain. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" # Provide the Wilayah AWS of the domain. region: "us-east-1
" # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection serverless: false # serverless_options: # Specify a name here to create or update network policy for the serverless collection # network_policy_name:network-policy-name
# Enable the 'distribution_version' setting if the OpenSearch Serverless domain is of version Elasticsearch 6.x # distribution_version: "es6" # Enable and switch the 'enable_request_compression' flag if the default compression setting is changed in the domain. See https://docs.aws.amazon.com/opensearch-service/latest/developerguide/gzip.html # enable_request_compression: true/false # Optional: Enable the S3 DLQ to capture any failed requests in an S3 bucket. Delete this entire block if you don't want a DLQ. dlq: s3: # Provide an Amazon S3 bucket bucket: "your-dlq-bucket-name
" # Provide a key path prefix for the failed requests # key_path_prefix: "alb-access-log-pipeline/logs/dlq
" # Provide the Wilayah AWS of the bucket. region: "us-east-1
" # Provide a Role ARN with access to the bucket. This role should have a trust relationship with osis-pipelines.amazonaws.com sts_role_arn: "arn:aws:iam::111122223333:role/Example-Role
" -
-
Aliran Data Kinesis Peran Pipa OSI
-
Kebijakan IAM
Tambahkan kebijakan berikut ke peran pipeline. Ganti
placeholder values
dengan informasi Anda sendiri. -
Kebijakan Kepercayaan
Untuk menyerap data dari akun streaming, Anda perlu membangun hubungan kepercayaan antara peran pipeline ingestion dan akun streaming. Tambahkan yang berikut ini ke peran pipeline. Ganti
placeholder values
dengan informasi Anda sendiri.
-