Apache Kafka - AWS IoT Core

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Apache Kafka

Tindakan Apache Kafka (Kafka) mengirim pesan langsung ke Amazon Managed Streaming for Apache Kafka (MSKAmazon), cluster Apache Kafka yang dikelola oleh penyedia pihak ketiga seperti Confluent Cloud, atau cluster Apache Kafka yang dikelola sendiri. Dengan tindakan aturan Kafka, Anda dapat merutekan data IoT Anda ke cluster Kafka. Ini memungkinkan Anda untuk membangun jaringan data berkinerja tinggi untuk berbagai tujuan, seperti analisis streaming, integrasi data, visualisasi, dan aplikasi bisnis yang sangat penting.

catatan

Topik ini mengasumsikan keakraban dengan platform Apache Kafka dan konsep terkait. Untuk informasi lebih lanjut tentang Apache Kafka, lihat Apache Kafka. MSKTanpa server tidak didukung. MSKCluster tanpa server hanya dapat dilakukan melalui IAM otentikasi, yang saat ini tidak didukung oleh tindakan aturan Apache Kafka. Untuk informasi selengkapnya tentang cara mengonfigurasi AWS IoT Core dengan Confluent, lihat Memanfaatkan Konfluen dan AWS Memecahkan Tantangan Manajemen Perangkat dan Data IoT.

Persyaratan

Tindakan aturan ini memiliki persyaratan sebagai berikut:

  • IAMPeran yang AWS IoT dapat diasumsikan untuk melakukanec2:CreateNetworkInterface,ec2:DescribeNetworkInterfaces,ec2:CreateNetworkInterfacePermission,ec2:DeleteNetworkInterface,ec2:DescribeSubnets,ec2:DescribeVpcs,ec2:DescribeVpcAttribute, dan ec2:DescribeSecurityGroups operasi. Peran ini menciptakan dan mengelola antarmuka jaringan elastis ke Amazon Virtual Private Cloud Anda untuk menjangkau broker Kafka Anda. Untuk informasi selengkapnya, lihat Memberikan AWS IoT aturan akses yang dibutuhkannya.

    Di AWS IoT konsol, Anda dapat memilih atau membuat peran untuk memungkinkan AWS IoT Core untuk melakukan tindakan aturan ini.

    Untuk informasi selengkapnya tentang antarmuka jaringan, lihat Antarmuka jaringan elastis di EC2Panduan Pengguna Amazon.

    Kebijakan yang dilampirkan pada peran yang Anda tentukan akan terlihat seperti contoh berikut.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Jika Anda menggunakan AWS Secrets Manager untuk menyimpan kredensyal yang diperlukan untuk terhubung ke broker Kafka Anda, Anda harus membuat IAM peran yang AWS IoT Core dapat diasumsikan untuk melakukan dan operasi. secretsmanager:GetSecretValue secretsmanager:DescribeSecret

    Kebijakan yang dilampirkan pada peran yang Anda tentukan akan terlihat seperti contoh berikut.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • Anda dapat menjalankan cluster Apache Kafka Anda di dalam Amazon Virtual Private Cloud (Amazon). VPC Anda harus membuat VPC tujuan Amazon dan menggunakan NAT gateway di subnet Anda untuk meneruskan pesan dari AWS IoT ke kluster Kafka publik. Mesin AWS IoT aturan menciptakan antarmuka jaringan di setiap subnet yang tercantum di VPC tujuan untuk mengarahkan lalu lintas langsung keVPC. Saat Anda membuat VPC tujuan, mesin AWS IoT aturan secara otomatis membuat tindakan VPC aturan. Untuk informasi selengkapnya tentang tindakan VPC aturan, lihatTujuan cloud pribadi virtual (VPC).

  • Jika Anda menggunakan AWS KMS key (KMSkunci) yang dikelola pelanggan untuk mengenkripsi data saat istirahat, layanan harus memiliki izin untuk menggunakan KMS kunci atas nama pemanggil. Untuk informasi selengkapnya, lihat MSKenkripsi Amazon di Panduan Pengembang Amazon Managed Streaming for Apache Kafka Kafka.

Parameter

Saat Anda membuat AWS IoT aturan dengan tindakan ini, Anda harus menentukan informasi berikut:

destinationArn

Nama Sumber Daya Amazon (ARN) dari VPC tujuan. Untuk informasi tentang membuat VPC tujuan, lihatTujuan cloud pribadi virtual (VPC).

topik

Topik Kafka untuk pesan yang akan dikirim ke broker Kafka.

Anda dapat mengganti bidang ini menggunakan templat substitusi. Untuk informasi selengkapnya, lihat Templat substitusi.

kunci (opsional)

Kunci pesan Kafka.

Anda dapat mengganti bidang ini menggunakan templat substitusi. Untuk informasi selengkapnya, lihat Templat substitusi.

header (opsional)

