开始使用 Amazon Managed Streaming for Apache Kafka 流式摄取
本主题介绍如何通过实体化视图使用来自 Amazon MSK 的流数据。
Amazon Redshift 流式摄取的目的是简化将流式数据直接从流式服务摄取到 Amazon Redshift 或 Amazon Redshift Serverless 的过程。这适用于 Amazon MSK 和 Amazon MSK Serverless 以及 Kinesis。使用 Amazon Redshift 流式摄取时,在将流式数据摄取到 Redshift 之前,无需在 Amazon S3 中暂存 Kinesis Data Streams 流或 Amazon MSK 主题。
在技术层面上,来自 Amazon Kinesis Data Streams 和 Amazon Managed Streaming for Apache Kafka 的流式摄取以低延迟、高速度的方式将流式或主题数据摄取到 Amazon Redshift 实体化视图中。设置完成后,使用实体化视图刷新,可以接收大量数据。
通过执行以下步骤,为 Amazon MSK 设置 Amazon Redshift 流式摄取:
创建映射到流式数据来源的外部 Schema。
创建引用外部 Schema 的实体化视图。
在配置 Amazon Redshift 流式摄取之前,您必须有可用的 Amazon MSK 源。如果您没有源,请按照开始使用 Amazon MSK 中的说明进行操作。
注意
流式摄取和 Amazon Redshift Serverless – 本主题中的配置步骤同时适用于预调配的 Amazon Redshift 集群和 Amazon Redshift Serverless。有关更多信息,请参阅 流式摄取行为和数据类型。
设置 IAM 权限并从 Kafka 执行流式摄取
假设您有可用的 Amazon MSK 集群,第一步是使用 CREATE EXTERNAL SCHEMA
在 Redshift 中定义一个架构并引用 Kafka 主题作为数据来源。之后,要访问主题中的数据,请在实体化视图中定义 STREAM
。您可以用半结构化 SUPER
格式存储来自主题的记录,或者定义一个会将数据转换为 Amazon Redshift 数据类型的 Schema。当您查询实体化视图时,返回的记录是主题的时间点视图。
-
如果您使用 AUTHENTICATION NONE 连接到 MSK,则不需要任何 IAM 角色。但是,如果您使用 AUTHENTICATION IAM,则必须具有应用了相应权限的 IAM 角色。使用允许 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组代入该角色的信任策略创建 IAM 角色。创建之后,角色应已应用了如下例所示的 IAM 策略权限。这些权限支持对 Amazon MSK 集群的 IAM 身份验证。有关如何为 IAM 角色配置信任策略的信息,请参阅授权 Amazon Redshift 代表您访问其他 AWS 服务。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "MSKIAMpolicy", "Effect": "Allow", "Action": [ "kafka-cluster:ReadData", "kafka-cluster:DescribeTopic", "kafka-cluster:Connect" ], "Resource": [ "arn:aws:kafka:*:0123456789:cluster/MyTestCluster/*", "arn:aws:kafka:*:0123456789:topic/MyTestCluster/*" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:*:0123456789:group/MyTestCluster/*" ] } ] }
检查您的 VPC,并确认您的 Amazon Redshift 集群或 Amazon Redshift Serverless 拥有通往 Amazon MSK 集群的路由。Amazon MSK 集群的入站安全组规则应允许 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组的安全组。如果您使用 Amazon MSK,则您指定的端口取决于集群上使用的身份验证方法。有关更多信息,请参阅端口信息和从 AWS 内但在 VPC 外部访问。
请注意,对于流式摄取,不支持通过 mTLS 进行客户端身份验证。有关更多信息,请参阅限制。
下表显示了为从 Amazon MSK 进行流式摄取所要设置的免费配置选项:
Amazon Redshift 配置 Amazon MSK 配置 要在 Redshift 和 Amazon MSK 之间打开的端口 AUTHENTICATION NONE TLS 传输已禁用 9092 AUTHENTICATION NONE TLS 传输已启用 9094 AUTHENTICATION IAM IAM 9098/9198 Amazon Redshift 身份验证是在 CREATE EXTERNAL SCHEMA 语句中设置的。
如果 Amazon MSK 集群启用了相互传输层安全性协议 (mTLS) 身份验证,则将 Amazon Redshift 配置为使用 AUTHENTICATION NONE 会指示它使用端口 9094 进行未经身份验证的访问。但是,由于 mTLS 身份验证正在使用该端口,因此这一过程将失败。因此,当您使用 mTLS 时,我们建议您切换到 AUTHENTICATION IAM。
在 Amazon Redshift 集群或 Amazon Redshift Serverless 工作组中启用增强型 VPC 路由。有关更多信息,请参阅启用增强型 VPC 路由。
注意
为了检索 Amazon MSK 引导代理 URL,Amazon Redshift 使用附加的 IAM 角色提供的权限进行 GetBootstrapBrokers API 调用。请注意,为了使此请求在启用增强型 VPC 路由时获得成功,您的 Amazon Redshift 预调配集群或 Amazon Redshift Serverless 工作组的子网必须具有 NAT 网关或互联网网关。您的网络 ACL 和上述子网的安全组出站规则还必须允许访问 Amazon MSK API 服务端点。有关更多信息,请参阅 Amazon Managed Streaming for Apache Kafka 端点和限额。有关提供引导代理 URI 的更多详细信息将在接下来的步骤中介绍。
-
在 Amazon Redshift 中,创建一个外部 Schema 以映射到 Amazon MSK 集群。该语法如下所示:
CREATE EXTERNAL SCHEMA MySchema FROM MSK IAM_ROLE { default | 'iam-role-arn' } AUTHENTICATION { none | iam } URI 'msk-cluster-uri:port_number' CLUSTER_ARN 'msk-cluster-arn';
要指定从 Amazon MSK 进行流式传输,请使用
FROM MSK
。要进行连接,您需要提供引导代理 URI 和 CLUSTER_ARN。当您创建外部模式时,Amazon MSK 的流式摄取提供以下身份验证类型:无 – 指定没有身份验证步骤。
iam – 指定 IAM 身份验证。选择此选项时,请确保 IAM 角色具有 IAM 身份验证的权限。
以下示例演示在创建外部架构时,如何为 Amazon MSK 集群设置代理 URI 和 CLUSTER_ARN:
CREATE EXTERNAL SCHEMA my_schema FROM MSK IAM_ROLE 'arn:aws:iam::012345678901:role/my_role' AUTHENTICATION none URI 'b-1.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9092,b-2.myTestCluster.123z8u.c2.kafka.us-west-1.amazonaws.com:9092' CLUSTER_ARN 'arn:aws:kafka:us-east-1:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4';
此命令使用连接属性为 Amazon MSK 创建外部架构。请注意以下几点:
在本例中,指定的身份验证类型为 none。
URI 指定了引导代理 URI,端口为 9092。在检索时不需要额外的工具或 API 调用。有关获取 Amazon MSK 集群的引导代理 URI 的详细说明,请参阅《Amazon Managed Streaming for Apache Kafka Developer Guide》中的 Getting the bootstrap brokers for an Amazon MSK cluster。
在本例中,需要使用 CLUSTER_ARN 以及代理 URI。
有关创建外部架构的更多信息,请参阅 CREATE EXTERNAL SCHEMA。
-
创建一个实体化视图以使用来自主题的数据。如果您不想跳过错误记录,请使用类似此示例的 SQL 命令。
CREATE MATERIALIZED VIEW MyView AUTO REFRESH YES AS SELECT * FROM MySchema."mytopic";
Kafka 主题名称区分大小写,可以包含大写字母和小写字母。要从具有大写名称的主题中摄取,可以在数据库级别将配置
enable_case_sensitive_identifier
设置为true
。有关更多信息,请参阅名称和标识符和 enable_case_sensitive_identifier。要开启自动刷新,请使用
AUTO REFRESH YES
。默认行为是手动刷新。元数据列包括以下内容:
元数据列 数据类型 描述 kafka_partition bigint 来自 Kafka 主题的记录的分区 ID kafka_offset bigint Kafka 主题中给定分区的记录的偏移 kafka_timestamp_type char(1) Kafka 记录中使用的时间戳类型:
C – 客户端的记录创建时间 (CREATE_TIME)
L – Kafka 服务器端的记录追加时间 (LOG_APPEND_TIME)
U – 记录创建时间不可用 (NO_TIMESTAMP_TYPE)
kafka_timestamp 不带时区的时间戳 记录的时间戳值 kafka_key varbyte Kafka 记录的键 kafka_value varbyte 从 Kafka 收到的记录 kafka_headers super 从 Kafka 收到的记录的标头 refresh_time 不带时区的时间戳 刷新开始的时间 请务必注意,如果您的实体化视图定义中有业务逻辑,那么在某些情况下,业务逻辑错误可能会导致流式摄取被阻止。这可能会导致您不得不删除实体化视图,然后重新创建。为避免这种情况,我们建议您尽可能简化业务逻辑,并在摄取数据后对数据运行额外的逻辑。
刷新视图,这会调用 Amazon Redshift 从主题中读取数据并将数据加载到实体化视图中。
REFRESH MATERIALIZED VIEW MyView;
在实体化视图中查询数据。
select * from MyView;
当
REFRESH
运行时,直接从主题更新实体化视图。您创建映射到 Kafka 主题数据来源的实体化视图。在实体化视图定义中,您可以对数据执行筛选和聚合。流式摄取实体化视图(基本实体化视图)只能引用一个 Kafka 主题,但是您可以创建额外的实体化视图,以与基本实体化视图和其他实体化视图或表连接使用。
有关流式摄取限制的更多信息,请参阅 流式摄取行为和数据类型。