Ikhtisar fitur pipeline di Amazon OpenSearch Ingestion - OpenSearch Layanan Amazon

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Ikhtisar fitur pipeline di Amazon OpenSearch Ingestion

Amazon OpenSearch Ingestion menyediakan saluran pipa, yang terdiri dari sumber, buffer, nol atau lebih prosesor, dan satu atau lebih sink. Saluran pipa konsumsi ditenagai oleh Data Prepper sebagai mesin data. Untuk ikhtisar berbagai komponen pipa, lihatKonsep utama.

Bagian berikut memberikan gambaran umum tentang beberapa fitur yang paling umum digunakan di Amazon OpenSearch Ingestion.

catatan

Ini bukan daftar lengkap fitur yang tersedia untuk saluran pipa. Untuk dokumentasi komprehensif dari semua fungsi pipeline yang tersedia, lihat dokumentasi Data Prepper. Perhatikan bahwa OpenSearch Ingestion menempatkan beberapa kendala pada plugin dan opsi yang dapat Anda gunakan. Untuk informasi selengkapnya, lihat Plugin dan opsi yang didukung untuk saluran Amazon OpenSearch Ingestion.

Buffering persisten

Buffer persisten menyimpan data Anda dalam buffer berbasis disk di beberapa Availability Zone untuk menambah daya tahan pada data Anda. Anda dapat menggunakan buffering persisten untuk menyerap data untuk semua sumber berbasis push yang didukung tanpa perlu menyiapkan buffer mandiri. Ini termasuk HTTP dan OpenTelemetry sumber untuk log, jejak, dan metrik.

Untuk mengaktifkan buffering persisten, pilih Aktifkan buffer persisten saat membuat atau memperbarui pipeline. Untuk informasi lebih lanjut, lihatMembuat saluran pipa Amazon OpenSearch Ingestion. OpenSearch Ingestion secara otomatis menentukan kapasitas buffering yang diperlukan berdasarkan Unit OpenSearch Komputasi Tertelan (OCU Ingestion) yang Anda tentukan untuk pipeline.

Secara default, pipeline menggunakan file Kunci milik AWS untuk mengenkripsi data buffer. Pipeline ini tidak memerlukan izin tambahan untuk peran pipeline. Sebagai alternatif, Anda dapat menentukan kunci terkelola pelanggan dan menambahkan izin IAM berikut ke peran pipeline:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:{region}:{aws-account-id}:key/1234abcd-12ab-34cd-56ef-1234567890ab" } ] }

Untuk informasi selengkapnya, lihat Kunci terkelola pelanggan di Panduan AWS Key Management Service Pengembang.

catatan

Jika Anda menonaktifkan buffering persisten, pipeline Anda akan diperbarui untuk berjalan sepenuhnya pada buffering dalam memori.

Menyetel ukuran payload permintaan maksimum

Jika Anda mengaktifkan buffering persisten untuk pipeline, ukuran payload permintaan maksimum default menjadi 1 MB. Nilai default menawarkan kinerja terbaik. Namun, Anda dapat meningkatkan nilai ini jika klien Anda mengirim permintaan yang melebihi 1 MB. Untuk menyetel ukuran muatan maksimum, atur max_request_length opsi dalam konfigurasi sumber. Sama seperti buffering persisten, opsi ini hanya didukung untuk HTTP dan OpenTelemetry sumber untuk log, jejak, dan metrik.

Satu-satunya nilai yang valid untuk max_request_length opsi ini adalah 1mb, 1.5mb, 2mb, 2.5mb, 3mb, 3.5mb, dan 4mb. Jika Anda menentukan nilai yang berbeda, Anda menerima kesalahan.

Contoh berikut menunjukkan cara mengkonfigurasi ukuran payload maksimum dalam konfigurasi pipeline:

... log-pipeline: source: http: path: "/${pipelineName}/logs" max_request_length: 4mb processor: ...

Jika Anda tidak mengaktifkan buffering persisten untuk pipeline, nilai max_request_length opsi default menjadi 10 MB untuk semua sumber dan tidak dapat dimodifikasi.

Memisahkan

Anda dapat mengonfigurasi pipeline OpenSearch Ingestion untuk membagi peristiwa masuk menjadi sub-pipeline, memungkinkan Anda melakukan berbagai jenis pemrosesan pada acara masuk yang sama.

Contoh pipeline berikut membagi peristiwa yang masuk menjadi dua sub-pipeline. Setiap sub-pipeline menggunakan prosesornya sendiri untuk memperkaya dan memanipulasi data, dan kemudian mengirimkan data ke indeks yang berbeda. OpenSearch

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

Mengikat

Anda dapat menghubungkan beberapa sub-pipeline bersama-sama untuk melakukan pemrosesan dan pengayaan data dalam potongan. Dengan kata lain, Anda dapat memperkaya acara yang masuk dengan kemampuan pemrosesan tertentu dalam satu sub-pipa, kemudian mengirimkannya ke sub-pipa lain untuk pengayaan tambahan dengan prosesor yang berbeda, dan akhirnya mengirimkannya ke wastafelnya. OpenSearch

