Security Lake AWS 源版本 1 (OCSF1.0.0-rc.2) - Amazon Security Lake


Security Lake AWS 源版本 1 (OCSF1.0.0-rc.2)

以下部分提供了有关从 Security Lake 中查询数据的指导,并包括一些原生支持的查询示例 AWS 来源。这些查询旨在检索特定数据中的数据 AWS 区域。 这些示例使用 us-east-1(美国东部(弗吉尼亚北部))。此外,示例查询使用 LIMIT 25 参数,最多返回 25 条记录。您可以省略该参数或根据自己的偏好进行调整。有关更多示例,请参阅 Amazon Security Lak GitHub e OCSF 查询目录


查询 Security Lake 数据时,您必须将数据所在的 Lake Formation 表的名称包含在内。

SELECT * FROM amazon_security_lake_glue_db_DB_Region.amazon_security_lake_table_DB_Region_SECURITY_LAKE_TABLE WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) LIMIT 25


  • cloud_trail_mgmt_1_0 – AWS CloudTrail 管理事件

  • lambda_execution_1_0— Lambda CloudTrail 的数据事件

  • s3_data_1_0— S3 CloudTrail 的数据事件

  • route53_1_0 – Amazon Route 53 Resolver 查询日志

  • sh_findings_1_0 – AWS Security Hub findings

  • vpc_flow_1_0— 亚马逊 Virtual Private Cloud(亚马逊VPC)流日志

示例:表 sh_findings_1_0 中来自 us-east-1 区域的所有 Security Hub 调查发现

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) LIMIT 25


查询 Security Lake 数据时,您必须将要从中查询数据的数据库区域名称包含在内。有关当前提供 Security Lake 的数据库区域的完整列表,请参阅 Amazon Security Lake 端点

示例:清单 AWS CloudTrail 来自源 IP 的活动

以下示例列出了来自源 IP 的所有 CloudTrail 活动 那是在之后录制的 20230301 (2023 年 3 月 1 日),在表格中 cloud_trail_mgmt_1_0 来自 us-east-1 DB_Region.

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE eventDay > '20230301' AND src_endpoint.ip = '' ORDER BY time desc LIMIT 25


通过对数据进行分区,您可以限制每次查询所扫描的数据量,从而提高性能并降低成本。Security Lake 通过 eventDayregionaccountid 参数实施分区。eventDay 分区采用格式 YYYYMMDD

以下是使用 eventDay 分区的查询示例:

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE eventDay > '20230301' AND src_endpoint.ip = '' ORDER BY time desc

eventDay 的常见值包括以下内容:

过去 1 年内发生的事件

> cast(date_format(current_timestamp - INTERVAL '1' year, '%Y%m%d%H') as varchar)

过去 1 个月内发生的事件

> cast(date_format(current_timestamp - INTERVAL '1' month, '%Y%m%d%H') as varchar)

过去 30 天内发生的事件

> cast(date_format(current_timestamp - INTERVAL '30' day, '%Y%m%d%H') as varchar)

过去 12 个小时内发生的事件

> cast(date_format(current_timestamp - INTERVAL '12' hour, '%Y%m%d%H') as varchar)

过去 5 分钟内发生的事件

> cast(date_format(current_timestamp - INTERVAL '5' minute, '%Y%m%d%H') as varchar)

7-14 天前发生的事件

