查询 AWS CloudTrail 日志 - Amazon Athena

查询 AWS CloudTrail 日志

AWS CloudTrail 是一个记录 AWS 亚马逊云科技账户的 API 调用和事件。

CloudTrail 日志包含有关对 AWS 服务 进行的任何 API 调用的详细信息,包括控制台。CloudTrail 生成加密的日志文件并将其存储在 Amazon S3 中。有关更多信息,请参阅《AWS CloudTrail 用户指南》。

注意

如果您需要跨账户、区域和日期对 CloudTrail 事件信息执行 SQL 查询,请考虑使用 CloudTrail Lake。CloudTrail Lake 是一种可用于创建跟踪的替代 AWS 服务,可将来自企业的信息聚合到单个可搜索的事件数据存储中。它不使用 Amazon S3 存储桶存储,而是将事件存储在某个数据湖中,从而支持更丰富、更快的查询。您可以使用它创建 SQL 查询,以便在自定义时间范围内跨组织和区域搜索事件。由于 CloudTrail Lake 查询是在 CloudTrail 控制台中执行,因此使用 CloudTrail Lake 不需要 Athena。有关更多信息,请参阅 CloudTrail Lake 文档。

将 Athena 与 CloudTrail 日志结合使用是加强对 AWS 服务 活动进行分析的强有力方法。例如,您可以使用查询来确定趋势,并根据属性 (如源 IP 地址或用户) 进一步隔离活动。

常见的应用程序是使用 CloudTrail 日志分析运营活动的安全性和合规性。有关详细示例的信息,请参阅 AWS 大数据博客文章:使用 AWS CloudTrail 和 Amazon Athena 分析安全性、合规性和运营活动

您可以使用 Athena 通过指定日志文件的 LOCATION 直接从 Amazon S3 查询这些日志文件。您可以通过两种方式之一来执行此操作:

  • 通过直接从 CloudTrail 控制台为 CloudTrail 日志文件创建表。

  • 通过在 Athena 控制台中手动为 CloudTrail 日志文件创建表。

了解 CloudTrail 日志和 Athena 表

在开始创建表之前,应了解有关 CloudTrail 以及它如何存储数据的更多信息。这有助于创建所需的表,无论您从 CloudTrail 控制台或从 Athena 创建它们都是如此。

CloudTrail 将日志另存为采用压缩 gzip 格式的 JSON 文本文件 (*.json.gzip)。日志文件的位置取决于您如何设置跟踪、您正在记录的一个 AWS 区域 或多个区域以及其他因素。

有关日志存储位置、JSON 结构和记录文件内容的更多信息,请参阅《AWS CloudTrail 用户指南》中的以下主题:

要收集日志并将其保存到 Amazon S3,请从 AWS Management Console 启用 CloudTrail。有关更多信息,请参阅《AWS CloudTrail 用户指南》中的 创建跟踪记录

请记下您在其中保存日志的目标 Amazon S3 存储桶。将 LOCATION 子句替换为至 CloudTrail 日志位置的路径和要使用的一组对象。该示例使用特定账户的日志的 LOCATION 值,但您可以使用最适合您的应用程序的明确度程度。

例如:

  • 要分析多个账户中的数据,您可以通过使用 LOCATION 's3://MyLogFiles/AWSLogs/',回滚 LOCATION 说明符来指示所有 AWSLogs

  • 要分析特定日期、账户和区域中的数据,请使用 LOCATION 's3://MyLogFiles/123456789012/CloudTrail/us-east-1/2016/03/14/'.

如果使用对象层次结构中的最高级别,则可以在使用 Athena 查询时获得最高程度的灵活度。

使用 CloudTrail 控制台为 CloudTrail 日志创建 Athena 表

您可以创建未分区的 Athena 表,来直接从较旧的 CloudTrail 控制台查询 CloudTrail 日志。若要从 CloudTrail 控制台创建 Athena 表,您需要使用具有在 Athena 中创建表的足够权限的角色登录。

注意

您不能使用 CloudTrail 控制台为组织跟踪记录日志创建 Athena 表。相反,请使用 Athena 控制台手动创建表,以便指定正确的存储位置。有关组织跟踪的信息,请参阅《AWS CloudTrail 用户指南》中的为组织创建跟踪记录

