Menulis ke Amazon Kinesis Data Streams menggunakan Agen Kinesis - Amazon Kinesis Data Streams

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menulis ke Amazon Kinesis Data Streams menggunakan Agen Kinesis

Kinesis Agent adalah aplikasi perangkat lunak Java yang berdiri sendiri yang menawarkan cara mudah untuk mengumpulkan dan mengirim data ke Kinesis Data Streams. Agen terus memantau satu set file dan mengirimkan data baru ke aliran Anda. Agen menangani rotasi file, checkpointing, dan coba lagi pada kegagalan. Agen memberikan semua data Anda dengan cara yang andal, tepat waktu, dan sederhana. Ini juga memancarkan CloudWatch metrik Amazon untuk membantu Anda memantau dan memecahkan masalah proses streaming dengan lebih baik.

Secara default, catatan diurai dari setiap file berdasarkan karakter baris baru ('\n'). Namun, agen juga dapat dikonfigurasi untuk mengurai catatan multi-baris (lihat Tentukan pengaturan konfigurasi agen).

Anda dapat menginstal agen di lingkungan server berbasis Linux seperti server web, server log, dan server basis data. Setelah menginstal agen, konfigurasikan dengan menentukan file yang akan dipantau dan aliran untuk data. Setelah agen dikonfigurasi, agen akan mengumpulkan data dari file dengan andal dan mengirimkannya ke aliran dengan andal.

Lengkapi prasyarat untuk Agen Kinesis

  • Sistem operasi Anda harus Amazon Linux AMI dengan versi 2015.09 atau yang lebih baru, atau Red Hat Enterprise Linux versi 7 atau yang lebih baru.

  • Jika Anda menggunakan Amazon EC2 untuk menjalankan agen Anda, luncurkan EC2 instans Anda.

  • Kelola AWS kredensyal Anda menggunakan salah satu metode berikut:

    • Tentukan IAM peran saat Anda meluncurkan EC2 instans Anda.

    • Tentukan AWS kredensil saat Anda mengonfigurasi agen (lihat awsAccessKeyId dan awsSecretAccessKunci).

    • Edit /etc/sysconfig/aws-kinesis-agent untuk menentukan wilayah dan kunci AWS akses Anda.

    • Jika EC2 instans Anda berada di AWS akun yang berbeda, buat IAM peran untuk menyediakan akses ke layanan Kinesis Data Streams, dan tentukan peran tersebut saat Anda mengonfigurasi agen assumeRoleARN(assumeRoleExternallihat dan Id). Gunakan salah satu metode sebelumnya untuk menentukan AWS kredensi pengguna di akun lain yang memiliki izin untuk mengambil peran ini.

  • IAMPeran atau AWS kredensional yang Anda tentukan harus memiliki izin untuk melakukan operasi Kinesis Data PutRecordsStreams agar agen dapat mengirim data ke aliran Anda. Jika Anda mengaktifkan CloudWatch pemantauan untuk agen, izin untuk melakukan CloudWatch PutMetricDataoperasi juga diperlukan. Untuk informasi selengkapnyaMengontrol akses ke sumber daya Amazon Kinesis Data Streams menggunakan IAM, lihatPantau kesehatan Agen Aliran Data Kinesis dengan Amazon CloudWatch, dan Kontrol CloudWatch Akses.

Unduh dan instal agen

Pertama-tama, hubungkan ke instans Anda. Untuk informasi selengkapnya, lihat Connect to Your Instance di Panduan EC2 Pengguna Amazon. Jika Anda mengalami masalah saat menyambung, lihat Pemecahan Masalah Menghubungkan ke Instans Anda di EC2Panduan Pengguna Amazon.

Untuk mengatur agen menggunakan Amazon Linux AMI

Gunakan perintah berikut untuk mengunduh dan menginstal agen:

sudo yum install –y aws-kinesis-agent
Untuk mengatur agen menggunakan Red Hat Enterprise Linux

