Apache Kafka - AWS IoT Core

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Apache Kafka

L'action Apache Kafka (Kafka) envoie des messages directement à votre Amazon Managed Streaming for Apache Kafka (Amazon MSK) ou à vos clusters Apache Kafka autogérés pour l'analyse et la visualisation des données.

Note

Cette rubrique suppose une bonne connaissance de la plateforme Apache Kafka et des concepts associés. Pour de plus amples informations sur Apache Kafka, veuillez consulter Apache Kafka. MSK Serverless n'est pas pris en charge. Les clusters MSK Serverless ne peuvent être créés que via l'authentification IAM, que l'action des règles d'Apache Kafka ne prend pas en charge actuellement.

Prérequis

Cette action réglementaire est assortie des exigences suivantes :

  • Rôle IAM qui AWS IoT peut assumer d'exécuter les ec2:DescribeSecurityGroups opérations ec2:CreateNetworkInterfaceec2:DescribeNetworkInterfaces,ec2:CreateNetworkInterfacePermission,ec2:DeleteNetworkInterface,ec2:DescribeSubnets,ec2:DescribeVpcs,ec2:DescribeVpcAttribute, et. Ce rôle crée et gère des interfaces réseau élastiques vers votre Amazon Virtual Private Cloud afin d'atteindre votre courtier Kafka. Pour plus d’informations, consultez Accorder à une AWS IoT règle l'accès dont elle a besoin.

    Dans la AWS IoT console, vous pouvez choisir ou créer un rôle pour autoriser l'exécution AWS IoT Core de cette action de règle.

    Pour plus d'informations sur les interfaces réseau, consultez Elastic network interfaces dans le Guide de l'utilisateur Amazon EC2.

    La stratégie associée au rôle que vous spécifiez doit ressembler à l'exemple suivant.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • Si vous stockez AWS Secrets Manager les informations d'identification requises pour vous connecter à votre courtier Kafka, vous devez créer un rôle IAM AWS IoT Core capable d'effectuer les opérations secretsmanager:GetSecretValue etsecretsmanager:DescribeSecret.

    La stratégie associée au rôle que vous spécifiez doit ressembler à l'exemple suivant.

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • Vous pouvez exécuter vos clusters Apache Kafka dans Amazon Virtual Private Cloud (Amazon VPC). Vous devez créer une destination Amazon VPC et utiliser une passerelle NAT dans vos sous-réseaux pour transférer les messages depuis un cluster AWS IoT Kafka public. Le moteur de AWS IoT règles crée une interface réseau dans chacun des sous-réseaux répertoriés dans la destination du VPC pour acheminer le trafic directement vers le VPC. Lorsque vous créez une destination VPC, le moteur de AWS IoT règles crée automatiquement une action de règle VPC. Pour plus d'informations sur les actions de règle VPC, consultez Destinations de cloud privé virtuel (VPC).

  • Si vous utilisez une clé KMS gérée par AWS KMS key le client pour chiffrer les données au repos, le service doit être autorisé à utiliser la clé KMS au nom de l'appelant. Pour plus d'informations, consultez Actions Amazon MSK Kafka dans le Guide du développeur Amazon Managed Streaming for Apache Kafka.

Paramètres

Lorsque vous créez une AWS IoT règle avec cette action, vous devez spécifier les informations suivantes :

DestinationArn

ARN (Amazon Resource Name) de la destination VPC. Pour plus d'informations sur la création d'une destination VPC, consultez Destinations de cloud privé virtuel (VPC).

topic

La rubrique Kafka pour les messages à envoyer à l'agent Kafka.

Vous pouvez remplacer ce champ à l'aide d'un modèle de substitution. Pour plus d’informations, consultez Modèles de substitution.

clé (facultatif)

La clé de message Kafka.

Vous pouvez remplacer ce champ à l'aide d'un modèle de substitution. Pour plus d’informations, consultez Modèles de substitution.

en-têtes (facultatif)

La liste des en-têtes Kafka que vous spécifiez. Chaque en-tête est une paire clé-valeur que vous pouvez spécifier lors de la création d'une action Kafka. Vous pouvez utiliser ces en-têtes pour acheminer les données des clients IoT vers les clusters Kafka en aval sans modifier la charge utile de vos messages.

