Apache Kafka - AWS IoT Core

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Apache Kafka

Apache Kafka(Kafka)操作将消息直接发送到您的 Amazon Managed Streaming for Apache Kafka(Amazon MSK)、Confluent Cloud 等第三方提供商托管的 Apache Kafka 集群,或自行管理的 Apache Kafka 集群。通过 Kafka 规则操作,您可以将您的物联网数据路由到 Kafka 集群。这使您能够出于各种目的构建高性能数据管道,例如流分析、数据集成、可视化和任务关键型业务应用程序等。

注意

本主题假设您熟悉 Apache Kafka 平台及相关概念。有关 Apache Kafka 的更多信息,请参阅 Apache Kafka。不支持 MSK Serverless。MSK Serverless 集群只能通过 IAM 身份验证完成,而 Apache Kafka 规则操作目前不支持该身份验证。有关如何使用 Confluent AWS IoT Core 进行配置的更多信息,请参阅利用 Confluent 和AWS解决物联网设备和数据管理挑战。

要求

此规则操作具有以下要求:

  • 一个 IAM 角色,AWS IoT可以代入执行操作ec2:CreateNetworkInterfaceec2:DescribeNetworkInterfacesec2:CreateNetworkInterfacePermissionec2:DeleteNetworkInterfaceec2:DescribeSubnetsec2:DescribeVpcsec2:DescribeVpcAttribute、和ec2:DescribeSecurityGroups操作。此角色创建并管理您的 Amazon Virtual Private Cloud 弹性网络接口,以便联系您的 Kafka 代理。有关更多信息,请参阅 授予AWS IoT规则所需的访问权限

    在AWS IoT控制台中,您可以选择或创建AWS IoT Core允许执行此规则操作的角色。

    有关网络接口的更多信息,请参阅 Amazon EC2 用户指南中的弹性网络接口

    附加到您指定的角色的策略应如以下示例所示:

    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • 如果您使用AWS Secrets Manager存储连接到 Kafka 代理所需的证书,则必须创建一个AWS IoT Core可以代入执行secretsmanager:GetSecretValuesecretsmanager:DescribeSecret操作的 IAM 角色。

    附加到您指定的角色的策略应如以下示例所示:

    { "Version":"2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_keytab-*" ] } ] }
  • 您可以在 Amazon Virtual Private Cloud(Amazon VPC)中运行您的 Apache Kafka 集群。您必须创建 Amazon VPC 目标并在子网中使用 NAT 网关将消息从AWS IoT转发到公有 Kafka 集群。AWS IoT 规则引擎会在 VPC 目标中列出的每个子网中创建一个网络接口,以将流量直接路由到 VPC。当您创建 VPC 目标时,AWS IoT规则引擎会自动创建 VPC 规则操作。有关 VPC 规则操作的更多信息,请参阅 虚拟私有云(VPC)目标

  • 如果您使用客户托管AWS KMS key(KMS 密钥)加密静态数据,则服务必须有权代表呼叫者使用 KMS 密钥。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 开发人员指南中的 Amazon MSK 加密

Parameters

使用此操作创建AWS IoT规则时,必须指定以下信息:

destinationArn

VPC 目标的 Amazon 资源名称(ARN)。有关如何创建 VPC 目标的更多信息,请参阅 虚拟私有云(VPC)目标

topic

要发送到 Kafka 代理的消息的 Kafka 主题。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

键(可选)

Kafka 消息键。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

标头(可选)

您指定的 Kafka 标头的列表。每个标头都是一个键/值对,您可以在创建 Kafka 操作时指定该键值对。您可以使用这些标头将数据从物联网客户端路由到下游 Kafka 集群,而无需修改消息有效载荷。

您可以使用替代模板替换此字段。要了解如何在 Kafka 操作的标头中将内联规则的函数作为替换模板传递,请参阅示例。有关更多信息,请参阅 替换模板

注意

不支持二进制格式的标头。

分区(可选)

Kafka 消息分区。

您可以使用替代模板替换此字段。有关更多信息,请参阅 替换模板

clientProperties

定义 Apache Kafka 生成器客户端属性的对象。

acks(可选)

生成器在考虑请求完成之前要求服务器收到的确认数。

如果指定 0 作为值,则生产者将不会等待来自服务器的任何确认。如果服务器没有收到该消息,则生成器将不会重试发送该消息。

有效值:-101all。默认值为 1

bootstrap.servers

主机和端口对列表(例如 host1:port1host2:port2)用于建立到 Kafka 集群的初始连接。

compression.type(可选)

生成器生成的所有数据的压缩类型。