Gunakan perintah berikut untuk mengunduh dan menginstal agen:

sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
Untuk mengatur agen menggunakan GitHub
  1. Unduh agen dari amazon-kinesis-agentawlabs/.

  2. Instal agen dengan menavigasi ke direktori unduhan dan menjalankan perintah berikut:

    sudo ./setup --install
Untuk mengatur agen dalam wadah Docker

Agen Kinesis dapat dijalankan dalam wadah juga melalui basis wadah amazonlinux. Gunakan Dockerfile berikut dan kemudian jalankan. docker build

FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]

Konfigurasikan dan mulai agen

Untuk mengonfigurasi dan memulai agen
  1. Buka dan edit file konfigurasi (sebagai pengguna super jika menggunakan izin akses file default): /etc/aws-kinesis/agent.json

    Dalam file konfigurasi ini, tentukan file ("filePattern") dari mana agen mengumpulkan data, dan nama stream ("kinesisStream") tempat agen mengirim data. Perhatikan bahwa nama file adalah pola, dan agen mengenali rotasi file. Anda dapat memutar file atau membuat file baru satu kali per detik. Agen menggunakan stempel waktu pembuatan file untuk menentukan file mana yang akan dilacak dan diekor ke aliran Anda; membuat file baru atau memutar file lebih sering dari sekali per detik tidak memungkinkan agen untuk membedakan dengan benar di antara mereka.

    { "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "yourkinesisstream" } ] }
  2. Mulailah agen secara manual:

    sudo service aws-kinesis-agent start
  3. (Opsional) Konfigurasikan agen untuk memulai pada startup sistem:

    sudo chkconfig aws-kinesis-agent on

Agen sekarang berjalan sebagai layanan sistem di latar belakang. Ini terus memantau file yang ditentukan dan mengirim data ke aliran yang ditentukan. Aktivitas agen masuk di /var/log/aws-kinesis-agent/aws-kinesis-agent.log.

Tentukan pengaturan konfigurasi agen

Agen mendukung dua pengaturan konfigurasi wajib, filePattern dankinesisStream, ditambah pengaturan konfigurasi opsional untuk fitur tambahan. Anda dapat menentukan konfigurasi wajib dan opsional di/etc/aws-kinesis/agent.json.

Setiap kali mengubah file konfigurasi, Anda harus menghentikan dan memulai agen, menggunakan perintah berikut:

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

Atau, Anda dapat menggunakan perintah berikut:

sudo service aws-kinesis-agent restart

Berikut ini adalah pengaturan konfigurasi umum.

Pengaturan Konfigurasi Deskripsi
assumeRoleARN

ARNPeran yang akan diasumsikan oleh pengguna. Untuk informasi selengkapnya, lihat Mendelegasikan Akses di Seluruh AWS Akun Menggunakan IAM Peran di Panduan IAM Pengguna.

assumeRoleExternalId

Pengidentifikasi opsional yang menentukan siapa yang dapat mengambil peran tersebut. Untuk informasi selengkapnya, lihat Cara Menggunakan ID Eksternal di Panduan IAM Pengguna.

awsAccessKeyId

AWS ID kunci akses yang mengesampingkan kredensyal default. Pengaturan ini diutamakan daripada semua penyedia kredensial lainnya.

awsSecretAccessKey

AWS kunci rahasia yang mengesampingkan kredensi default. Pengaturan ini diutamakan daripada semua penyedia kredensial lainnya.

cloudwatch.emitMetrics

Memungkinkan agen untuk memancarkan metrik ke CloudWatch if set (true).

Default: betul

cloudwatch.endpoint

Titik akhir regional untuk CloudWatch.

Default: monitoring.us-east-1.amazonaws.com

kinesis.endpoint

Titik akhir regional untuk Kinesis Data Streams.

Default: kinesis.us-east-1.amazonaws.com

Berikut ini adalah pengaturan konfigurasi aliran.

Pengaturan Konfigurasi Deskripsi
dataProcessingOptions

Daftar opsi pemrosesan diterapkan ke setiap catatan yang diuraikan sebelum dikirim ke aliran. Pilihan pemrosesan dilakukan dalam urutan yang ditentukan. Untuk informasi selengkapnya, lihat Gunakan agen untuk melakukan pra-proses data.

kinesisStream

[Wajib] Nama aliran.

filePattern

[Wajib] Direktori dan pola file yang harus dicocokkan untuk diambil oleh agen. Untuk semua file yang cocok dengan pola ini, izin baca harus diberikanaws-kinesis-agent-user. Untuk direktori yang berisi file, izin baca dan eksekusi harus diberikan kepadaaws-kinesis-agent-user.

initialPosition

Posisi awal dari mana file mulai diurai. Nilai yang valid adalah START_OF_FILE dan END_OF_FILE.

Default: END_OF_FILE

maxBufferAgeMillis

Waktu maksimum, dalam milidetik, di mana agen menyangga data sebelum mengirimnya ke aliran.

Rentang nilai: 1.000 hingga 900.000 (1 detik hingga 15 menit)

Default: 60.000 (1 menit)

maxBufferSizeBytes

Ukuran maksimum, dalam byte, di mana agen buffer data sebelum mengirimnya ke aliran.

Rentang nilai: 1 hingga 4.194.304 (4 MB)

Default: 4.194.304 (4 MB)

maxBufferSizeRecords

Jumlah maksimum catatan yang agen buffer data sebelum mengirimnya ke aliran.

Rentang nilai: 1 hingga 500

Default: 500

minTimeBetweenFilePollsMillis

Interval waktu, dalam milidetik, saat agen melakukan polling dan mengurai file yang dipantau untuk data baru.

Kisaran nilai: 1 atau lebih

Default: 100

multiLineStartPattern

Pola untuk mengidentifikasi awal catatan. Catatan dibuat dari baris yang cocok dengan pola tersebut dan baris berikutnya yang tidak cocok dengan pola tersebut. Nilai-nilai yang benar adalah ekspresi reguler. Secara default, setiap baris baru dalam file log diurai sebagai satu catatan.

partitionKeyOption

Metode untuk menghasilkan kunci partisi. Nilai yang valid adalah RANDOM (bilangan bulat yang dihasilkan secara acak) dan DETERMINISTIC (nilai hash dihitung dari data).

Default: RANDOM

skipHeaderLines

Jumlah baris yang dilewati agen untuk diurai di awal file yang dipantau.

Kisaran nilai: 0 atau lebih

Default: 0 (nol)

truncatedRecordTerminator

String yang digunakan agen untuk memotong rekaman yang diuraikan ketika ukuran rekaman melebihi batas ukuran rekaman Kinesis Data Streams. (1.000 KB)

Default: '\n' (baris baru)

Pantau beberapa direktori file dan tulis ke beberapa aliran

Dengan menentukan beberapa pengaturan konfigurasi aliran, Anda dapat mengonfigurasi agen untuk memantau beberapa direktori file dan mengirim data ke beberapa aliran. Dalam contoh konfigurasi berikut, agen memonitor dua direktori file dan mengirimkan data ke aliran Kinesis dan aliran pengiriman Firehose masing-masing. Perhatikan bahwa Anda dapat menentukan titik akhir yang berbeda untuk Kinesis Data Streams dan Firehose sehingga aliran Kinesis dan aliran pengiriman Firehose tidak perlu berada di wilayah yang sama.

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

Untuk informasi lebih lanjut tentang penggunaan agen dengan Firehose, lihat Menulis ke Amazon Data Firehose dengan Agen Kinesis.

Gunakan agen untuk melakukan pra-proses data

Agen dapat melakukan pra-proses rekaman yang diuraikan dari file yang dipantau sebelum mengirimnya ke streaming Anda. Anda dapat mengaktifkan fitur ini dengan menambahkan pengaturan konfigurasi dataProcessingOptions ke aliran file Anda. Satu atau lebih opsi pemrosesan dapat ditambahkan dan mereka akan dilakukan dalam urutan yang ditentukan.

Agen mendukung opsi pemrosesan berikut yang tercantum. Karena agen bersifat open-source, Anda dapat mengembangkan dan memperluas opsi pemrosesannya lebih lanjut. Anda dapat mengunduh agen dari Agen Kinesis.

Opsi Pemrosesan
SINGLELINE

Mengonversi rekaman multi-baris menjadi catatan baris tunggal dengan menghapus karakter baris baru, spasi utama, dan spasi tambahan.

{ "optionName": "SINGLELINE" }
CSVTOJSON

Mengkonversi rekaman dari pembatas format dipisahkan ke format. JSON

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[Wajib] Nama bidang yang digunakan sebagai kunci di setiap pasangan nilai JSON kunci. Misalnya, jika Anda menentukan["f1", "f2"], catatan “v1, v2" akan dikonversi ke. {"f1":"v1","f2":"v2"}

delimiter

String yang digunakan sebagai pembatas dalam catatan. Default adalah koma (,).

LOGTOJSON

Mengonversi catatan dari format log ke JSON format. Format log yang didukung adalah Apache Common Log, Apache Combined Log, Apache Error Log, dan RFC3164 Syslog.

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[Diperlukan] Format entri log. Berikut adalah nilai yang mungkin:

  • COMMONAPACHELOG — Format Log Umum Apache. Setiap entri log memiliki pola berikut secara default: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}".

  • COMBINEDAPACHELOG — Format Log Gabungan Apache. Setiap entri log memiliki pola berikut secara default: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}".

  • APACHEERRORLOG — Format Log Kesalahan Apache. Setiap entri log memiliki pola berikut secara default: "[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}".

  • SYSLOG— Format RFC3164 Syslog. Setiap entri log memiliki pola berikut secara default: "%{timestamp} %{hostname} %{program}[%{processid}]: %{message}".

