向 Firehose 直播发送数据 - Amazon Data Firehose

将 Amazon Data Firehose 流传输到亚马逊 S3 中的 Apache Iceberg Tables 处于预览阶段,可能会发生变化。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

向 Firehose 直播发送数据

本节介绍如何使用不同的数据源将数据发送到您的 Firehose 流。如果您不熟悉 Amazon Data Firehose,请花点时间熟悉中介绍的概念和术语。什么是亚马逊 Data Firehose?

注意

某些 AWS 服务只能向位于同一区域的 Firehose 直播发送消息和事件。如果在为亚马逊 CloudWatch 日志、 CloudWatch 事件或配置目标时,您的 Firehose 直播未作为选项出现 AWS IoT,请验证您的 Firehose 直播是否与其他服务位于同一区域。

您可以从以下数据源向 Firehose 直播发送数据。

Kinesis Data Streams

以下步骤可帮助您配置 Amazon Kinesis Data Streams 以向 Firehose 流发送信息。

重要

如果您使用 Kinesis Producer 库 (KPL) 将数据写入 Kinesis 数据流,则可以使用聚合来合并写入该 Kinesis 数据流的记录。如果您随后使用该数据流作为 Firehose 流的来源,Amazon Data Firehose 会在将记录传送到目标之前对其进行解聚处理。如果您将 Firehose 流配置为转换数据,Amazon Data Firehose 会在将记录传送到之前对其进行解聚处理。 AWS Lambda有关更多信息,请参阅使用 Kinesis 创建者库开发 Amazon Kinesis Data Streams 创建者聚合

  1. 登录 AWS Management Console 并打开 Amazon Data Firehose 控制台,网址为。https://console.aws.amazon.com/firehose/

  2. 选择创建 Firehose 直播

  3. 在 “名称和来源” 页面上,为以下字段提供值。

    Firehose 直播名称

    你的 Firehose 直播的名称。

    选择 Kinesis 流来配置使用 Kinesis 数据流作为数据源的 Firehose 流。然后,您可以使用 Amazon Data Firehose 轻松地从现有数据流中读取数据并将其加载到目的地。

    要使用 Kinesis 数据流作为源,请在 Kinesis 流列表中选择一个现有流,或选择新建以创建新的 Kinesis 数据流。创建新流后,选择刷新,以更新 Kinesis 流列表。如果您有大量的流,可使用 Filter by name 筛选列表。

    注意

    当您将 Kinesis 数据流配置为 Firehose 流的源时,亚马逊数据 Firehose 和操作将被禁用。PutRecord PutRecordBatch在这种情况下,要将数据添加到你的 Firehose 流中,请使用 Kinesis Data Streams 和操作。PutRecord PutRecords

    Amazon Data Firehose 开始从你的 Kinesis LATEST 直播的位置读取数据。有关 Kinesis Data Streams 位置的更多信息,GetShardIterator请参阅。

    Amazon Data Firehose 每秒为每个分片调用 Kinesis Data Streams GetRecords操作一次。但是,启用完整备份后,Firehose 会每秒对每个分片调用 Kinesis Data Streams GetRecords 操作两次,一次用于主传输目标,另一次用于完整备份。

    可以从同一 Kinesis 直播中读取多个 Firehose 直播。其他 Kinesis 应用程序(使用者)也可以从同一个流中读取。来自任何 Firehose 直播或其他消费者应用程序的每次调用都计入分片的总体限制限制。为了避免受限,请小心计划您的应用程序。有关 Kinesis Data Streams 限制的更多信息,请参阅 Amazon Kinesis Streams 限制

  4. 选择 Next 前进至配置记录转换和格式转换页面。

Amazon MSK

