Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Apache Kafka
L'azione Apache Kafka (Kafka) invia messaggi direttamente ai tuoi Amazon Managed Streaming for Apache Kafka (MSKAmazon), ai cluster Apache Kafka gestiti da provider di terze parti come Confluent Cloud o ai cluster Apache Kafka autogestiti.
Nota
Questo argomento presuppone la familiarità con la piattaforma Apache Kafka e i relativi concetti. Per ulteriori informazioni su Apache Kafka, consulta Apache Kafka
Requisiti
Questa operazione della regola presenta i seguenti requisiti:
-
Un IAM ruolo che AWS IoT può assumere per eseguire le
ec2:CreateNetworkInterface
,,,,ec2:DescribeNetworkInterfaces
ec2:CreateNetworkInterfacePermission
ec2:DeleteNetworkInterface
ec2:DescribeSubnets
,ec2:DescribeVpcs
e le operazioni.ec2:DescribeVpcAttribute
ec2:DescribeSecurityGroups
Questo ruolo crea e gestisce interfacce di rete elastiche per il tuo Amazon Virtual Private Cloud per raggiungere il tuo broker Kafka. Per ulteriori informazioni, consulta Concedere a qualsiasi AWS IoT regola l'accesso richiesto.Nella AWS IoT console, è possibile scegliere o creare un ruolo per consentire l'esecuzione di questa azione relativa AWS IoT Core alla regola.
Per ulteriori informazioni sulle interfacce di rete, consulta Interfacce di rete elastiche nella Amazon EC2 User Guide.
La policy associata al ruolo che specifichi sarà simile a quella del seguente esempio.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
Se utilizzi AWS Secrets Manager per memorizzare le credenziali necessarie per connetterti al tuo broker Kafka, devi creare un IAM ruolo che AWS IoT Core possa assumere per eseguire le operazioni and.
secretsmanager:GetSecretValue
secretsmanager:DescribeSecret
La policy associata al ruolo che specifichi sarà simile a quella del seguente esempio.
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:
region
:123456789012
:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region
:123456789012
:secret:kafka_keytab-*" ] } ] }-
Puoi eseguire i tuoi cluster Apache Kafka all'interno di Amazon Virtual Private Cloud (Amazon). VPC È necessario creare una VPC destinazione Amazon e utilizzare un NAT gateway nelle sottoreti per inoltrare messaggi da un cluster AWS IoT Kafka pubblico. Il motore AWS IoT delle regole crea un'interfaccia di rete in ciascuna delle sottoreti elencate nella VPC destinazione per indirizzare il traffico direttamente verso. VPC Quando si crea una VPC destinazione, il motore delle AWS IoT regole crea automaticamente un'azione di VPC regola. Per ulteriori informazioni sulle azioni delle VPC regole, vedereVPCDestinazioni nel cloud privato virtuale ().
-
Se si utilizza una KMS chiave gestita AWS KMS key dal cliente per crittografare i dati inattivi, il servizio deve disporre dell'autorizzazione a utilizzare la KMS chiave per conto del chiamante. Per ulteriori informazioni, consulta Amazon MSK encryption nella Amazon Managed Streaming for Apache Kafka Developer Guide.
Parametri
Quando crei una AWS IoT regola con questa azione, devi specificare le seguenti informazioni:
- destinationArn
L'Amazon Resource Name (ARN) della VPC destinazione. Per informazioni sulla creazione di una VPC destinazione, consultaVPCDestinazioni nel cloud privato virtuale ().
- topic
L'argomento Kafka per i messaggi da inviare al broker Kafka.
È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.
- chiave (opzionale)
La chiave del messaggio Kafka.
È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.
- intestazioni (opzionali)
-
L'elenco delle intestazioni specificate. Ogni intestazione è una coppia chiave-valore che puoi specificare quando crei un'operazione Kafka. Puoi utilizzare queste intestazioni per instradare i dati dai client IoT ai cluster Kafka a valle senza modificare il payload dei messaggi.
È possibile sostituire questo campo utilizzando un modello di sostituzione. Per capire come passare la funzione di una regola in linea come modello sostitutivo nell'intestazione dell'operazione Kafka, consulta Esempi. Per ulteriori informazioni, consulta Modelli di sostituzione.
Nota
Le intestazioni in formato binario non sono supportate.
- partizione (opzionale)
La partizione del messaggio Kafka.
È possibile sostituire questo campo utilizzando un modello di sostituzione. Per ulteriori informazioni, consulta Modelli di sostituzione.
- clientProperties
Un oggetto che definisce le proprietà del client del produttore Apache Kafka.
- conferme (facoltativo)
Il numero di conferme che, secondo il produttore, il server deve aver ricevuto, prima di considerare una richiesta come completa.
Se specifichi 0 come valore, il produttore non attenderà alcuna conferma da parte del server. Se il server non riceve il messaggio, il produttore non tenterà di inviare il messaggio un'altra volta.
Valori validi:
-1
,0
,1
,all
. Il valore predefinito è1
.- bootstrap.servers
Un elenco di coppie host e porta (ad esempio
host1:port1
,host2:port2
) utilizzato per stabilire la connessione iniziale al cluster Kafka.- compression.type (opzionale)
Il tipo di compressione per tutti i dati generati dal produttore.
Valori validi:
none
,gzip
,snappy
,lz4
,zstd
. Il valore predefinito ènone
.- security.protocol
Il protocollo di sicurezza usato per collegarsi al broker Kafka.
Valori validi:
SSL
,SASL_SSL
. Il valore predefinito èSSL
.- key.serializer
Specifica come trasformare gli oggetti chiave che fornisci con il
ProducerRecord
in byte.Valore valido:
StringSerializer
.- value.serializer
Specifica come trasformare gli oggetti valore che fornisci con il
ProducerRecord
in byte.Valore valido:
ByteBufferSerializer
.- ssl.truststore
Il file truststore in formato base64 o la posizione del file truststore in AWS Secrets Manager. Questo valore non è richiesto se il tuo truststore è considerato attendibile dalle autorità di certificazione Amazon (CA).
Questo campo supporta i modelli di sostituzione. Se utilizzi Secrets Manager per memorizzare le credenziali necessarie per connetterti al tuo broker Kafka, puoi utilizzare la
get_secret
SQL funzione per recuperare il valore di questo campo. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sulla funzione, vedere.get_secret
SQL get_secret (secretId, secretType, chiave, roleArn) Se il truststore ha la forma di un file, usa il parametroSecretBinary
. Se il truststore è sotto forma di una stringa, usa il parametroSecretString
.La dimensione massima di questo valore è di 65 KB.
- ssl.truststore.password
La password del truststore. Questo valore è richiesto solo se è stata creata una password per il truststore.
- ssl.keystore
Il file keystore. Questo valore è obbligatorio quando si specifica
SSL
come valore persecurity.protocol
.Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Per recuperare il valore di questo campo, utilizzare la
get_secret
SQL funzione. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sullaget_secret
SQL funzione, vedereget_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametroSecretBinary
.- ssl.keystore.password
La password dell'archivio per il file keystore. Questo valore è obbligatorio se viene specificato un valore per
ssl.keystore
.Il valore di questo campo può essere un testo semplice. Questo campo supporta anche i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Per recuperare il valore di questo campo, utilizzare la
get_secret
SQL funzione. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sullaget_secret
SQL funzione, vedereget_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametroSecretString
.- ssl.key.password
La password della chiave privata nel file keystore.
Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Per recuperare il valore di questo campo, utilizzare la
get_secret
SQL funzione. Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sullaget_secret
SQL funzione, vedereget_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametroSecretString
.- sasl.mechanism
Il meccanismo di sicurezza utilizzato per connettersi al broker Kafka. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
.Valori validi:
PLAIN
,SCRAM-SHA-512
,GSSAPI
.Nota
SCRAM-SHA-512
è l'unico meccanismo di sicurezza supportato nelle regioni cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west- sasl.plain.username
Il nome utente utilizzato per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
ePLAIN
persasl.mechanism
.- sasl.plain.password
La password utilizzata per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
ePLAIN
persasl.mechanism
.- sasl.scram.username
Il nome utente utilizzato per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eSCRAM-SHA-512
persasl.mechanism
.- sasl.scram.password
La password utilizzata per recuperare la stringa segreta da Secrets Manager. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eSCRAM-SHA-512
persasl.mechanism
.- sasl.kerberos.keytab
Il file keytab per l'autenticazione di Kerberos in Secrets Manager. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eGSSAPI
persasl.mechanism
.Questo campo supporta i modelli di sostituzione. Utilizza Secrets Manager per archiviare le credenziali necessarie per connetterti al broker Kafka. Per recuperare il valore di questo campo, usa la funzione.
get_secret
SQL Per ulteriori informazioni sui modelli di sostituzione, consulta Modelli di sostituzione. Per ulteriori informazioni sullaget_secret
SQL funzione, vedereget_secret (secretId, secretType, chiave, roleArn). Utilizzo del parametroSecretBinary
.- sasl.kerberos.service.name
Il nome principale Kerberos con il quale Apache Kafka funziona. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eGSSAPI
persasl.mechanism
.- sasl.kerberos.krb5.kdc
Il nome host del centro di distribuzione delle chiavi (KDC) a cui si connette il client di produzione di Apache Kafka. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eGSSAPI
persasl.mechanism
.- sasl.kerberos.krb5.realm
Il dominio a cui si connette il client del produttore Apache Kafka. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eGSSAPI
persasl.mechanism
.- sasl.kerberos.principal
L'identità Kerberos univoca a cui Kerberos può assegnare ticket per accedere ai servizi compatibili con Kerberos. Questo valore è obbligatorio quando si specifica
SASL_SSL
persecurity.protocol
eGSSAPI
persasl.mechanism
.
Esempi
L'JSONesempio seguente definisce un'azione di Apache Kafka in una regola. AWS IoT L'esempio seguente passa la funzione inline sourceIp() come modello sostitutivo nell'intestazione Kafka Action.
{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }
Note importanti sulla configurazione di Kerberos
Il centro di distribuzione delle chiavi (KDC) deve essere risolvibile tramite Domain Name System () privato all'interno del target. DNS VPC Un approccio possibile consiste nell'aggiungere la KDC DNS voce a una zona ospitata privata. Per ulteriori informazioni su questo approccio, consulta Utilizzo delle zone ospitate private.
Ciascuno VPC deve avere DNS la risoluzione abilitata. Per ulteriori informazioni, vedi Utilizzo DNS con il tuo VPC.
I gruppi di sicurezza dell'interfaccia di rete e i gruppi di sicurezza a livello di istanza nella VPC destinazione devono consentire il traffico dall'interno dell'utente VPC sulle seguenti porte.
TCPtraffico sulla porta del listener del broker bootstrap (spesso 9092, ma deve essere compreso nell'intervallo 9000-9100)
TCPe traffico sulla porta 88 per UDP KDC
SCRAM-SHA-512
è l'unico meccanismo di sicurezza supportato nelle regioni cn-north-1, cn-northwest-1, -1 e -1. us-gov-east us-gov-west