BETWEEN cast(date_format(current_timestamp - INTERVAL '14' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar)


>= '20230301'

示例:表中列出了 2023 年 3 月 1 日当天或之后来自源 IP 的所有 CloudTrail 活动 cloud_trail_mgmt_1_0

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE eventDay >= '20230301' AND src_endpoint.ip = '' ORDER BY time desc LIMIT 25

示例:表中列出了过去 30 天内来自源 IP 的所有 CloudTrail 活动 cloud_trail_mgmt_1_0

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE eventDay > cast(date_format(current_timestamp - INTERVAL '30' day, '%Y%m%d%H') as varchar) AND src_endpoint.ip = '' ORDER BY time desc LIMIT 25

CloudTrail 数据查询示例

AWS CloudTrail 跟踪用户活动和API使用情况 AWS 服务。 订阅者可以查询 CloudTrail 数据以了解以下类型的信息:

以下是一些 CloudTrail 数据查询示例:

未经授权的企图 AWS 服务 在过去 7 天内

SELECT time,, api.operation, api.response.error, api.response.message, unmapped['responseElements'], cloud.region, actor.user.uuid, src_endpoint.ip, http_request.user_agent FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND api.response.error in ( 'Client.UnauthorizedOperation', 'Client.InvalidPermission.NotFound', 'Client.OperationNotPermitted', 'AccessDenied') ORDER BY time desc LIMIT 25

过去 7 天192.0.2.1内来自源 IP 的所有 CloudTrail 活动清单

SELECT api.request.uid, time,, api.operation, cloud.region, actor.user.uuid, src_endpoint.ip, http_request.user_agent FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND src_endpoint.ip = '' ORDER BY time desc LIMIT 25

过去 7 天内的所有IAM活动清单

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND = '' ORDER BY time desc LIMIT 25

过去 7 天内使用过凭证 AIDACKCEVSQ6C2EXAMPLE 的实例

SELECT actor.user.uid, actor.user.uuid, actor.user.account_uid, cloud.region FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND actor.user.credential_uid = 'AIDACKCEVSQ6C2EXAMPLE' LIMIT 25

过去 7 天内失败的 CloudTrail 记录列表

SELECT actor.user.uid, actor.user.uuid, actor.user.account_uid, cloud.region FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_cloud_trail_mgmt_1_0 WHERE status='failed' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) ORDER BY time DESC LIMIT 25

Route 53 Resolver 查询日志的查询示例

Amazon Route 53 解析器DNS查询日志会跟踪您的亚马逊VPC内部资源进行的查询。订阅用户可以查询 Route 53 Resolver 查询日志,以了解以下类型的信息:

以下是 Route 53 Resolver 查询日志的一些查询示例:

最近 7 天 CloudTrail 内的DNS查询列表

SELECT time, src_endpoint.instance_uid, src_endpoint.ip, src_endpoint.port, query.hostname, rcode FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) ORDER BY time DESC LIMIT 25

最近 7 天s3.amazonaws.com内匹配的DNS查询列表

SELECT time, src_endpoint.instance_uid, src_endpoint.ip, src_endpoint.port, query.hostname, rcode, answers FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0 WHERE query.hostname LIKE '' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) ORDER BY time DESC LIMIT 25

过去 7 天内未解决的DNS查询列表

SELECT time, src_endpoint.instance_uid, src_endpoint.ip, src_endpoint.port, query.hostname, rcode, answers FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0 WHERE cardinality(answers) = 0 and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) LIMIT 25

最近 7 天192.0.2.1内@@ 已解决的DNS查询列表

SELECT time, src_endpoint.instance_uid, src_endpoint.ip, src_endpoint.port, query.hostname, rcode, answer.rdata FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_route53_1_0 CROSS JOIN UNNEST(answers) as st(answer) WHERE answer.rdata='' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) LIMIT 25

Security Hub 调查发现查询示例

Security Hub 可让您全面了解自己的安全状态 AWS 并帮助您根据安全行业标准和最佳实践检查您的环境。Security Hub 会为安全检查生成调查发现,并接收来自第三方服务的调查发现。

以下是 Security Hub 调查发现的一些查询示例:

过去 7 天内严重性等级大于或等于 MEDIUM 的新调查发现

SELECT time, finding, severity FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0_findings WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND severity_id >= 3 AND state_id = 1 ORDER BY time DESC LIMIT 25

过去 7 天内的重复调查发现

SELECT finding.uid, MAX(time) AS time, ARBITRARY(region) AS region, ARBITRARY(accountid) AS accountid, ARBITRARY(finding) AS finding, ARBITRARY(vulnerabilities) AS vulnerabilities FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) GROUP BY finding.uid LIMIT 25

过去 7 天内的所有非信息性调查发现

SELECT time, finding.title, finding, severity FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0 WHERE severity != 'Informational' and eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) LIMIT 25