Vous pouvez remplacer ce champ à l'aide d'un modèle de substitution. Pour comprendre comment transmettre la fonction d'une règle intégrée en tant que modèle de substitution dans l'en-tête de Kafka Action, consultez les exemples. Pour plus d’informations, consultez Modèles de substitution.

Note

Les en-têtes au format binaire ne sont pas pris en charge.

partition (facultatif)

La partition de message Kafka.

Vous pouvez remplacer ce champ à l'aide d'un modèle de substitution. Pour plus d’informations, consultez Modèles de substitution.

ClientProperties

Un objet qui définit les propriétés du client producteur Apache Kafka.

Packs (facultatif)

Le nombre d'accusés de réception que le producteur demande au serveur de recevoir avant de considérer qu'une demande est complète.

Si vous spécifiez 0 comme valeur, le producteur n'attendra aucun accusé de réception du serveur. Si le serveur ne reçoit pas le message, le producteur ne réessaiera pas de l'envoyer.

Valeurs valides: -1, 0, 1, all. La valeur par défaut est 1.

serveurs bootstrap

Liste des paires d'hôtes et de ports (par exemplehost1:port1,host2:port2) utilisées pour établir la connexion initiale à votre cluster Kafka.

compression.type (facultatif)

Type de compression pour toutes les données générées par le producteur.

Valeurs valides: none, gzip, snappy, lz4, zstd. La valeur par défaut est none.

security.protocol

Le protocole de sécurité utilisé pour vous connecter à votre courtier Kafka.

Valeurs valides : SSL, SASL_SSL. La valeur par défaut est SSL.

key.serializer

Spécifie comment transformer en octets les objets clés que vous fournissez avec leProducerRecord.

Valeur valide : StringSerializer.

value.serializer

Spécifie comment transformer en octets les objets de valeur que vous fournissez avec leProducerRecord.

Valeur valide : ByteBufferSerializer.

ssl.truststore

Le fichier truststore au format base64 ou l'emplacement du fichier truststore dans. AWS Secrets Manager Cette valeur n'est pas obligatoire si votre truststore est approuvé par les autorités de certification Amazon (CA).

Ce champ prend en charge les modèles de substitution. Si vous utilisez Secrets Manager pour stocker les informations d'identification requises pour vous connecter à votre broker Kafka, vous pouvez utiliser la fonction get_secret SQL pour récupérer la valeur de ce champ. Pour de plus amples informations sur les modèles de substitution, veuillez consulter Modèles de substitution. Pour plus d'informations sur la get_secret fonction SQL, veillez consultez get_secret (SecreTid, SecretType, clé, roLearn). Si le truststore se présente sous la forme d'un fichier, utilisez le SecretBinary paramètre. Si le truststore se présente sous la forme d'une chaîne, utilisez le SecretString paramètre.

La taille maximale de cette valeur est de 65 Ko.

ssl.truststore.password

Le mot de passe du référentiel d'approbations. Cette valeur n'est requise que si vous avez créé un mot de passe pour le référentiel d'approbations.

ssl.keystore

Le fichier keystore. Cette valeur est obligatoire lorsque vous spécifiez SSL comme valeur poursecurity.protocol.

Ce champ prend en charge les modèles de substitution. Utilisez Secrets Manager pour stocker les informations d'identification requises pour vous connecter à votre courtier Kafka. Pour récupérer la valeur de ce champ, utilisez la fonction get_secret SQL. Pour de plus amples informations sur les modèles de substitution, veuillez consulter Modèles de substitution. Pour plus d'informations sur la fonction get_secret SQL, consultez get_secret (SecreTid, SecretType, clé, roLearn). Utilisez le SecretBinary paramètre.

ssl.keystore.password

Le mot de passe du fichier keystore. Cette valeur est requise si vous spécifiez une valeur pour ssl.keystore.

La valeur de ce champ peut être en texte brut. Ce champ prend également en charge les modèles de substitution. Utilisez Secrets Manager pour stocker les informations d'identification requises pour vous connecter à votre courtier Kafka. Pour récupérer la valeur de ce champ, utilisez la fonction get_secret SQL. Pour de plus amples informations sur les modèles de substitution, veuillez consulter Modèles de substitution. Pour plus d'informations sur la fonction get_secret SQL, consultez get_secret (SecreTid, SecretType, clé, roLearn). Utilisez le SecretString paramètre.

clé ssl.mot de passe

Le mot de passe de la clé privée dans votre fichier keystore.

