Agregasi acara dengan Amazon Ingestion OpenSearch - OpenSearch Layanan Amazon

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Agregasi acara dengan Amazon Ingestion OpenSearch

Anda dapat menggunakan Amazon OpenSearch Ingestion untuk mengumpulkan data dari berbagai peristiwa selama periode waktu tertentu. Agregasi peristiwa dapat membantu mengurangi volume log yang tidak perlu dan menangani kasus penggunaan seperti log multi-baris yang masuk sebagai peristiwa terpisah. Prosesor Agregat adalah prosesor stateful yang mengelompokkan peristiwa berdasarkan nilai untuk satu set kunci identifikasi yang ditentukan, dan melakukan tindakan yang dapat dikonfigurasi pada setiap grup.

Status dalam prosesor Agregat disimpan dalam memori. Misalnya, untuk menggabungkan empat peristiwa menjadi satu, prosesor perlu mempertahankan bagian dari tiga peristiwa pertama. Status kelompok agregat peristiwa disimpan untuk jumlah waktu yang dapat dikonfigurasi. Bergantung pada log Anda, tindakan agregat yang digunakan, dan jumlah opsi memori dalam konfigurasi prosesor, agregasi dapat berlangsung dalam jangka waktu yang lama.

Selain contoh-contoh ini, Anda juga dapat menggunakan agregasi Log dengan cetak biru routing bersyarat. Untuk informasi selengkapnya tentang cetak biru, lihat Menggunakan cetak biru untuk membuat pipeline.

Penggunaan dasar

Contoh pipeline berikut mengekstrak bidangsourceIp,destinationIp, dan port menggunakan prosesor Grok, dan kemudian agregat pada bidang tersebut selama periode 30 detik menggunakan prosesor Agregat dan tindakan. put_all Pada akhir 30 detik, log agregat dikirim ke OpenSearch wastafel.

version: "2" aggregate_pipeline: source: http: path: "/${pipelineName}/logs" processor: - grok: match: log: ["%{IPORHOST:sourceIp} %{IPORHOST:destinationIp} %{NUMBER:port:int}"] - aggregate: group_duration: "30s" identification_keys: ["sourceIp", "destinationIp", "port"] action: put_all: sink: - opensearch: ... index: aggregated_logs

Misalnya, pertimbangkan kumpulan log berikut:

{ "log": "127.0.0.1 192.168.0.1 80", "status": 200 } { "log": "127.0.0.1 192.168.0.1 80", "bytes": 1000 } { "log": "127.0.0.1 192.168.0.1 80" "http_verb": "GET" }

Prosesor Grok akan mengekstrak identification_keys untuk membuat log berikut:

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200 } { "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "bytes": 1000 } { "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "http_verb": "GET" }

Ketika grup selesai 30 detik setelah log pertama diterima oleh prosesor Agregat, log agregat berikut ditulis ke wastafel:

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "port": 80, "status": 200, "bytes": 1000, "http_verb": "GET" }

Menghapus duplikat

Anda dapat menghapus entri duplikat dengan menurunkan kunci dari peristiwa yang masuk dan menentukan remove_duplicates opsi untuk prosesor Agregat. Tindakan ini segera memproses peristiwa pertama untuk grup, dan menghapus semua peristiwa berikut dalam grup itu.

Dalam contoh berikut, acara pertama diproses dengan kunci identifikasi sourceIp dandestinationIp:

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "status": 200 }

Pipeline kemudian akan menjatuhkan acara berikut karena memiliki kunci yang sama:

{ "sourceIp": "127.0.0.1", "destinationIp": "192.168.0.1", "bytes": 1000 }

Pipeline memproses acara ini dan membuat grup baru sourceIp karena berbeda:

{ "sourceIp": "127.0.0.2", "destinationIp": "192.168.0.1", "bytes": 1000 }

Agregasi log dan perutean bersyarat

Anda dapat menggunakan beberapa plugin untuk menggabungkan agregasi log dengan perutean bersyarat. Dalam contoh ini, sub-pipeline log-aggregate-pipeline menerima log melalui klien HTTP seperti FluentBit dan mengekstrak nilai-nilai penting dari log dengan mencocokkan nilai dalam log kunci terhadap pola log Apache umum.

Dua dari nilai yang diekstrak dari log dengan pola grok meliputi response danclientip. Prosesor Aggregate kemudian menggunakan clientip nilai, bersama dengan remove_duplicates opsi, untuk menjatuhkan setiap log yang berisi clientip yang telah diproses dalam yang diberikangroup_duration.

Tiga rute, atau pernyataan bersyarat, ada dalam pipa. Rute-rute ini memisahkan nilai respons menjadi respons 2xx/3xx, 4xx, dan 5xx. Log dengan status 2xx dan 3xx dikirim ke aggregated_2xx_3xx indeks, log dengan status 4xx dikirim ke aggregated_4xx indeks, dan log dengan status 5xx dikirim ke indeks. aggregated_5xx

version: "2" log-aggregate-pipeline: source: http: # Provide the path for ingestion. ${pipelineName} will be replaced with pipeline name configured for this pipeline. # In this case it would be "/log-aggregate-pipeline/logs". This will be the FluentBit output URI value. path: "/${pipelineName}/logs" processor: - grok: match: log: [ "%{COMMONAPACHELOG_DATATYPED}" ] - aggregate: identification_keys: ["clientip"] action: remove_duplicates: group_duration: "180s" route: - 2xx_status: "/response >= 200 and /response < 300" - 3xx_status: "/response >= 300 and /response < 400" - 4xx_status: "/response >= 400 and /response < 500" - 5xx_status: "/response >= 500 and /response < 600" sink: - opensearch: ... index: "aggregated_2xx_3xx" routes: - 2xx_status - 3xx_status - opensearch: ... index: "aggregated_4xx" routes: - 4xx_status - opensearch: ... index: "aggregated_5xx" routes: - 5xx_status