matchPattern

Pola ekspresi reguler digunakan untuk mengekstrak nilai dari entri log. Pengaturan ini digunakan jika entri log Anda tidak dalam salah satu format log yang telah ditentukan sebelumnya. Jika pengaturan ini digunakan, Anda juga harus menentukancustomFieldNames.

customFieldNames

Nama bidang kustom digunakan sebagai kunci di setiap pasangan nilai JSON kunci. Anda dapat menggunakan pengaturan ini untuk menentukan nama bidang untuk nilai-nilai yang diekstraksi dari matchPattern, atau menimpa nama bidang default dari format log yang telah ditetapkan sebelumnya.

contoh : LOGTOJSON Konfigurasi

Berikut adalah salah satu contoh LOGTOJSON konfigurasi untuk entri Apache Common Log yang dikonversi ke JSON format:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

Sebelum konversi:

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

Setelah konversi:

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
contoh : LOGTOJSON Konfigurasi Dengan Bidang Kustom

Berikut adalah contoh lain konfigurasi LOGTOJSON:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

Dengan pengaturan konfigurasi ini, entri Apache Common Log yang sama dari contoh sebelumnya dikonversi ke JSON format sebagai berikut:

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
contoh : Mengonversi Entri Log Umum Apache

Konfigurasi alur berikut mengonversi entri Apache Common Log ke catatan baris tunggal dalam JSON format:

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
contoh : Mengonversi Catatan Multi-Baris