资源为 Amazon S3 存储桶的调查发现(无时间限制)

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0 WHERE any_match(resources, element -> element.type = 'amzn-s3-demo-bucket') LIMIT 25

通用漏洞评分系统 (CVSS) 分数大于1(无时间限制)的调查结果

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0 WHERE any_match(vulnerabilities, element -> element.cve.cvss.base_score > 1.0) LIMIT 25

与常见漏洞和风险暴露相匹配的调查结果 (CVE)CVE-0000-0000(无时间限制)

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0 WHERE any_match(vulnerabilities, element -> element.cve.uid = 'CVE-0000-0000') LIMIT 25

过去 7 天内从 Security Hub 发送调查发现的产品数量

SELECT, count(*) FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) GROUP BY ORDER BY DESC LIMIT 25

过去 7 天内调查发现中的资源类型数量

SELECT count(*), resource.type FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0 CROSS JOIN UNNEST(resources) as st(resource) WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) GROUP BY resource.type LIMIT 25

过去 7 天内调查发现中的易受攻击软件包

SELECT vulnerability FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0, UNNEST(vulnerabilities) as t(vulnerability) WHERE vulnerabilities is not null LIMIT 25

过去 7 天内发生更改的调查发现

SELECT finding.uid, finding.created_time, finding.first_seen_time, finding.last_seen_time, finding.modified_time, finding.title, state FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_sh_findings_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) LIMIT 25

Amazon VPC 流日志的查询示例

Amazon Virtual Private Cloud(亚马逊VPC)提供了有关进出您的网络接口的 IP 流量的详细信息VPC。

以下是 Amazon VPC Flow Logs 的一些查询示例:

具体的流量 AWS 区域 在过去 7 天内

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND region in ('us-east-1','us-east-2','us-west-2') LIMIT 25

过去 7 天内来自源 IP 和源端口 22 的活动的列表

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND src_endpoint.ip = '' AND src_endpoint.port = 22 LIMIT 25

过去 7 天内不同目标 IP 地址的数量

SELECT COUNT(DISTINCT dst_endpoint.ip) FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) LIMIT 25

过去 7 天内源自 的流量

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND split_part(src_endpoint.ip,'.', 1)='198'AND split_part(src_endpoint.ip,'.', 2)='51' LIMIT 25

最近 7 天的所有HTTPS流量

SELECT dst_endpoint.ip as dst, src_endpoint.ip as src, traffic.packets FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND dst_endpoint.port = 443 GROUP BY dst_endpoint.ip, traffic.packets, src_endpoint.ip ORDER BY traffic.packets DESC LIMIT 25

按过去 7 天内发送到端口 443 的连接的数据包数量排序

SELECT traffic.packets, dst_endpoint.ip FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND dst_endpoint.port = 443 GROUP BY traffic.packets, dst_endpoint.ip ORDER BY traffic.packets DESC LIMIT 25

过去 7 天内 IP 之间的所有流量

SELECT start_time, end_time, src_endpoint.interface_uid, connection_info.direction, src_endpoint.ip, dst_endpoint.ip, src_endpoint.port, dst_endpoint.port, traffic.packets, traffic.bytes FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND( src_endpoint.ip = '' AND dst_endpoint.ip = '') OR ( src_endpoint.ip = '' AND dst_endpoint.ip = '') ORDER BY start_time ASC LIMIT 25

过去 7 天内的所有入站流量

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND connection_info.direction = 'ingress' LIMIT 25

过去 7 天的所有出站流量

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND connection_info.direction = 'egress' LIMIT 25

过去 7 天内所有被拒绝的流量

SELECT * FROM amazon_security_lake_glue_db_us_east_1.amazon_security_lake_table_us_east_1_vpc_flow_1_0 WHERE eventDay BETWEEN cast(date_format(current_timestamp - INTERVAL '7' day, '%Y%m%d%H') as varchar) and cast(date_format(current_timestamp - INTERVAL '0' day, '%Y%m%d%H') as varchar) AND type_uid = 400105 LIMIT 25