Dalam contoh berikut, log_pipeline sub-pipeline memperkaya peristiwa log masuk dengan satu set prosesor, kemudian mengirimkan acara ke indeks bernama. OpenSearch enriched_logs Pipeline mengirimkan peristiwa yang sama ke log_advanced_pipeline sub-pipeline, yang memprosesnya dan mengirimkannya ke OpenSearch indeks yang berbeda bernamaenriched_advanced_logs.

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

Antrean surat mati

Antrian surat mati (DLQ) adalah tujuan untuk peristiwa yang gagal ditulis oleh pipa ke wastafel. Di OpenSearch Ingestion, Anda harus menentukan bucket Amazon S3 dengan izin tulis yang sesuai untuk digunakan sebagai DLQ. Anda dapat menambahkan konfigurasi DLQ ke setiap wastafel di dalam pipa. Saat pipeline menemukan kesalahan penulisan, ia membuat objek DLQ di bucket S3 yang dikonfigurasi. Objek DLQ ada dalam file JSON sebagai array peristiwa gagal.

Pipeline menulis peristiwa ke DLQ ketika salah satu dari kondisi berikut terpenuhi:

  • max_retriesUntuk OpenSearch wastafel sudah habis. OpenSearch Tertelan membutuhkan minimal 16 untuk opsi ini.

  • Peristiwa ditolak oleh wastafel karena kondisi kesalahan.

Konfigurasi

Untuk mengonfigurasi antrian huruf mati untuk sub-pipeline, tentukan dlq opsi dalam konfigurasi wastafel: opensearch

apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"

File yang ditulis ke DLQ S3 ini akan memiliki pola penamaan berikut:

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

Untuk informasi selengkapnya, lihat Dead-Letter Queues (DLQ).

Untuk instruksi untuk mengonfigurasi sts_role_arn peran, lihatMenulis ke antrian surat mati.

Contoh

Perhatikan contoh file DLQ berikut:

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

Berikut adalah contoh data yang gagal ditulis ke wastafel, dan dikirim ke bucket DLQ S3 untuk analisis lebih lanjut:

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

Manajemen indeks

Amazon OpenSearch Ingestion memiliki banyak kemampuan manajemen indeks, termasuk yang berikut ini.

Membuat indeks

Anda dapat menentukan nama indeks di wastafel pipa dan OpenSearch Ingestion membuat indeks saat menyediakan pipeline. Jika indeks sudah ada, pipeline menggunakannya untuk mengindeks peristiwa yang masuk. Jika Anda menghentikan dan memulai ulang pipeline, atau jika Anda memperbarui konfigurasi YAML-nya, pipeline akan mencoba membuat indeks baru jika belum ada. Pipeline tidak akan pernah bisa menghapus indeks.

Contoh berikut sink membuat dua indeks saat pipeline disediakan:

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

Menghasilkan nama dan pola indeks

Anda dapat menghasilkan nama indeks dinamis dengan menggunakan variabel dari bidang peristiwa yang masuk. Dalam konfigurasi sink, gunakan format string${} untuk memberi sinyal interpolasi string, dan gunakan penunjuk JSON untuk mengekstrak bidang dari peristiwa. Pilihan untuk index_type adalah custom ataumanagement_disabled. Karena index_type default untuk OpenSearch domain dan custom management_disabled untuk koleksi OpenSearch Tanpa Server, itu dapat dibiarkan tidak disetel.

Misalnya, pipeline berikut memilih metadataType bidang dari peristiwa yang masuk untuk menghasilkan nama indeks.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

Konfigurasi berikut terus menghasilkan indeks baru setiap hari atau setiap jam.

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

Nama indeks juga bisa berupa string biasa dengan pola tanggal-waktu sebagai akhiran, seperti. my-index-%{yyyy.MM.dd} Ketika sink mengirim data ke OpenSearch, itu menggantikan pola tanggal-waktu dengan waktu UTC dan membuat indeks baru untuk setiap hari, seperti. my-index-2022.01.25 Untuk informasi lebih lanjut, lihat DateTimeFormatterkelas.

Nama indeks ini juga dapat berupa string yang diformat (dengan atau tanpa akhiran pola tanggal-waktu), seperti. my-${index}-name Ketika wastafel mengirim data ke OpenSearch, itu menggantikan "${index}" bagian dengan nilai dalam acara yang sedang diproses. Jika formatnya"${index1/index2/index3}", itu menggantikan bidang index1/index2/index3 dengan nilainya dalam acara tersebut.

Menghasilkan ID dokumen

Pipeline dapat menghasilkan ID dokumen saat mengindeks dokumen ke OpenSearch. Ini dapat menyimpulkan ID dokumen ini dari bidang dalam peristiwa yang masuk.

Contoh ini menggunakan uuid bidang dari peristiwa yang masuk untuk menghasilkan ID dokumen.

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"

Dalam contoh berikut, prosesor Tambah entri menggabungkan bidang uuid dan other_field dari peristiwa yang masuk untuk menghasilkan ID dokumen.