您可以将亚马逊配置MSK为向 Firehose 直播发送信息。

  1. 登录 AWS Management Console 并打开 Amazon Data Firehose 控制台,网址为。https://console.aws.amazon.com/firehose/

  2. 选择创建 Firehose 直播

    在本页面的选择源和目标部分,为以下字段提供值:

    选择亚马逊MSK来配置使用亚马逊MSK作为数据源的 Firehose 流。您可以在MSK预配置集群和MSK无服务器集群之间进行选择。然后,您可以使用 Firehose 轻松地从特定的 Amazon MSK 集群和主题中读取数据,然后将其加载到指定的 S3 目标中。

    目标位置

    选择亚马逊 S3 作为 Firehose 直播的目的地。

    在本页面的源设置部分,为以下字段提供值:

    亚马逊MSK集群连接

    根据您的集群配置,选择私有引导代理(推荐)或公有引导代理选项。引导代理是 Apache Kafka 客户端用来连接集群的起点。公共引导代理旨在从外部公开访问 AWS,而私有引导代理则用于从内部访问。 AWS有关亚马逊的更多信息MSK,请参阅适用于 A pache Kafka 的亚马逊托管流媒体 Kafka

    要通过私有引导代理连接到预配置或无服务器 Amazon MSK 集群,该集群必须满足以下所有要求。

    • 集群必须处于活动状态。

    • 集群必须将访问控制方法之一IAM作为其访问控制方法。

    • 必须为IAM访问控制方法启用多VPC私有连接。

    • 您必须向该集群添加基于资源的策略,该策略授予 Firehose 服务主体调用 Amazon MSK CreateVpcConnection API 操作的权限。

    要通过公共引导代理连接到预配置的 Amazon 集MSK群,该集群必须满足以下所有要求。

    • 集群必须处于活动状态。

    • 集群必须将访问控制方法之一IAM作为其访问控制方法。

    • 集群必须可公开访问。

    MSK集群账户

    您可以选择 Amazon MSK 集群所在的账户。这可以是以下之一。

    • 活期账户-允许您从当前账户中的集MSK群中提取数据。 AWS 为此,您必须指定 Firehose 流将从中读取数据的亚马逊MSK集群。ARN

    • 跨账户-允许您在另一个 AWS 账户中从MSK集群中提取数据。有关更多信息,请参阅 来自亚马逊的跨账户配送 MSK

    主题

    指定你希望 Firehose 直播从中提取数据的 Apache Kafka 主题。Firehose 直播创建完成后,您将无法更新此主题。

    在页面的 Firehose 直播名称部分,为以下字段提供值:

    Firehose 直播名称

    为你的 Firehose 直播指定名称。

  3. 接下来,您可以完成配置记录转换和记录格式转换的可选步骤。有关更多信息,请参阅 配置记录转换和格式转换

Kinesis Agent

Amazon Kinesis 代理是一款独立的 Java 软件应用程序,可作为参考实现,展示如何收集数据并将其发送到 Firehose。代理会持续监控一组文件,并将新数据发送到您的 Firehose 直播中。该代理显示了如何处理文件轮换、检查点操作和失败时重试。它展示了如何以可靠、及时和简单的方式交付数据。它还显示了如何发布 CloudWatch指标以更好地监控流媒体过程并对其进行故障排除。要了解更多信息,请访问 awslabs/ amazon-kinesis-agent

默认情况下,会基于换行符 ('\n') 分析每个文件中的记录。但是,也可以将代理配置为分析多行记录(请参阅 代理配置设置)。

您可以在基于 Linux 的服务器环境(如 Web 服务器、日志服务器和数据库服务器)上安装此代理。安装代理后,通过指定要监控的文件和数据的 Firehose 流来对其进行配置。配置代理后,它会持久地从文件中收集数据,并可靠地将其发送到 Firehose 数据流。

