Kinesis Agent - Amazon Data Firehose

将 Amazon Data Firehose 流传输到亚马逊 S3 中的 Apache Iceberg Tables 处于预览阶段,可能会发生变化。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

Kinesis Agent

Amazon Kinesis 代理是一款独立的 Java 软件应用程序,可作为参考实现,展示如何收集数据并将其发送到 Firehose。代理会持续监控一组文件,并将新数据发送到您的 Firehose 直播中。该代理显示了如何处理文件轮换、检查点操作和失败时重试。它展示了如何以可靠、及时和简单的方式交付数据。它还显示了如何发布 CloudWatch指标以更好地监控流媒体过程并对其进行故障排除。要了解更多信息,请访问 awslabs/ amazon-kinesis-agent

默认情况下,会基于换行符 ('\n') 分析每个文件中的记录。但是,也可以将代理配置为分析多行记录(请参阅 代理配置设置)。

您可以在基于 Linux 的服务器环境(如 Web 服务器、日志服务器和数据库服务器)上安装此代理。安装代理后,通过指定要监控的文件和数据的 Firehose 流来对其进行配置。配置代理后,它会持久地从文件中收集数据,并可靠地将其发送到 Firehose 数据流。

主题

    使用以下方法之一管理您的 AWS 证书:

    • 创建自定义凭证提供程序。有关详细信息,请参阅创建自定义凭证提供商

    • 在启动EC2实例时指定一个IAM角色。

    • 在配置代理时指定 AWS 凭据(参见下方配置表awsSecretAccessKey中的awsAccessKeyId和条目代理配置设置)。

    • 编辑/etc/sysconfig/aws-kinesis-agent以指定您的 AWS 地区和 AWS 访问密钥。

    • 如果您的EC2实例位于其他 AWS 账户中,请创建一个IAM角色以提供对 Amazon Data Firehose 服务的访问权限。在配置代理时指定该角色(参见assumeRoleARNassumeRoleExternalID)。使用前面的方法之一来指定另一个账户中有权担任此角色的用户的 AWS 证书。

    您可以创建自定义凭证提供程序,并在以下配置设置中为 Kinesis 代理提供其类名和 jar 路径:userDefinedCredentialsProvider.classnameuserDefinedCredentialsProvider.location。有关这两个配置设置的说明,请参阅代理配置设置

    要创建自定义凭证提供程序,请定义一个实现 AWS CredentialsProvider 接口的类,如下例所示。

    import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSCredentialsProvider; import com.amazonaws.auth.BasicAWSCredentials; public class YourClassName implements AWSCredentialsProvider { public YourClassName() { } public AWSCredentials getCredentials() { return new BasicAWSCredentials("key1", "key2"); } public void refresh() { } }

    您的类必须有一个不带参数的构造函数。

    AWS 定期调用刷新方法以获取更新的凭证。如果希望凭证提供程序在其整个生命周期内提供不同的凭证,请在此方法中包含用于刷新凭证的代码。或者,如果您需要提供静态(不更改)凭证的凭证提供程序,则可以将此方法保留为空。

    首先,连接到您的实例。有关更多信息,请参阅 Amazon EC2 用户指南中的 Connect 到您的实例。如果您在连接时遇到问题,请参阅 Amazon EC2 用户指南中的实例连接疑难解答

    接下来,请使用以下方法之一安装代理。

    • 要从 Amazon Linux 存储库设置代理

      此方法仅适用于 Amazon Linux 实例。使用以下命令:

      sudo yum install –y aws-kinesis-agent

      Agent v 2.0.0 或更高版本安装在装有 Amazon Linux 2 () AL2 操作系统的计算机上。此代理版本需要安装 Java 1.8 或更高版本。如果尚未安装所需的 Java 版本,代理安装程序将会安装。有关亚马逊 Linux 2 的更多信息,请参阅https://aws.amazon.com/amazon-linux-2/

    • 从 Amazon S3 存储库设置代理

      此方法从公开可用的存储库安装代理,因此适用于 Red Hat Enterprise Linux 以及 Amazon Linux 2 实例。使用以下命令下载并安装最新版本的代理 2.x.x:

      sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm

      要安装特定版本的代理,请在命令中指定版本号。例如,以下命令将安装代理 2.0.1。

      sudo yum install –y https://streaming-data-agent.s3.amazonaws.com/aws-kinesis-agent-2.0.1-1.amzn1.noarch.rpm

      如果您使用的是 Java 1.7,但不想升级,则可以下载与 Java 1.7 兼容的代理 1.x.x。例如,要下载代理 1.1.6,可使用以下命令:

      sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-1.1.6-1.amzn1.noarch.rpm

      可以使用以下命令下载最新的代理 1.x.x:

      sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn1.noarch.rpm
    • 从 GitHub repo 中设置代理

      1. 首先,确保您已安装所需的 Java 版本,具体取决于代理版本。

      2. awslabs amazon-kinesis-agent GitHub /存储库下载代理。

      3. 导航到下载目录并运行以下命令来安装代理:

        sudo ./setup --install
    • 在 Docker 容器中设置代理

      Kinesis 代理可以在容器中运行,也可以通过 amazonlinux 容器库运行。使用以下 Dockerfile,然后运行 docker build

      FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]
    配置并启动代理
    1. 打开并编辑配置文件(如果使用默认文件访问权限,则以超级用户的身份来执行):/etc/aws-kinesis/agent.json

      在此配置文件中,指定代理从中收集数据的文件 ("filePattern") 以及代理向其发送数据的 Firehose 流的名称 ("deliveryStream")。文件名是一种模式,并且代理能够识别文件轮换。每秒内您轮换使用文件或创建新文件的次数不能超过一次。代理使用文件创建时间戳来确定要跟踪哪些文件并将其追踪到您的 Firehose 直播中。如果每秒创建新文件或轮换使用文件的次数超过一次,代理将无法正确区分这些文件。

      { "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "yourdeliverystream" } ] }

      默认 AWS 区域为us-east-1。如果使用的是其他区域,请将 firehose.endpoint 设置添加到配置文件,为区域指定终端节点。有关更多信息,请参阅 代理配置设置

    2. 手动启动代理:

      sudo service aws-kinesis-agent start
    3. (可选)将代理配置为在系统启动时启动:

      sudo chkconfig aws-kinesis-agent on

    现在,代理作为系统服务在后台运行。它会持续监视指定的文件并将数据发送到指定的 Firehose 流。代理活动记录在 /var/log/aws-kinesis-agent/aws-kinesis-agent.log 中。

    代理支持两个必需的配置设置,即 filePatterndeliveryStream,以及可用于其他功能的可选配置设置。您可以在 /etc/aws-kinesis/agent.json 中指定必需配置设置和可选配置设置。

    只要您更改配置文件,就必须使用以下命令停止再启动代理:

    sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

    或者,您也可以使用以下命令:

    sudo service aws-kinesis-agent restart

    一般的设置配置如下。

    配置设置 描述
    assumeRoleARN

    要由用户担任的角色的 Amazon 资源名称 (ARN)。有关更多信息,请参阅《IAM用户指南》中的使用IAM角色委派跨 AWS 账户访问权限

    assumeRoleExternalId

    确定谁可以担任该角色的可选标识符。有关更多信息,请参阅《IAM用户指南》中的 “如何使用外部 ID”。

    awsAccessKeyId

    AWS 覆盖默认凭证的访问密钥 ID。此设置优先于所有其他凭证提供程序。

    awsSecretAccessKey

    AWS 覆盖默认凭证的密钥。此设置优先于所有其他凭证提供程序。

    cloudwatch.emitMetrics

    CloudWatch 如果已设置,则允许代理向其发送指标 (true)。

    默认:True

    cloudwatch.endpoint

    的区域终端节点 CloudWatch。

    默认:monitoring.us-east-1.amazonaws.com

    firehose.endpoint

    亚马逊 Data Firehose 的区域终端节点。

    默认:firehose.us-east-1.amazonaws.com

    sts.endpoint

    AWS 安全令牌服务的区域终端节点。

    默认:https://sts.amazonaws.com

    userDefinedCredentialsProvider.classname 如果定义自定义凭证提供程序,请使用此设置提供其完全限定类名。不要在类名末尾包含 .class
    userDefinedCredentialsProvider.location 如果定义自定义凭证提供程序,请使用此设置指定包含自定义凭证提供程序的 jar 的绝对路径。代理还在以下位置查找 jar 文件:/usr/share/aws-kinesis-agent/lib/

    流配置设置如下。

    配置设置 描述
    aggregatedRecordSizeBytes

    要使代理聚合记录,然后通过一次操作将其放入 Firehose 流,请指定此设置。将其设置为你希望聚合记录在代理将其放入 Firehose 直播之前的大小。

    默认值:0(不聚合)

    dataProcessingOptions

    在每条已解析的记录发送到 Firehose 流之前,应用于该记录的处理选项列表。这些处理选项按指定的顺序执行。有关更多信息,请参阅 使用代理预处理数据

    deliveryStream

    [必填] Firehose 直播的名称。

    filePattern

    [必需] 需要由代理监控的文件的 Glob。与此模式匹配的任何文件都会自动由代理挑选出来并进行监控。对于匹配此模式的所有文件,请向 aws-kinesis-agent-user 授予读取权限。对于包含这些文件的目录,请向 aws-kinesis-agent-user 授予读取和执行权限。

    重要

    代理将选择与此模式匹配的任何文件。为了确保代理不会选择意外的记录,请仔细选择此模式。

    initialPosition

    开始解析文件的初始位置。有效值为 START_OF_FILEEND_OF_FILE

    默认:END_OF_FILE

    maxBufferAgeMillis

    代理在将数据发送到 Firehose 流之前缓冲数据的最长时间,以毫秒为单位。

    值范围:1,000-900,000(1 秒到 15 分钟)

    默认值:60,000(1 分钟)

    maxBufferSizeBytes

    代理在将数据发送到 Firehose 流之前对其进行缓冲的最大大小,以字节为单位。

    值范围:1-4,194,304(4MB)

    默认值:4194304 (4 MB)

    maxBufferSizeRecords

    代理在将数据发送到 Firehose 流之前对其进行缓冲的最大记录数。

    值范围:1-500

    默认值:500

    minTimeBetweenFilePollsMillis

    代理轮询和分析受监控文件中是否有新数据的时间间隔(以毫秒计)。

    值范围:1 或更大值

    默认值:100

    multiLineStartPattern

    用于标识记录开始的模式。记录由与模式匹配的行以及与模式不匹配的任何以下行组成。有效值为正则表达式。默认情况下,日志文件中的每一个新行都被解析为一条记录。

    skipHeaderLines

    代理从受监控文件开头跳过分析的行数。

    值范围:0 或更大值

    默认值:0(零)

    truncatedRecordTerminator

    当记录大小超过 Amazon Data Firehose 记录大小限制时,代理用来截断已解析记录的字符串。(1000 KB)

    默认值:'\n'(换行符)

    通过指定多个流程配置设置,您可以配置代理以监控多个文件目录并将数据发送到多个流。在以下配置示例中,代理监控两个文件目录,并将数据分别发送到 Kinesis 数据流和 Firehose 流。您可以为 Kinesis Data Streams 和 Amazon Data Firehose 指定不同的终端节点,这样您的数据流和 Firehose 流就不必位于同一个区域。

    { "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

    有关在 Amazon Kinesis Data Streams 中使用代理的更多详细信息,请参阅使用 Kinesis 代理写入 Amazon Kinesis Data Streams

    代理可以预处理从受监控文件中解析的记录,然后再将其发送到您的 Firehose 直播中。通过将 dataProcessingOptions 配置设置添加到您的文件流可以启用此功能。可以添加一个或多个处理选项,这些选项将按指定的顺序执行。

    代理支持以下处理选项。由于代理是开源的,您可以进一步开发和扩展其处理选项。您可以从 Kinesis 代理下载代理。

    处理选项
    SINGLELINE

    通过移除换行符和首尾的空格,将多行记录转换为单行记录。

    { "optionName": "SINGLELINE" }
    CSVTOJSON

    将记录从分隔符分隔的格式转换为格式。JSON

    { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
    customFieldNames

    [必填] 在每个JSON键值对中用作键的字段名称。例如,如果您指定 ["f1", "f2"],则记录“v1, v2”将转换为 {"f1":"v1","f2":"v2"}

    delimiter

    在记录中用作分隔符的字符串。默认值为逗号(,)。

    LOGTOJSON

    将记录从日志格式转换为JSON格式。支持的日志格式为 Apache Common LogApache Combined LogApache Error LogRFC3164 Syslog

    { "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
    logFormat

    [必需] 日志条目格式。以下是可能的值:

    • COMMONAPACHELOG – Apache 通用日志格式。默认情况下每个日志条目都为以下模式:“%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}”。

    • COMBINEDAPACHELOG – Apache 组合日志格式。默认情况下每个日志条目都为以下模式:“%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}”。

    • APACHEERRORLOG – Apache 错误日志格式。默认情况下每个日志条目都为以下模式:“[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}”。

    • SYSLOG— RFC3164 系统日志格式。默认情况下每个日志条目都为以下模式:“%{timestamp} %{hostname} %{program}[%{processid}]: %{message}”。

    matchPattern

    覆盖指定的日志格式的默认模式。如果日志条目使用自定义格式,则可以使用该设置提取日志条目中的值。如果指定 matchPattern,还必须指定 customFieldNames

    customFieldNames

    在每个JSON键值对中用作键的自定义字段名称。您可以使用此设置定义从 matchPattern 中提取的值的字段名称,或覆盖预定义日志格式的默认字段名称。

    例 : LOGTOJSON 配置

    以下是将 Apache Common Log 条目转换为JSON格式的LOGTOJSON配置示例:

    { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

    转换前:

    64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

    转换后:

    {"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
    例 : 使用自定义字段进行LOGTOJSON配置

    下面是 LOGTOJSON 配置的另一个示例:

    { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

    使用此配置设置,将上一个示例中的相同 Apache Common Log 条目转换为如下JSON格式:

    {"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
    例 :转换 Apache 通用日志条目

    以下流程配置将 Apache Common Log 条目转换为格式的单行记录:JSON

    { "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "my-delivery-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
    例 :转换多行记录

    以下流配置分析第一行以“[SEQUENCE=”开头的多行记录。每个记录先转换为一个单行记录。然后,将基于制表分隔符从记录中提取值。提取的值将映射到指定的customFieldNames值,以形成JSON格式的单行记录。

    { "flows": [ { "filePattern": "/tmp/app.log*", "deliveryStream": "my-delivery-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }
    例 : 使用匹配模式进行LOGTOJSON配置

    以下是将 Apache Common Log 条目转换为JSON格式的LOGTOJSON配置示例,其中省略了最后一个字段(字节):

    { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

    转换前:

    123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

    转换后:

    {"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

    系统启动时自动启动代理。

    sudo chkconfig aws-kinesis-agent on

    检查代理的状态:

    sudo service aws-kinesis-agent status

    停止代理:

    sudo service aws-kinesis-agent stop

    从此位置读取代理的日志文件:

    /var/log/aws-kinesis-agent/aws-kinesis-agent.log

    卸载代理:

    sudo yum remove aws-kinesis-agent

    有没有适用于 Windows 的 Kinesis 代理?

    适用于 Windows 的 Kinesis 代理与适用于 Linux 平台的 Kinesis 代理是不同的软件。

    为什么 Kinesis 代理速度变慢且/或 RecordSendErrors 增加?

    这通常是由于 Kinesis 节流造成的。查看 Kinesis Data Streams 的WriteProvisionedThroughputExceeded指标或 Firehose 直播ThrottledRecords的指标。这些指标从 0 开始的任何增加都表示需要增加流限制。有关更多信息,请参阅 Kinesis 数据流限制和 Firehose 数据流

    排除节流后,查看 Kinesis 代理是否配置为跟踪大量小文件。Kinesis 代理跟踪新文件时会有延迟,因此 Kinesis 代理应跟踪少量大文件。尝试将您的日志文件合并为大文件。

    为什么我会收到 java.lang.OutOfMemoryError 异常?

    Kinesis 代理没有足够的内存来处理当前的工作负载。尝试增加 /usr/bin/start-aws-kinesis-agent 中的 JAVA_START_HEAPJAVA_MAX_HEAP,并重新启动代理。

    为什么我会收到 IllegalStateException : connection pool shut down 异常?

    Kinesis 代理没有足够的连接来处理当前的工作负载。尝试在 /etc/aws-kinesis/agent.json 的常规代理配置设置中增加 maxConnectionsmaxSendingThreads。这些字段的默认值是可用运行时系统处理器的 12 倍。有关高级代理配置设置的更多信息,请参见 AgentConfiguration.java

    如何调试 Kinesis 代理的其他问题?

    可在 /etc/aws-kinesis/log4j.xml 中启用 DEBUG 级别日志。

    我应该如何配置 Kinesis 代理?

    maxBufferSizeBytes 越小,Kinesis 代理发送数据的频率就越高。这可能是好事,因为这减少了记录的传输时间,但也增加了每秒对 Kinesis 的请求。

    为什么 Kinesis 代理会发送重复记录?

    这是由于文件跟踪中的错误配置造成的。确保每个 fileFlow’s filePattern 只匹配一个文件。如果正在使用的 logrotate 模式处于 copytruncate 模式,也可能发生这种情况。尝试将模式更改为默认模式或创建模式以避免重复。有关处理重复记录的更多信息,请参阅处理重复记录