Konfigurasi aliran berikut mengurai catatan multi-baris yang baris pertamanya dimulai dengan "[SEQUENCE=". Setiap catatan pertama kali dikonversi ke catatan baris tunggal. Kemudian, nilai-nilai diekstraksi dari catatan tersebut berdasarkan pembatas tab. Nilai yang diekstraksi dipetakan ke customFieldNames nilai tertentu untuk membentuk catatan satu baris dalam format. JSON

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }
contoh : LOGTOJSON Konfigurasi dengan Pola Pencocokan

Berikut adalah salah satu contoh LOGTOJSON konfigurasi untuk entri Log Umum Apache yang dikonversi ke JSON format, dengan bidang terakhir (byte) dihilangkan:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

Sebelum konversi:

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

Setelah konversi:

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

Gunakan CLI perintah agen

Secara otomatis memulai agen pada startup sistem:

sudo chkconfig aws-kinesis-agent on

Memeriksa status agen:

sudo service aws-kinesis-agent status

Menghentikan agen:

sudo service aws-kinesis-agent stop

Membaca file log agen dari lokasi ini:

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

Menghapus instalasi agen:

sudo yum remove aws-kinesis-agent

FAQ

Apakah ada Agen Kinesis untuk Windows?

Kinesis Agent untuk Windows adalah perangkat lunak yang berbeda dari Kinesis Agent untuk platform Linux.