有效值:nonegzipsnappylz4zstd。默认值为 none

security.protocol

用于连接到您的 Kafka 代理的安全协议。

有效值:SSLSASL_SSL。默认值为 SSL

key.serializer

指定如何将您使用 ProducerRecord 提供的键对象转换为字节。

有效值:StringSerializer

value.serializer

指定如何将您使用 ProducerRecord 提供的值对象转换为字节。

有效值:ByteBufferSerializer

ssl.truststore

base64 格式的信任库文件或位于 AWS Secrets Manager 的信任库文件位置。如果您的信任存储受到 Amazon 证书授权机构(CA)的认证,则不需要此值。

此字段支持替换模板。如果使用 Secrets Manager 存储连接到 Kafka 代理所需的凭证,则可以使用 get_secret SQL 函数检索此字段的值。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。如果信任库采用文件的形式,请使用 SecretBinary 参数。如果信任库采用字符串的形式,请使用 SecretString 参数。

此值最大为 65 KB。

ssl.truststore.password

信任库存储的密码。仅当您为信任库创建了密码时,才需要此值。

ssl.keystore

密钥库文件。当您指定 SSL 作为 security.protocol 的值时,才需要此值。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretBinary 参数。

ssl.keystore.password

密钥库文件存储的密码。如果为 ssl.keystore 指定了值,则需要此值。

此字段的值可以是纯文本。此字段还支持替代模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretString 参数。

ssl.key.password

密钥库文件中私有密钥的密码。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretString 参数。

sasl.mechanism

用于连接到您的 Kafka 代理的安全机制。当您为 security.protocol 指定 SASL_SSL 时,则需要此值。

有效值:PLAINSCRAM-SHA-512GSSAPI

注意

SCRAM-SHA-512是 cn-north-1、cn-northwest-1、-1 和 -1 区域中唯一支持的安全机制。 us-gov-east us-gov-west

sasl.plain.username

用于从 Secrets Manager 中检索密钥字符串的用户名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 PLAIN 时,则需要此值。

sasl.plain.password

用于从 Secrets Manager 中检索密钥字符串的密码。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 PLAIN 时,则需要此值。

sasl.scram.username

用于从 Secrets Manager 中检索密钥字符串的用户名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 SCRAM-SHA-512 时,则需要此值。

sasl.scram.password

用于从 Secrets Manager 中检索密钥字符串的密码。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 SCRAM-SHA-512 时,则需要此值。

sasl.kerberos.keytab

Secrets Manager 中用于 Kerberos 身份验证的 keytab 文件。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

此字段支持替换模板。使用 Secrets Manager 来存储连接到您的 Kafka 代理所需的凭证。要检索此字段的值,请使用 get_secret SQL 函数。有关替换模板的更多信息,请参阅 替换模板。有关 get_secret SQL 函数的更多信息,请参阅get_secret(secretId, secretType, key, roleArn)。使用 SecretBinary 参数。

sasl.kerberos.service.name

Apache Kafka 运行的 Kerberos 主要名称。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.krb5.kdc

您的 Apache Kafka 生成器客户端连接到的密钥分配中心(KDC)的主机名。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.krb5.realm

您的 Apache Kafka 生成器客户端连接到的领域。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

sasl.kerberos.principal

Kerberos 可以为其分配票证以访问 Kerberos 感知服务的唯一 Kerberos 身份。当您为 security.protocol 指定 SASL_SSL、为 sasl.mechanism 指定 GSSAPI 时,则需要此值。

示例

以下 JSON 示例在规则中定义了 Apache Kafka 操作。AWS IoT以下示例将 sourceIp() 内联函数作为替换模板传递到 Kafka 操作标头中。

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

有关 Kerberos 设置的重要注意事项

  • 您的密钥分配中心(KDC)必须通过目标 VPC 内的私有域名系统(DNS)进行解析。一种可能的方法是将 KDC DNS 条目添加到私有托管区域。有关此方法的更多信息,请参阅如何使用私有托管区域

  • 每个 VPC 都必须启用 DNS 解析。有关更多信息,请参阅将 DNS 与您的 VPC 一起使用

  • VPC 目标中的网络接口安全组和实例级安全组必须允许来自 VPC 内部以下端口的流量。

    • 引导代理侦听器端口上的 TCP 流量(通常为 9092,但必须在 9000–9100 范围内)

    • KDC 端口 88 上的 TCP 和 UDP 流量

  • SCRAM-SHA-512是 cn-north-1、cn-northwest-1、-1 和 -1 区域中唯一支持的安全机制。 us-gov-east us-gov-west