createTindakan ini memastikan bahwa dokumen dengan ID identik tidak ditimpa. Pipeline menjatuhkan dokumen duplikat tanpa percobaan ulang atau acara DLQ. Ini adalah harapan yang masuk akal bagi penulis pipeline yang menggunakan tindakan ini, karena tujuannya adalah untuk menghindari memperbarui dokumen yang ada.

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"

Anda mungkin ingin menyetel ID dokumen acara ke bidang dari sub-objek. Dalam contoh berikut, plugin OpenSearch sink menggunakan sub-objek info/id untuk menghasilkan ID dokumen.

sink: - opensearch: ... document_id_field: info/id

Mengingat peristiwa berikut, pipeline akan menghasilkan dokumen dengan _id bidang yang disetel kejson001:

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

Menghasilkan ID perutean

Anda dapat menggunakan routing_field opsi dalam plugin OpenSearch sink untuk mengatur nilai properti routing dokumen (_routing) ke nilai dari peristiwa yang masuk.

Routing mendukung sintaks pointer JSON, sehingga bidang bersarang juga tersedia, bukan hanya bidang tingkat atas.

sink: - opensearch: ... routing_field: metadata/id document_id_field: id

Mengingat peristiwa berikut, plugin menghasilkan dokumen dengan _routing bidang diatur keabcd:

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

Untuk petunjuk membuat templat indeks yang dapat digunakan pipeline selama pembuatan indeks, lihat Templat indeks.

E nd-to-end pengakuan

OpenSearch Penyerapan memastikan daya tahan dan keandalan data dengan melacak pengirimannya dari sumber ke bak cuci di jaringan pipa tanpa kewarganegaraan menggunakan pengakuan. end-to-end Saat ini, hanya plugin sumber S3 yang mendukung end-to-end pengakuan.

Dengan end-to-end pengakuan, plugin sumber pipeline membuat set pengakuan untuk memantau sekumpulan peristiwa. Ini menerima pengakuan positif ketika peristiwa itu berhasil dikirim ke wastafel mereka, atau pengakuan negatif ketika salah satu peristiwa tidak dapat dikirim ke wastafel mereka.

Jika terjadi kegagalan atau kerusakan komponen pipa, atau jika sumber gagal menerima pengakuan, sumber akan habis waktu dan mengambil tindakan yang diperlukan seperti mencoba kembali atau mencatat kegagalan. Jika pipeline memiliki beberapa sink atau beberapa sub-pipeline yang dikonfigurasi, pengakuan tingkat peristiwa dikirim hanya setelah acara dikirim ke semua sink di semua sub-pipeline. Jika wastafel memiliki DLQ yang dikonfigurasi, end-to-end pengakuan juga melacak peristiwa yang ditulis ke DLQ.

Untuk mengaktifkan end-to-end pengakuan, sertakan acknowledgments opsi dalam konfigurasi sumber:

s3-pipeline: source: s3: acknowledgments: true ...

Sumber tekanan balik

Pipa dapat mengalami tekanan balik saat sibuk memproses data, atau jika sink sementara turun atau lambat untuk menelan data. OpenSearch Ingestion memiliki cara yang berbeda untuk menangani tekanan balik tergantung pada plugin sumber yang digunakan pipa.

Sumber HTTP

Saluran pipa yang menggunakan plugin sumber HTTP menangani tekanan balik secara berbeda tergantung pada komponen pipa mana yang padat:

  • Buffer — Ketika buffer penuh, pipeline mulai mengembalikan status HTTP REQUEST_TIMEOUT dengan kode kesalahan 408 kembali ke titik akhir sumber. Saat buffer dibebaskan, pipeline mulai memproses peristiwa HTTP lagi.

  • Thread sumber — Ketika semua thread sumber HTTP sibuk mengeksekusi permintaan dan ukuran antrian permintaan yang belum diproses telah melebihi jumlah maksimum permintaan yang diizinkan, pipeline mulai mengembalikan status HTTP TOO_MANY_REQUESTS dengan kode kesalahan 429 kembali ke titik akhir sumber. Ketika antrian permintaan turun di bawah ukuran antrian maksimum yang diizinkan, pipeline mulai memproses permintaan lagi.

Sumber oTel

Ketika buffer penuh untuk pipeline yang menggunakan OpenTelemetry sumber (log OTel, metrik OTel, dan jejak OTel), pipeline mulai mengembalikan status HTTP REQUEST_TIMEOUT dengan kode kesalahan 408 ke titik akhir sumber. Saat buffer dibebaskan, pipeline mulai memproses peristiwa lagi.

Sumber S3

Ketika buffer penuh untuk saluran pipa dengan sumber S3, saluran pipa berhenti memproses pemberitahuan SQS. Saat buffer dibebaskan, saluran pipa mulai memproses notifikasi lagi.

Jika sink mati atau tidak dapat menyerap data dan end-to-end pengakuan diaktifkan untuk sumber, pipeline berhenti memproses notifikasi SQS hingga menerima pengakuan yang berhasil dari semua sink.