Daftar header Kafka yang Anda tentukan. Setiap header adalah pasangan kunci-nilai yang dapat Anda tentukan saat Anda membuat tindakan Kafka. Anda dapat menggunakan header ini untuk merutekan data dari klien IoT ke hilir kluster Kafka tanpa mengubah payload pesan Anda.

Anda dapat mengganti bidang ini menggunakan templat substitusi. Untuk memahami cara meneruskan fungsi Aturan sebaris sebagai templat substitusi di header Kafka Action, lihat Contoh. Untuk informasi selengkapnya, lihat Templat substitusi.

catatan

Header dalam format biner tidak didukung.

partisi (opsional)

Partisi pesan Kafka.

Anda dapat mengganti bidang ini menggunakan templat substitusi. Untuk informasi selengkapnya, lihat Templat substitusi.

clientProperties

Objek yang mendefinisikan properti klien produsen Apache Kafka.

acks (opsional)

Jumlah ucapan terima kasih yang harus diterima oleh produsen sebelum mempertimbangkan permintaan lengkap.

Jika Anda menentukan 0 sebagai nilai, produsen tidak akan menunggu pengakuan apa pun dari server. Jika server tidak menerima pesan, produser tidak akan mencoba lagi untuk mengirim pesan.

Nilai yang valid:-1,0,1,all. Nilai default-nya adalah 1.

bootstrap.servers

Daftar pasangan host dan port (misalnya,host1:port1,host2:port2) yang digunakan untuk membuat koneksi awal ke cluster Kafka Anda.

kompresi.type (opsional)

Jenis kompresi untuk semua data yang dihasilkan oleh produsen.

Nilai yang valid:none,gzip,snappy,lz4,zstd. Nilai default-nya adalah none.

security.protocol

Protokol keamanan yang digunakan untuk melampirkan ke broker Kafka Anda.

Nilai-nilai yang valid: SSL, SASL_SSL. Nilai default-nya adalah SSL.

key.serializer

Menentukan cara mengubah objek kunci yang Anda berikan dengan ProducerRecord menjadi byte.

Nilai valid: StringSerializer.

value.serializer

Menentukan cara mengubah objek nilai yang Anda berikan dengan ProducerRecord menjadi byte.

Nilai valid: ByteBufferSerializer.

ssl.truststore

File truststore dalam format base64 atau lokasi file truststore di. AWS Secrets Manager Nilai ini tidak diperlukan jika truststore Anda dipercaya oleh otoritas sertifikat Amazon (CA).

Bidang ini mendukung template substitusi. Jika Anda menggunakan Secrets Manager untuk menyimpan kredensyal yang diperlukan untuk terhubung ke broker Kafka Anda, Anda dapat menggunakan get_secret SQL fungsi tersebut untuk mengambil nilai untuk bidang ini. Untuk informasi selengkapnya tentang templat substitusi, lihatTemplat substitusi. Untuk informasi selengkapnya tentang get_secret SQL fungsi, lihatget_secret (secretID, secretType, kunci, roLearn). Jika truststore dalam bentuk file, gunakan parameternyaSecretBinary. Jika truststore dalam bentuk string, gunakan parameternyaSecretString.

Ukuran maksimum nilai ini adalah 65 KB.

ssl.truststore.password

Kata sandi untuk truststore. Nilai ini diperlukan hanya jika Anda telah membuat kata sandi untuk truststore.

ssl.keystore

File keystore. Nilai ini diperlukan saat Anda menentukan SSL sebagai nilai untuksecurity.protocol.

Bidang ini mendukung template substitusi. Gunakan Secrets Manager untuk menyimpan kredensi yang diperlukan untuk terhubung ke broker Kafka Anda. Untuk mengambil nilai untuk bidang ini, gunakan get_secret SQL fungsi. Untuk informasi selengkapnya tentang templat substitusi, lihatTemplat substitusi. Untuk informasi selengkapnya tentang get_secret SQL fungsi, lihatget_secret (secretID, secretType, kunci, roLearn). Gunakan parameter SecretBinary.

ssl.keystore.password

Kata sandi toko untuk file keystore. Nilai ini diperlukan jika Anda menentukan nilai untukssl.keystore.

Nilai bidang ini bisa berupa plaintext. Bidang ini juga mendukung template substitusi. Gunakan Secrets Manager untuk menyimpan kredensi yang diperlukan untuk terhubung ke broker Kafka Anda. Untuk mengambil nilai untuk bidang ini, gunakan get_secret SQL fungsi. Untuk informasi selengkapnya tentang templat substitusi, lihatTemplat substitusi. Untuk informasi selengkapnya tentang get_secret SQL fungsi, lihatget_secret (secretID, secretType, kunci, roLearn). Gunakan parameter SecretString.

ssl.key.password

Kata sandi kunci pribadi di file keystore Anda.

Bidang ini mendukung template substitusi. Gunakan Secrets Manager untuk menyimpan kredensi yang diperlukan untuk terhubung ke broker Kafka Anda. Untuk mengambil nilai untuk bidang ini, gunakan get_secret SQL fungsi. Untuk informasi selengkapnya tentang templat substitusi, lihatTemplat substitusi. Untuk informasi selengkapnya tentang get_secret SQL fungsi, lihatget_secret (secretID, secretType, kunci, roLearn). Gunakan parameter SecretString.