Ce champ prend en charge les modèles de substitution. Utilisez Secrets Manager pour stocker les informations d'identification requises pour vous connecter à votre courtier Kafka. Pour récupérer la valeur de ce champ, utilisez la fonction get_secret SQL. Pour de plus amples informations sur les modèles de substitution, veuillez consulter Modèles de substitution. Pour plus d'informations sur la fonction get_secret SQL, consultez get_secret (SecreTid, SecretType, clé, roLearn). Utilisez le SecretString paramètre.

sasl.mechanism

Le mécanisme de sécurité utilisé pour se connecter à votre courtier Kafka. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL poursecurity.protocol.

Valeurs valides : PLAIN, SCRAM-SHA-512, GSSAPI.

Note

SCRAM-SHA-512est le seul mécanisme de sécurité pris en charge dans les régions cn-north-1, cn-northwest-1, -1 et -1. us-gov-east us-gov-west

sasl.plain.username

Le nom d'utilisateur utilisé pour récupérer la chaîne secrète depuis Secrets Manager. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et PLAIN pour sasl.mechanism.

sasl.plain.password

Mot de passe utilisé pour récupérer la chaîne secrète depuis Secrets Manager. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et PLAIN pour sasl.mechanism.

sasl.scram.nom d'utilisateur

Le nom d'utilisateur utilisé pour récupérer la chaîne secrète depuis Secrets Manager. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et SCRAM-SHA-512 pour sasl.mechanism.

sasl.scram.password

Mot de passe utilisé pour récupérer la chaîne secrète depuis Secrets Manager. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et SCRAM-SHA-512 poursasl.mechanism.

sasl.kerberos.keytab

Le fichier keytab pour l'authentification Kerberos dans Secrets Manager. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et GSSAPI poursasl.mechanism.

Ce champ prend en charge les modèles de substitution. Utilisez Secrets Manager pour stocker les informations d'identification requises pour vous connecter à votre courtier Kafka. Pour récupérer la valeur de ce champ, utilisez la fonction get_secret SQL. Pour de plus amples informations sur les modèles de substitution, veuillez consulter Modèles de substitution. Pour plus d'informations sur la fonction get_secret SQL, consultez get_secret (SecreTid, SecretType, clé, roLearn). Utilisez le SecretBinary paramètre.

sasl.kerberos.service.name

Nom principal Kerberos sous lequel Apache Kafka s'exécute. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et GSSAPI pour sasl.mechanism.

sasl.kerberos.krb5.kdc

Le nom d'hôte du centre de distribution de clés (KDC) auquel votre client producteur Apache Kafka se connecte. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et GSSAPI pour sasl.mechanism.

sasl.kerberos.krb5.realm

Domaine auquel votre client producteur Apache Kafka se connecte. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et GSSAPI pour sasl.mechanism.

sasl.kerberos.principal

Identité Kerberos unique à laquelle Kerberos peut attribuer des tickets pour accéder aux services compatibles Kerberos. Cette valeur est obligatoire lorsque vous spécifiez SASL_SSL pour security.protocol et GSSAPI pour sasl.mechanism.

Exemples

L'exemple JSON suivant définit une action Apache Kafka dans une AWS IoT règle. L'exemple suivant transmet la fonction en ligne SourceIP () comme modèle de substitution dans l'en-tête Kafka Action.

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

Remarques importantes concernant votre configuration Kerberos

  • Votre centre de distribution de clés (KDC) doit pouvoir être résolu via un système de noms de domaine (DNS) privé au sein de votre VPC cible. Une approche possible consiste à ajouter l'entrée DNS du KDC à une zone hébergée privée. Pour plus d'informations sur cette approche, veuillez consulter Utilisation des zones hébergées privées.

  • Chaque VPC doit avoir la résolution DNS activée. Pour plus d'informations, consultez Utilisation de DNS avec votre VPC.

  • Les groupes de sécurité de l'interface réseau et les groupes de sécurité au niveau de l'instance de destination du VPC doivent autoriser le trafic provenant de votre VPC sur les ports suivants.

    • Trafic TCP sur le port d'écoute du broker bootstrap (souvent 9092, mais doit être compris entre 9000 et 9100)

    • Trafic TCP et UDP sur le port 88 pour le KDC

  • SCRAM-SHA-512est le seul mécanisme de sécurité pris en charge dans les régions cn-north-1, cn-northwest-1, -1 et -1. us-gov-east us-gov-west