主题

    使用以下方法之一管理您的 AWS 证书:

    • 创建自定义凭证提供程序。有关更多信息,请参阅 创建自定义凭证提供商

    • 在启动EC2实例时指定一个IAM角色。

    • 在配置代理时指定 AWS 凭据(参见下方配置表awsSecretAccessKey中的awsAccessKeyId和条目代理配置设置)。

    • 编辑/etc/sysconfig/aws-kinesis-agent以指定您的 AWS 地区和 AWS 访问密钥。

    • 如果您的EC2实例位于其他 AWS 账户中,请创建一个IAM角色以提供对 Amazon Data Firehose 服务的访问权限。在配置代理时指定该角色(参见assumeRoleARNassumeRoleExternalID)。使用前面的方法之一来指定另一个账户中有权担任此角色的用户的 AWS 证书。

    您可以创建自定义凭证提供程序,并在以下配置设置中为 Kinesis 代理提供其类名和 jar 路径:userDefinedCredentialsProvider.classnameuserDefinedCredentialsProvider.location。有关这两个配置设置的说明,请参阅代理配置设置

    要创建自定义凭证提供程序,请定义一个实现 AWS CredentialsProvider 接口的类,如下例所示。

    import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; public class YourClassName implements AWSCredentialsProvider { public YourClassName() { } public AWSCredentials getCredentials() { return new BasicAWSCredentials("key1", "key2"); } public void refresh() { } }

    您的类必须有一个不带参数的构造函数。

    AWS 定期调用刷新方法以获取更新的凭证。如果希望凭证提供程序在其整个生命周期内提供不同的凭证,请在此方法中包含用于刷新凭证的代码。或者,如果您需要提供静态(不更改)凭证的凭证提供程序,则可以将此方法保留为空。

    首先,连接到您的实例。有关更多信息,请参阅 Amazon EC2 用户指南中的 Connect 到您的实例。如果您在连接时遇到问题,请参阅 Amazon EC2 用户指南中的实例连接疑难解答

    接下来,请使用以下方法之一安装代理。

    • 要从 Amazon Linux 存储库设置代理

      此方法仅适用于 Amazon Linux 实例。使用以下命令:

      sudo yum install –y aws-kinesis-agent

      Agent v 2.0.0 或更高版本安装在装有 Amazon Linux 2 () AL2 操作系统的计算机上。此代理版本需要安装 Java 1.8 或更高版本。如果尚未安装所需的 Java 版本,代理安装程序将会安装。有关亚马逊 Linux 2 的更多信息,请参阅https://aws.amazon.com/amazon-linux-2/

    • 从 Amazon S3 存储库设置代理

      此方法从公开可用的存储库安装代理,因此适用于 Red Hat Enterprise Linux 以及 Amazon Linux 2 实例。使用以下命令下载并安装最新版本的代理 2.x.x:

      sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm

      要安装特定版本的代理,请在命令中指定版本号。例如,以下命令将安装代理 2.0.1。

      sudo yum install –y https://streaming-data-agent.s3.amazonaws.com/aws-kinesis-agent-2.0.1-1.amzn1.noarch.rpm

      如果您使用的是 Java 1.7,但不想升级,则可以下载与 Java 1.7 兼容的代理 1.x.x。例如,要下载代理 1.1.6,可使用以下命令:

      sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-1.1.6-1.amzn1.noarch.rpm

      可以使用以下命令下载最新的代理 1.x.x:

      sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm
    • 从 GitHub repo 中设置代理

      1. 首先,确保您已安装所需的 Java 版本,具体取决于代理版本。

      2. awslabs amazon-kinesis-agent GitHub /存储库下载代理。

      3. 导航到下载目录并运行以下命令来安装代理:

        sudo ./setup --install
    • 在 Docker 容器中设置代理

      Kinesis 代理可以在容器中运行,也可以通过 amazonlinux 容器库运行。使用以下 Dockerfile,然后运行 docker build

      FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]
    配置并启动代理
    1. 打开并编辑配置文件(如果使用默认文件访问权限,则以超级用户的身份来执行):/etc/aws-kinesis/agent.json

      在此配置文件中,指定代理从中收集数据的文件 ("filePattern") 以及代理向其发送数据的 Firehose 流的名称 ("deliveryStream")。文件名是一种模式,并且代理能够识别文件轮换。每秒内您轮换使用文件或创建新文件的次数不能超过一次。代理使用文件创建时间戳来确定要跟踪哪些文件并将其追踪到您的 Firehose 直播中。如果每秒创建新文件或轮换使用文件的次数超过一次,代理将无法正确区分这些文件。

      { "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "yourdeliverystream" } ] }

      默认 AWS 区域为us-east-1。如果使用的是其他区域,请将 firehose.endpoint 设置添加到配置文件,为区域指定终端节点。有关更多信息,请参阅 代理配置设置

    2. 手动启动代理:

      sudo service aws-kinesis-agent start
    3. (可选)将代理配置为在系统启动时启动:

      sudo chkconfig aws-kinesis-agent on

    现在,代理作为系统服务在后台运行。它会持续监视指定的文件并将数据发送到指定的 Firehose 流。代理活动记录在 /var/log/aws-kinesis-agent/aws-kinesis-agent.log 中。

    代理支持两个必需的配置设置,即 filePatterndeliveryStream,以及可用于其他功能的可选配置设置。您可以在 /etc/aws-kinesis/agent.json 中指定必需配置设置和可选配置设置。

    只要您更改配置文件,就必须使用以下命令停止再启动代理:

    sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

    或者,您也可以使用以下命令:

    sudo service aws-kinesis-agent restart

    一般的设置配置如下。

    配置设置 描述
    assumeRoleARN

    要由用户担任的角色的 Amazon 资源名称 (ARN)。有关更多信息,请参阅《IAM用户指南》中的使用IAM角色委派跨 AWS 账户访问权限

    assumeRoleExternalId

    确定谁可以担任该角色的可选标识符。有关更多信息,请参阅《IAM用户指南》中的 “如何使用外部 ID”。

    awsAccessKeyId

    AWS 覆盖默认凭证的访问密钥 ID。此设置优先于所有其他凭证提供程序。

    awsSecretAccessKey

    AWS 覆盖默认凭证的密钥。此设置优先于所有其他凭证提供程序。

    cloudwatch.emitMetrics

    CloudWatch 如果已设置,则允许代理向其发送指标 (true)。

    默认:True

    cloudwatch.endpoint

    的区域终端节点 CloudWatch。

    默认:monitoring.us-east-1.amazonaws.com

    firehose.endpoint

    亚马逊 Data Firehose 的区域终端节点。

    默认:firehose.us-east-1.amazonaws.com

    sts.endpoint

    AWS 安全令牌服务的区域终端节点。

    默认:https://sts.amazonaws.com

    userDefinedCredentialsProvider.classname 如果定义自定义凭证提供程序,请使用此设置提供其完全限定类名。不要在类名末尾包含 .class
    userDefinedCredentialsProvider.location 如果定义自定义凭证提供程序,请使用此设置指定包含自定义凭证提供程序的 jar 的绝对路径。代理还在以下位置查找 jar 文件:/usr/share/aws-kinesis-agent/lib/

    流配置设置如下。

    配置设置 描述
    aggregatedRecordSizeBytes

    要使代理聚合记录,然后通过一次操作将其放入 Firehose 流,请指定此设置。将其设置为你希望聚合记录在代理将其放入 Firehose 直播之前的大小。

    默认值:0(不聚合)

    dataProcessingOptions

    在每条已解析的记录发送到 Firehose 流之前,应用于该记录的处理选项列表。这些处理选项按指定的顺序执行。有关更多信息,请参阅 使用代理预处理数据

    deliveryStream

    [必填] Firehose 直播的名称。

    filePattern

    [必需] 需要由代理监控的文件的 Glob。与此模式匹配的任何文件都会自动由代理挑选出来并进行监控。对于匹配此模式的所有文件,请向 aws-kinesis-agent-user 授予读取权限。对于包含这些文件的目录,请向 aws-kinesis-agent-user 授予读取和执行权限。

    重要

    代理将选择与此模式匹配的任何文件。为了确保代理不会选择意外的记录,请仔细选择此模式。

    initialPosition

    开始解析文件的初始位置。有效值为 START_OF_FILEEND_OF_FILE

    默认:END_OF_FILE

    maxBufferAgeMillis

    代理在将数据发送到 Firehose 流之前缓冲数据的最长时间,以毫秒为单位。

    值范围:1,000-900,000(1 秒到 15 分钟)

    默认值:60,000(1 分钟)

    maxBufferSizeBytes

    代理在将数据发送到 Firehose 流之前对其进行缓冲的最大大小,以字节为单位。

    值范围:1-4,194,304(4MB)

    默认值:4194304 (4 MB)

    maxBufferSizeRecords

    代理在将数据发送到 Firehose 流之前对其进行缓冲的最大记录数。

    值范围:1-500

    默认值:500

    minTimeBetweenFilePollsMillis

    代理轮询和分析受监控文件中是否有新数据的时间间隔(以毫秒计)。

    值范围:1 或更大值

    默认值:100

    multiLineStartPattern

    用于标识记录开始的模式。记录由与模式匹配的行以及与模式不匹配的任何以下行组成。有效值为正则表达式。默认情况下,日志文件中的每一个新行都被解析为一条记录。

    skipHeaderLines

    代理从受监控文件开头跳过分析的行数。

    值范围:0 或更大值

    默认值:0(零)

    truncatedRecordTerminator

    当记录大小超过 Amazon Data Firehose 记录大小限制时,代理用来截断已解析记录的字符串。(1000 KB)

    默认值:'\n'(换行符)

    通过指定多个流程配置设置,您可以配置代理以监控多个文件目录并将数据发送到多个流。在以下配置示例中,代理监控两个文件目录,并将数据分别发送到 Kinesis 数据流和 Firehose 流。您可以为 Kinesis Data Streams 和 Amazon Data Firehose 指定不同的终端节点,这样您的数据流和 Firehose 流就不必位于同一个区域。

    { "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

    有关在 Amazon Kinesis Data Streams 中使用代理的更多详细信息,请参阅使用 Kinesis 代理写入 Amazon Kinesis Data Streams

    代理可以预处理从受监控文件中解析的记录,然后再将其发送到您的 Firehose 直播中。通过将 dataProcessingOptions 配置设置添加到您的文件流可以启用此功能。可以添加一个或多个处理选项,这些选项将按指定的顺序执行。

    代理支持以下处理选项。由于代理是开源的,您可以进一步开发和扩展其处理选项。您可以从 Kinesis 代理下载代理。

    处理选项
    SINGLELINE

    通过移除换行符和首尾的空格,将多行记录转换为单行记录。

    { "optionName": "SINGLELINE" }
    CSVTOJSON

    将记录从分隔符分隔的格式转换为格式。JSON

    { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
    customFieldNames

    [必填] 在每个JSON键值对中用作键的字段名称。例如,如果您指定 ["f1", "f2"],则记录“v1, v2”将转换为 {"f1":"v1","f2":"v2"}

    delimiter

    在记录中用作分隔符的字符串。默认值为逗号(,)。

    LOGTOJSON

    将记录从日志格式转换为JSON格式。支持的日志格式为 Apache Common LogApache Combined LogApache Error LogRFC3164 Syslog

    { "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
    logFormat

    [必需] 日志条目格式。以下是可能的值:

    • COMMONAPACHELOG – Apache 通用日志格式。默认情况下每个日志条目都为以下模式:“%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}”。

    • COMBINEDAPACHELOG – Apache 组合日志格式。默认情况下每个日志条目都为以下模式:“%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}”。

    • APACHEERRORLOG – Apache 错误日志格式。默认情况下每个日志条目都为以下模式:“[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}”。

    • SYSLOG— RFC3164 系统日志格式。默认情况下每个日志条目都为以下模式:“%{timestamp} %{hostname} %{program}[%{processid}]: %{message}”。

    matchPattern

    覆盖指定的日志格式的默认模式。如果日志条目使用自定义格式,则可以使用该设置提取日志条目中的值。如果指定 matchPattern,还必须指定 customFieldNames

    customFieldNames

    在每个JSON键值对中用作键的自定义字段名称。您可以使用此设置定义从 matchPattern 中提取的值的字段名称,或覆盖预定义日志格式的默认字段名称。

    例 : LOGTOJSON 配置

    以下是将 Apache Common Log 条目转换为JSON格式的LOGTOJSON配置示例:

    { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

    转换前:

    64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

    转换后:

    {"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
    例 : 使用自定义字段进行LOGTOJSON配置

    下面是 LOGTOJSON 配置的另一个示例:

    { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

    使用此配置设置,将上一个示例中的相同 Apache Common Log 条目转换为如下JSON格式:

    {"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
    例 :转换 Apache 通用日志条目

    以下流程配置将 Apache Common Log 条目转换为格式的单行记录:JSON

    { "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "my-delivery-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
    例 :转换多行记录

    以下流配置分析第一行以“[SEQUENCE=”开头的多行记录。每个记录先转换为一个单行记录。然后,将基于制表分隔符从记录中提取值。提取的值将映射到指定的customFieldNames值,以形成JSON格式的单行记录。

    { "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "my-delivery-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }
    例 : 使用匹配模式的LOGTOJSON配置

    以下是将 Apache Common Log 条目转换为JSON格式的LOGTOJSON配置示例,其中省略了最后一个字段(字节):

    { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

    转换前:

    123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

    转换后:

    {"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

    系统启动时自动启动代理。

    sudo chkconfig aws-kinesis-agent on

    检查代理的状态:

    sudo service aws-kinesis-agent status

    停止代理:

    sudo service aws-kinesis-agent stop

    从此位置读取代理的日志文件:

    /var/log/aws-kinesis-agent/aws-kinesis-agent.log

    卸载代理:

    sudo yum remove aws-kinesis-agent

    有没有适用于 Windows 的 Kinesis 代理?

    适用于 Windows 的 Kinesis 代理与适用于 Linux 平台的 Kinesis 代理是不同的软件。

    为什么 Kinesis 代理速度变慢且/或 RecordSendErrors 增加?

    这通常是由于 Kinesis 节流造成的。查看 Kinesis Data Streams 的WriteProvisionedThroughputExceeded指标或 Firehose 直播ThrottledRecords的指标。这些指标从 0 开始的任何增加都表示需要增加流限制。有关更多信息,请参阅 Kinesis 数据流限制和 Firehose 数据流

    排除节流后,查看 Kinesis 代理是否配置为跟踪大量小文件。Kinesis 代理跟踪新文件时会有延迟,因此 Kinesis 代理应跟踪少量大文件。尝试将您的日志文件合并为大文件。

    为什么我会收到 java.lang.OutOfMemoryError 异常?

    Kinesis 代理没有足够的内存来处理当前的工作负载。尝试增加 /usr/bin/start-aws-kinesis-agent 中的 JAVA_START_HEAPJAVA_MAX_HEAP,并重新启动代理。

    为什么我会收到 IllegalStateException : connection pool shut down 异常?

    Kinesis 代理没有足够的连接来处理当前的工作负载。尝试在 /etc/aws-kinesis/agent.json 的常规代理配置设置中增加 maxConnectionsmaxSendingThreads。这些字段的默认值是可用运行时系统处理器的 12 倍。有关高级代理配置设置的更多信息,请参见 AgentConfiguration.java

    如何调试 Kinesis 代理的其他问题?

    可在 /etc/aws-kinesis/log4j.xml 中启用 DEBUG 级别日志。

    我应该如何配置 Kinesis 代理?

    maxBufferSizeBytes 越小,Kinesis 代理发送数据的频率就越高。这可能是好事,因为这减少了记录的传输时间,但也增加了每秒对 Kinesis 的请求。

    为什么 Kinesis 代理会发送重复记录?

    这是由于文件跟踪中的错误配置造成的。确保每个 fileFlow’s filePattern 只匹配一个文件。如果正在使用的 logrotate 模式处于 copytruncate 模式,也可能发生这种情况。尝试将模式更改为默认模式或创建模式以避免重复。有关处理重复记录的更多信息,请参阅处理重复记录

    AWS SDK

    您可以使用适用于 Java 的 Amazon Data Firehos e 将数据发送API到 Firehose 流。AWS SDK NETNode.jsPythonRuby。如果您不熟悉 Amazon Data Firehose,请花点时间熟悉中介绍的概念和术语。什么是亚马逊 Data Firehose?有关更多信息,请参阅开始使用 Amazon Web Services 开发

    这些示例并非可直接用于生产的代码,因为它们不会检查所有可能的异常,或者不会考虑到所有可能的安全或性能问题。

    Amazon Data Fire API hose 提供了两种向你的 Firehose 流发送数据的操作:和。PutRecordPutRecordBatch PutRecord()在一个呼叫中发送一条数据记录,并且PutRecordBatch()可以在一个呼叫中发送多条数据记录。

    使用单次写入操作 PutRecord

    放置数据只需要 Firehose 流名称和字节缓冲区 (<=1000 KB)。由于 Amazon Data Firehose 在将文件加载到 Amazon S3 之前会对多条记录进行批处理,因此您可能需要添加记录分隔符。要将数据逐条放入 Firehose 流中,请使用以下代码:

    PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setDeliveryStreamName(deliveryStreamName); String data = line + "\n"; Record record = new Record().withData(ByteBuffer.wrap(data.getBytes())); putRecordRequest.setRecord(record); // Put record into the DeliveryStream firehoseClient.putRecord(putRecordRequest);

    有关更多代码上下文,请参阅中包含的示例代码 AWS SDK。有关请求和响应语法的信息,请参阅 Firehose API 操作中的相关主题。

    使用 Batch 写入操作 PutRecordBatch

    放置数据只需要提供 Firehose 直播名称和记录列表。由于 Amazon Data Firehose 在将文件加载到 Amazon S3 之前会对多条记录进行批处理,因此您可能需要添加记录分隔符。要将数据记录批量放入 Firehose 流,请使用以下代码:

    PutRecordBatchRequest putRecordBatchRequest = new PutRecordBatchRequest(); putRecordBatchRequest.setDeliveryStreamName(deliveryStreamName); putRecordBatchRequest.setRecords(recordList); // Put Record Batch records. Max No.Of Records we can put in a // single put record batch request is 500 firehoseClient.putRecordBatch(putRecordBatchRequest); recordList.clear();

    有关更多代码上下文,请参阅中包含的示例代码 AWS SDK。有关请求和响应语法的信息,请参阅 Firehose API 操作中的相关主题。

    CloudWatch 日志

    CloudWatch 可以使用 CloudWatch 订阅过滤器将日志事件发送到 Firehose。有关更多信息,请参阅使用 Amazon Data Firehose 的订阅筛选条件

    CloudWatch 日志事件以压缩的 gzip 格式发送到 Firehose。如果您想将解压缩后的日志事件传送到 Firehose 目标,可以使用 Firehose 中的解压缩功能自动解压缩日志。 CloudWatch

    重要

    目前,Firehose 不支持将 CloudWatch 日志传送到亚马逊 OpenSearch 服务目标,因为亚马逊 CloudWatch 将多个日志事件合并到一个 Firehose 记录中,而亚马逊 OpenSearch 服务无法在一条记录中接受多个日志事件。作为替代方案,您可以考虑在 CloudWatch 日志中使用亚马逊 OpenSearch 服务的订阅筛选条件

    解压日志 CloudWatch

    如果您使用 Firehose 传送 CloudWatch 日志,并希望将解压缩后的数据传输到 Firehose 直播目标,请使用 Firehose 数据格式转换 (Parquet) 或动态分区。ORC您必须为 Firehose 直播启用解压功能。

    您可以使用 AWS Management Console、 AWS Command Line Interface 或 AWS SDKs启用解压功能。

    注意

    如果您在直播上启用解压缩功能,请将该直播专门用于 CloudWatch 日志订阅过滤器,而不是用于Vended Logs。如果您在用于同时采集日志和已售 CloudWatch 日志的流上启用解压缩功能,则向Firehose提取Vended Logs将失败。此解压缩功能仅适用于 CloudWatch 日志。

    解压日志后的消息提取 CloudWatch

    启用解压缩功能后,您还可以选择启用消息提取。使用消息提取时,Firehose 会从解压缩的 CloudWatch 日志记录中筛选出所有元数据,例如所有者、日志组、日志流和其他元数据,并仅提供消息字段内的内容。如果您要将数据传送到 Splunk 目标,则必须开启消息提取功能,Splunk 才能解析数据。以下是解压缩后的输出示例,无论是否使用消息提取。

    图 1:未提取消息的解压缩后的输出示例:

    { "owner": "111111111111", "logGroup": "CloudTrail/logs", "logStream": "111111111111_CloudTrail/logs_us-east-1", "subscriptionFilters": [ "Destination" ], "messageType": "DATA_MESSAGE", "logEvents": [ { "id": "31953106606966983378809025079804211143289615424298221568", "timestamp": 1432826855000, "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root1\"}" }, { "id": "31953106606966983378809025079804211143289615424298221569", "timestamp": 1432826855000, "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root2\"}" }, { "id": "31953106606966983378809025079804211143289615424298221570", "timestamp": 1432826855000, "message": "{\"eventVersion\":\"1.03\",\"userIdentity\":{\"type\":\"Root3\"}" } ] }

    图 2:使用消息提取进行解压缩后的输出示例:

    {"eventVersion":"1.03","userIdentity":{"type":"Root1"} {"eventVersion":"1.03","userIdentity":{"type":"Root2"} {"eventVersion":"1.03","userIdentity":{"type":"Root3"}

    启用和禁用解压缩

    您可以使用、 AWS Command Line Interface 或 AWS SDKs启用和禁用解压缩。 AWS Management Console

    通过控制台对新的 Firehose 直播启用解压缩

    要对新的 Firehose 直播启用解压功能,请使用 AWS Management Console
    1. 登录 AWS Management Console 并打开 Kinesis 控制台,网址为 https://console.aws.amazon.com/kinesis。

    2. 在导航窗格中选择 Amazon Data Firehose

    3. 选择创建 Firehose 直播

    4. “选择来源和目的地” 下

      你的 Firehose 直播的来源。选择以下来源之一:

      • 直接 PUT — 选择此选项可创建制作者应用程序直接写入的 Firehose 流。有关在 Fire AWS hose PUT 中与 Direct 集成的服务、代理和开源服务的列表,请参阅节。

      • Kinesis 流:选择此选项可配置使用 Kinesis 数据流作为数据源的 Firehose 流。然后,您可以使用 Firehose 轻松地从现有 Kinesis 数据流中读取数据并将其加载到目的地。有关更多信息,请参阅使用 Kinesis Data Streams 写入 Firehose

      目标位置

      你的 Firehose 直播的目的地。选择以下操作之一:

      • Amazon S3

      • Splunk

    5. Firehose 直播名称下,输入您的直播名称。

    6. (可选)在 “转换记录” 下:

      • 在 “从 Amazon CloudWatch Logs 解压源记录” 部分中,选择开启解压缩

      • 如果要在解压缩后使用消息提取,请选择 “启用消息提取”。

    通过控制台对现有 Firehose 直播启用解压缩

    如果您有一个带有 Lambda 函数的 Firehose 流来执行解压缩,则可以将其替换为 Firehose 解压缩功能。在继续操作之前,请查看您的 Lambda 函数代码以确认它仅执行解压缩或消息提取。您的 Lambda 函数的输出应与上一节图 1 或图 2 中显示的示例类似。如果输出看起来相似,则可以使用以下步骤替换 Lambda 函数。

    1. 用此蓝图替换您当前的 Lambda 函数。新的蓝图 Lambda 函数会自动检测传入的数据是压缩还是解压缩。它只有在输入数据被压缩后才会执行解压缩。

    2. 使用内置的 Firehose 选项开启解压功能,进行解压缩。

    3. 如果您的 Firehose 直播尚未启用,请启用该 CloudWatch 指标。监控指标 CloudWatchProcessorLambda _ IncomingCompressedData 并等到该指标变为零。这确认发送到您的 Lambda 函数的所有输入数据均已解压缩,并且不再需要 Lambda 函数。

    4. 移除 Lambda 数据转换,因为您不再需要它来解压缩您的流。

    从控制台禁用解压缩

    要禁用对数据流的解压缩,请使用 AWS Management Console

    1. 登录 AWS Management Console 并打开 Kinesis 控制台,网址为 https://console.aws.amazon.com/kinesis。

    2. 在导航窗格中选择 Amazon Data Firehose

    3. 选择你要编辑的 Firehose 直播。

    4. Firehose 直播详情页面上,选择配置选项卡。

    5. 在 “转换和转换记录” 部分,选择 “编辑”

    6. 在 “从 Amazon CloudWatch Logs 解压源记录” 下,清除 “开启解压缩”,然后选择 “保存更改”。

    FAQ

    如果在解压缩过程中出错,源数据会怎样?

    如果 Amazon Data Firehose 无法解压缩记录,则记录将按原样(以压缩格式)传送到您在 Firehose 流创建期间指定的错误 S3 存储桶。除了记录外,交付的对象还包括错误代码和错误消息,这些对象将传送到名为的 S3 存储桶前缀decompression-failed。解压缩记录失败后,Firehose 将继续处理其他记录。

    成功解压缩后,如果处理管道出错,源数据会怎样?

    如果 Amazon Data Firehose 在解压后的处理步骤(例如动态分区和数据格式转换)中出错,则记录将以压缩格式传送到您在创建 Firehose 流时指定的错误 S3 存储桶。除了记录外,交付的对象还包括错误代码和错误消息。

    如果出现错误或异常,如何通知您?

    如果在解压缩过程中出现错误或异常,如果您配置 CloudWatch 日志,Firehose 会将错误消息 CloudWatch 记录到日志中。此外,Firehose 还会向您可以 CloudWatch 监控的指标发送指标。您也可以选择根据Firehose发出的指标创建警报。

    put操作不是来自 CloudWatch 日志时会发生什么?

    如果客户不是puts来自 CloudWatch Logs,则会返回以下错误消息:

    Put to Firehose failed for AccountId: <accountID>, FirehoseName: <firehosename> because the request is not originating from allowed source types.

    Firehose 为解压缩功能发布了哪些指标?

    Firehose 会发出每条记录的解压缩指标。您应该选择周期(1 分钟)、统计数据(总和)、日期范围,以获取DecompressedRecords失败或成功或失DecompressedBytes败或成功的次数。有关更多信息,请参阅 CloudWatch 记录解压缩指标

    CloudWatch 大事记

    您可以通过 CloudWatch 向事件规则添加目标,将亚马逊配置为向 Firehose 流发送 CloudWatch 事件。

    为向现有 Firehose 直播发送事件的事件规则创建目标 CloudWatch
    1. 登录 AWS Management Console 并打开 CloudWatch 控制台,网址为https://console.aws.amazon.com/cloudwatch/

    2. 选择创建规则

    3. 在 “步骤 1:创建规则” 页面上,对于 “目标”,选择 “添加目标”,然后选择 Firehose stre am。

    4. 选择现有的 Firehos e 直播。

    有关创建 CloudWatch 事件规则的更多信息,请参阅 Amazon CloudWatch 事件入门

    AWS IoT

    您可以通过添加操作 AWS IoT 将信息配置为向 Firehose 直播发送信息。

    创建向现有 Firehose 直播发送事件的操作
    1. 在 AWS IoT 控制台中创建规则时,在创建规则页面的设置一个或多个操作下,选择添加操作

    2. 选择将消息发送到 Amazon Kinesis Firehose 流

    3. 选择 Configure action

    4. 直播名称中,选择现有的 Firehose 直播。

    5. 对于 Separator,选择要在记录之间插入的分隔符字符。

    6. 对于IAM角色名称,请选择现有IAM角色或选择创建新角色

    7. 选择添加操作

    有关创建 AWS IoT 规则的更多信息,请参阅 AWS IoT 规则教程