sasl.mekanisme

Mekanisme keamanan yang digunakan untuk terhubung ke broker Kafka Anda. Nilai ini diperlukan saat Anda menentukan SASL_SSL untuksecurity.protocol.

Nilai-nilai yang valid: PLAIN, SCRAM-SHA-512, GSSAPI.

catatan

SCRAM-SHA-512adalah satu-satunya mekanisme keamanan yang didukung di Wilayah cn-north-1, cn-northwest-1, -1, dan -1. us-gov-east us-gov-west

sasl.plain.username

Nama pengguna yang digunakan untuk mengambil string rahasia dari Secrets Manager. Nilai ini diperlukan saat Anda menentukan SASL_SSL untuk security.protocol dan PLAIN untuksasl.mechanism.

sasl.plain.password

Kata sandi yang digunakan untuk mengambil string rahasia dari Secrets Manager. Nilai ini diperlukan saat Anda menentukan SASL_SSL untuk security.protocol dan PLAIN untuksasl.mechanism.

sasl.scram.username

Nama pengguna yang digunakan untuk mengambil string rahasia dari Secrets Manager. Nilai ini diperlukan saat Anda menentukan SASL_SSL untuk security.protocol dan SCRAM-SHA-512 untuksasl.mechanism.

sasl.scram.password

Kata sandi yang digunakan untuk mengambil string rahasia dari Secrets Manager. Nilai ini diperlukan saat Anda menentukan SASL_SSL untuk security.protocol dan SCRAM-SHA-512 untuksasl.mechanism.

sasl.kerberos.keytab

File keytab untuk otentikasi Kerberos di Secrets Manager. Nilai ini diperlukan saat Anda menentukan SASL_SSL untuk security.protocol dan GSSAPI untuksasl.mechanism.

Bidang ini mendukung template substitusi. Gunakan Secrets Manager untuk menyimpan kredensi yang diperlukan untuk terhubung ke broker Kafka Anda. Untuk mengambil nilai untuk bidang ini, gunakan get_secret SQL fungsi. Untuk informasi selengkapnya tentang templat substitusi, lihatTemplat substitusi. Untuk informasi selengkapnya tentang get_secret SQL fungsi, lihatget_secret (secretID, secretType, kunci, roLearn). Gunakan parameter SecretBinary.

sasl.kerberos.service.name

Nama utama Kerberos di mana Apache Kafka berjalan. Nilai ini diperlukan saat Anda menentukan SASL_SSL untuk security.protocol dan GSSAPI untuksasl.mechanism.

sasl.kerberos.krb5.kdc

Nama host dari pusat distribusi utama (KDC) yang terhubung dengan klien produsen Apache Kafka Anda. Nilai ini diperlukan saat Anda menentukan SASL_SSL untuk security.protocol dan GSSAPI untuksasl.mechanism.

sasl.kerberos.krb5.realm

Ranah yang terhubung dengan klien produser Apache Kafka Anda. Nilai ini diperlukan saat Anda menentukan SASL_SSL untuk security.protocol dan GSSAPI untuksasl.mechanism.

sasl.kerberos.principal

Identitas unik Kerberos tempat Kerberos dapat menetapkan tiket untuk mengakses layanan yang sadar Kerberos. Nilai ini diperlukan saat Anda menentukan SASL_SSL untuk security.protocol dan GSSAPI untuksasl.mechanism.

Contoh

JSONContoh berikut mendefinisikan tindakan Apache Kafka dalam suatu aturan. AWS IoT Contoh berikut melewati sourceIp() fungsi inline sebagai template substitusi di header Kafka Action.

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

Catatan penting tentang pengaturan Kerberos Anda

  • Pusat distribusi kunci (KDC) Anda harus dapat diselesaikan melalui Sistem Nama Domain pribadi (DNS) dalam target Anda. VPC Salah satu pendekatan yang mungkin adalah menambahkan KDC DNS entri ke zona host pribadi. Untuk informasi selengkapnya tentang pendekatan ini, lihat Bekerja dengan zona yang dihosting pribadi.

  • Masing-masing VPC harus memiliki DNS resolusi yang diaktifkan. Untuk informasi selengkapnya, lihat Menggunakan DNS dengan Anda VPC.

  • Grup keamanan antarmuka jaringan dan grup keamanan tingkat instans di VPC tujuan harus mengizinkan lalu lintas dari dalam port VPC berikut.

    • TCPlalu lintas pada port pendengar broker bootstrap (seringkali 9092, tetapi harus berada dalam kisaran 9000—9100)

    • TCPdan UDP lalu lintas di port 88 untuk KDC

  • SCRAM-SHA-512adalah satu-satunya mekanisme keamanan yang didukung di Wilayah cn-north-1, cn-northwest-1, -1, dan -1. us-gov-east us-gov-west