使用 CloudTrail 控制台为 CloudTrail 跟踪记录创建 Athena 表
  1. 访问 https://console.aws.amazon.com/cloudtrail/,打开 CloudTrail 控制台。

  2. 在导航窗格中,选择事件历史记录

  3. 选择 Create Athena table(创建 Athena 表)。

    
                        选择 Create Athena table(创建 Athena 表)
  4. 对于 Storage location(存储位置),使用向下箭头选择其中存储了供跟踪查询的日志文件的 Amazon S3 存储桶。

    注意

    要查找与跟踪记录关联的存储桶的名称,请选择 CloudTrail 导航窗格中的 Trails(跟踪记录),然后查看跟踪记录的 S3 bucket(S3 存储桶)列。要查看 Amazon S3 中存储桶的位置,请在 S3 bucket(S3 存储桶)列中选择存储桶的链接。这样,就可以将 Amazon S3 控制台打开到 CloudTrail 存储桶位置。

  5. 选择创建表。将使用包含 Amazon S3 存储桶名称的默认名称创建表。

使用手动分区在 Athena 中为 CloudTrail 日志创建表

可以在 Athena 控制台中为 CloudTrail 日志文件手动创建表,然后在 Athena 中运行查询。

使用 Athena 控制台为 CloudTrail 跟踪记录创建 Athena 表
  1. 将以下 DDL 语句复制并粘贴到 Athena 控制台查询编辑器中。

    CREATE EXTERNAL TABLE cloudtrail_logs ( eventversion STRING, useridentity STRUCT< type:STRING, principalid:STRING, arn:STRING, accountid:STRING, invokedby:STRING, accesskeyid:STRING, userName:STRING, sessioncontext:STRUCT< attributes:STRUCT< mfaauthenticated:STRING, creationdate:STRING>, sessionissuer:STRUCT< type:STRING, principalId:STRING, arn:STRING, accountId:STRING, userName:STRING>, ec2RoleDelivery:string, webIdFederationData:map<string,string> > >, eventtime STRING, eventsource STRING, eventname STRING, awsregion STRING, sourceipaddress STRING, useragent STRING, errorcode STRING, errormessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestid STRING, eventid STRING, resources ARRAY<STRUCT< arn:STRING, accountid:STRING, type:STRING>>, eventtype STRING, apiversion STRING, readonly STRING, recipientaccountid STRING, serviceeventdetails STRING, sharedeventid STRING, vpcendpointid STRING, eventCategory STRING, tlsDetails struct< tlsVersion:string, cipherSuite:string, clientProvidedHostHeader:string> ) PARTITIONED BY (region string, year string, month string, day string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/';
    注意

    建议使用示例中所示的 org.apache.hive.hcatalog.data.JsonSerDe。虽然存在 com.amazon.emr.hive.serde.CloudTrailSerde,但目前还无法处理一些较新的 CloudTrail 字段。

  2. (可选)删除表格中所有非必填字段。如果您只需要读取一组特定的列,则您的表定义可以排除其他列。

  3. 修改 s3://CloudTrail_bucket_name/AWSLogs/Account_ID/CloudTrail/ 以指向包含您的日志数据的 Amazon S3 存储桶。

  4. 验证字段是否正确列出。有关 CloudTrail 记录中的完整字段列表的更多信息,请参阅 CloudTrail 记录内容

    下面的示例使用了 Hive JSON SerDe。在此示例中,字段 requestparametersresponseelementsadditionaleventdata 作为查询中的 STRING 类型列出,但在 JSON 中使用 STRUCT 数据类型。因此,要将数据移出这些字段,请使用 JSON_EXTRACT 函数。有关更多信息,请参阅 从 JSON 中提取数据。为了提高性能,此示例按 AWS 区域、年份、月份和日期对数据进行分区。

  5. 在 Athena 控制台中运行 CREATE TABLE 语句。

  6. 使用 ALTER TABLE ADD PARTITION 命令加载分区,以便您可以查询它们,如以下示例所示。

    ALTER TABLE table_name ADD PARTITION (region='us-east-1', year='2019', month='02', day='01') LOCATION 's3://cloudtrail_bucket_name/AWSLogs/Account_ID/CloudTrail/us-east-1/2019/02/01/'

使用手动分区为整个组织范围的跟踪创建表

要在 Athena 中为整个组织的 CloudTrail 日志文件创建表,请按照 使用手动分区在 Athena 中为 CloudTrail 日志创建表 中的步骤操作,但需要按照以下过程中的说明进行修改。

为整个组织的 CloudTrail 日志记录创建 Athena 表
  1. CREATE TABLE 语句中,修改 LOCATION 子句以包含组织 ID,如下例所示:

    LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/Account_ID/CloudTrail/'
  2. PARTITIONED BY 子句中,为账户 ID 添加一个字符串条目,如下例所示:

    PARTITIONED BY (account string, region string, year string, month string, day string)

    以下示例显示的是综合结果:

    ... PARTITIONED BY (account string, region string, year string, month string, day string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/Account_ID/CloudTrail/'
  3. ALTER TABLE 语句的 ADD PARTITION 子句包含账户 ID,如下例所示:

    ALTER TABLE table_name ADD PARTITION (account='111122223333', region='us-east-1', year='2022', month='08', day='08')
  4. ALTER TABLE 语句的 LOCATION 子句包含组织 ID、账户 ID 以及您要添加的分区,如下例所示:

    LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/Account_ID/CloudTrail/us-east-1/2022/08/08/'

    以下示例 ALTER TABLE 语句显示的是综合结果:

    ALTER TABLE table_name ADD PARTITION (account='111122223333', region='us-east-1', year='2022', month='08', day='08') LOCATION 's3://cloudtrail_bucket_name/AWSLogs/organization_id/111122223333/CloudTrail/us-east-1/2022/08/08/'

使用分区投影在 Athena 中为 CloudTrail 日志创建表

由于 CloudTrail 日志具有一个已知结构,您可以预先指定该结构的分区方案,因此可以使用 Athena 分区投影功能减少查询运行时间并自动管理分区。当添加新数据时,分区投影会自动添加新分区。这样就不必使用 ALTER TABLE ADD PARTITION 手动添加分区了。

以下示例 CREATE TABLE 语句会自动在 CloudTrail 日志上从指定日期开始到当前为单个 AWS 区域 使用分区投影。在 LOCATIONstorage.location.template 子句中,将存储桶account-idaws-region 占位符替换为对应的相同值。对于 projection.timestamp.range,将 2020/01/01 替换为要使用的开始日期。成功运行查询后,您可以查询表。您无需运行 ALTER TABLE ADD PARTITION 来加载分区。

CREATE EXTERNAL TABLE cloudtrail_logs_pp( eventVersion STRING, userIdentity STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, invokedBy: STRING, accessKeyId: STRING, userName: STRING, sessionContext: STRUCT< attributes: STRUCT< mfaAuthenticated: STRING, creationDate: STRING>, sessionIssuer: STRUCT< type: STRING, principalId: STRING, arn: STRING, accountId: STRING, userName: STRING>, ec2RoleDelivery:string, webIdFederationData:map<string,string> > >, eventTime STRING, eventSource STRING, eventName STRING, awsRegion STRING, sourceIpAddress STRING, userAgent STRING, errorCode STRING, errorMessage STRING, requestparameters STRING, responseelements STRING, additionaleventdata STRING, requestId STRING, eventId STRING, readOnly STRING, resources ARRAY<STRUCT< arn: STRING, accountId: STRING, type: STRING>>, eventType STRING, apiVersion STRING, recipientAccountId STRING, serviceEventDetails STRING, sharedEventID STRING, vpcendpointid STRING, eventCategory STRING, tlsDetails struct< tlsVersion:string, cipherSuite:string, clientProvidedHostHeader:string> ) PARTITIONED BY ( `timestamp` string) ROW FORMAT SERDE 'org.apache.hive.hcatalog.data.JsonSerDe' STORED AS INPUTFORMAT 'com.amazon.emr.cloudtrail.CloudTrailInputFormat' OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat' LOCATION 's3://bucket/AWSLogs/account-id/CloudTrail/aws-region' TBLPROPERTIES ( 'projection.enabled'='true', 'projection.timestamp.format'='yyyy/MM/dd', 'projection.timestamp.interval'='1', 'projection.timestamp.interval.unit'='DAYS', 'projection.timestamp.range'='2020/01/01,NOW', 'projection.timestamp.type'='date', 'storage.location.template'='s3://bucket/AWSLogs/account-id/CloudTrail/aws-region/${timestamp}')

更多有关分区投影的信息,请参阅 使用 Amazon Athena 分区投影

查询嵌套字段

由于 userIdentityresources 字段是嵌套的数据类型,查询这些内容需要特殊处理。

userIdentity 对象由嵌套 STRUCT 类型组成。可以使用点分隔字段以分隔待查询的字段,如下例所示:

SELECT eventsource, eventname, useridentity.sessioncontext.attributes.creationdate, useridentity.sessioncontext.sessionissuer.arn FROM cloudtrail_logs WHERE useridentity.sessioncontext.sessionissuer.arn IS NOT NULL ORDER BY eventsource, eventname LIMIT 10

resources 字段是一个 STRUCT 对象数组。对于这些数组,请使用 CROSS JOIN UNNEST 来取消嵌套数组,以便您可以查询其对象。

下面的示例将返回资源 ARN 以 example/datafile.txt 结尾的所有行。为了便于读取,replace 函数将从 ARN 中删除初始 arn:aws:s3::: 子字符串。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as s3_resource, eventname, eventtime, useragent FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE unnested.resources_entry.ARN LIKE '%example/datafile.txt' ORDER BY eventtime

以下是 DeleteBucket 事件的示例查询。查询将从 resources 对象中提取存储桶的名称以及存储桶所属的账户 ID。

SELECT awsregion, replace(unnested.resources_entry.ARN,'arn:aws:s3:::') as deleted_bucket, eventtime AS time_deleted, useridentity.username, unnested.resources_entry.accountid as bucket_acct_id FROM cloudtrail_logs t CROSS JOIN UNNEST(t.resources) unnested (resources_entry) WHERE eventname = 'DeleteBucket' ORDER BY eventtime

有关取消嵌套的更多信息,请参阅 筛选数组

示例查询

以下示例显示从在 CloudTrail 事件日志创建的表,返回所有匿名(未签名)请求的查询部分。此查询选择 useridentity.accountid 匿名并且 useridentity.arn 未指定的那些请求:

SELECT * FROM cloudtrail_logs WHERE eventsource = 's3.amazonaws.com' AND eventname in ('GetObject') AND useridentity.accountid = 'anonymous' AND useridentity.arn IS NULL AND requestparameters LIKE '%[your bucket name ]%';

有关更多信息,请参阅 AWS 大数据博客文章:使用 AWS CloudTrail 和 Amazon Athena 分析安全性、合规性和运营活动

有关查询 CloudTrail 日志的提示

要浏览 CloudTrail 日志数据,请使用下列提示:

  • 在查询日志之前,请验证您的日志表是否看似与使用手动分区在 Athena 中为 CloudTrail 日志创建表中的表一样。如果不是第一个表,请使用以下命令删除现有表:DROP TABLE cloudtrail_logs

  • 删除现有表后,重新创建它。有关更多信息,请参阅 使用手动分区在 Athena 中为 CloudTrail 日志创建表

    确认正确列出了 Athena 查询中的字段。有关 CloudTrail 记录中的完整字段列表的信息,请参阅 CloudTrail 记录内容

    如果您的查询包含 JSON 格式的字段,例如 STRUCT,请从 JSON 中提取数据。有关更多信息,请参阅 从 JSON 中提取数据

    关于针对 CloudTrail 表发布查询的一些建议如下:

  • 首先查看哪些用户调用了哪些 API 操作以及来自哪些源 IP 地址。

  • 将以下基本 SQL 查询用作您的模板。将查询粘贴到 Athena 控制台并运行它。

    SELECT useridentity.arn, eventname, sourceipaddress, eventtime FROM cloudtrail_logs LIMIT 100;
  • 修改查询以进一步探索您的数据。

  • 为了提高性能,请包含 LIMIT 子句以返回指定的行子集。