Mengapa Agen Kinesis melambat dan/atau meningkat? RecordSendErrors

Ini biasanya karena pelambatan dari Kinesis. Periksa WriteProvisionedThroughputExceeded metrik untuk Kinesis Data Streams ThrottledRecords atau metrik untuk Firehose Delivery Streams. Setiap peningkatan dari 0 dalam metrik ini menunjukkan bahwa batas aliran perlu ditingkatkan. Untuk informasi selengkapnya, lihat batas Kinesis Data Stream dan Amazon Firehose Delivery Streams.

Setelah Anda mengesampingkan pembatasan, lihat apakah Agen Kinesis dikonfigurasi untuk mengekor sejumlah besar file kecil. Ada penundaan ketika Agen Kinesis mengekor file baru, jadi Agen Kinesis harus membuntuti sejumlah kecil file yang lebih besar. Coba konsolidasikan file log Anda ke file yang lebih besar.

Mengapa saya mendapatkan java.lang.OutOfMemoryError pengecualian?

Agen Kinesis tidak memiliki cukup memori untuk menangani beban kerjanya saat ini. Cobalah meningkatkan JAVA_START_HEAP dan JAVA_MAX_HEAP masuk /usr/bin/start-aws-kinesis-agent dan memulai kembali agen.

Mengapa saya mendapatkan IllegalStateException : connection pool shut down pengecualian?

Agen Kinesis tidak memiliki koneksi yang cukup untuk menangani beban kerjanya saat ini. Coba tingkatkan maxConnections dan maxSendingThreads dalam pengaturan konfigurasi agen umum Anda di/etc/aws-kinesis/agent.json. Nilai default untuk bidang ini adalah 12 kali prosesor runtime yang tersedia. Lihat AgentConfiguration.java untuk mengetahui selengkapnya tentang pengaturan konfigurasi agen lanjutan.

Bagaimana saya bisa men-debug masalah lain dengan Agen Kinesis?

DEBUGlog level dapat diaktifkan di/etc/aws-kinesis/log4j.xml.

Bagaimana cara mengonfigurasi Agen Kinesis?

Semakin kecilmaxBufferSizeBytes, semakin sering Agen Kinesis akan mengirim data. Ini bisa bagus karena mengurangi waktu pengiriman catatan, tetapi juga meningkatkan permintaan per detik ke Kinesis.

Mengapa Agen Kinesis mengirimkan catatan duplikat?

Ini terjadi karena kesalahan konfigurasi dalam file tailing. Pastikan masing-masing hanya fileFlow’s filePattern cocok dengan satu file. Ini juga dapat terjadi jika logrotate mode yang digunakan dalam copytruncate mode. Coba ubah mode ke mode default atau buat untuk menghindari duplikasi. Untuk informasi selengkapnya tentang penanganan rekaman duplikat, lihat Menangani Rekaman Duplikat.