使用 CloudTrail 处理库 - AWS CloudTrail

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 CloudTrail 处理库

CloudTrail 处理库是一个 Java 库,它提供了一种处理 AWS CloudTrail 日志的简便方法。您可以提供有关 CloudTrail SQS队列的配置详细信息并编写代码来处理事件。剩下的就交给 CloudTrail 处理库了。它会轮询您的 Amazon 队SQS列,读取和解析队列消息,下载 CloudTrail 日志文件,解析日志文件中的事件,并将事件作为 Java 对象传递到您的代码。

CloudTrail 处理库具有高度的可扩展性和容错能力。它可以并行处理日志文件,以便您可以根据需要处理任意数量的日志。它会处理与网络超时和无法访问的资源相关的网络故障。

以下主题向您展示如何使用 CloudTrail 处理库来处理 Java 项目中的 CloudTrail 日志。

该库是作为Apache许可的开源项目提供的,可在以下网址获得:。 GitHub https://github.com/aws/aws-cloudtrail-processing-library库源包括可用作您自己的项目基础的示例代码。

最低要求

要使用 CloudTrail 处理库,必须具备以下条件:

处理 CloudTrail 日志

要在 Java 应用程序中处理 CloudTrail 日志,请执行以下操作:

将 CloudTrail 处理库添加到您的项目中

要使用 CloudTrail 处理库,请将其添加到 Java 项目的类路径中。

将库添加到 Apache Ant 项目

将 CloudTrail 处理库添加到 Apache Ant 项目中
  1. 从以下地址下载或克隆 CloudTrail 处理库源代码 GitHub:

  2. 从源代码生成.jar 文件,如以下所README述:

    mvn clean install -Dgpg.skip=true
  3. 将生成的 .jar 文件复制到您的项目中,并将它添加到您项目的 build.xml 文件中。例如:

    <classpath> <pathelement path="${classpath}"/> <pathelement location="lib/aws-cloudtrail-processing-library-1.6.1.jar"/> </classpath>

将库添加到 Apache Maven 项目

CloudTrail 处理库可用于 Apache Maven。您可以将其添加到您的项目,方法是:在项目的 pom.xml 文件中编写单个依赖项。

将 CloudTrail 处理库添加到 Maven 项目
  • 打开您的 Maven 项目的 pom.xml 文件,并添加如下依赖项:

    <dependency> <groupId>com.amazonaws</groupId> <artifactId>aws-cloudtrail-processing-library</artifactId> <version>1.6.1</version> </dependency>

将库添加到 Eclipse 项目

将 CloudTrail 处理库添加到 Eclipse 项目
  1. 从以下地址下载或克隆 CloudTrail 处理库源代码 GitHub:

  2. 从源代码生成.jar 文件,如以下所README述:

    mvn clean install -Dgpg.skip=true
  3. 将生成的 aws-cloudtrail-processing-library -1.6.1.jar 复制到项目中的某个目录中(通常)。lib

  4. 在 Eclipse Project Explorer 中右键单击您的项目名称,选择 Build Path,然后选择 Configure

  5. Java Build Path 窗口中,选择 Libraries 选项卡。

  6. 选择添加JARs... 然后导航到你复制的路径 aws-cloudtrail-processing-library -1.6.1.jar。

  7. 选择 OK 以完成将 .jar 添加到您的项目的过程。

将库添加到 IntelliJ 项目

将 CloudTrail 处理库添加到 IntelliJ 项目
  1. 从以下地址下载或克隆 CloudTrail 处理库源代码 GitHub:

  2. 从源代码生成.jar 文件,如以下所README述:

    mvn clean install -Dgpg.skip=true
  3. File 中,选择 Project Structure

  4. 选择 Modules,然后选择 Dependencies

  5. 选择 + JARS 或 “目录”,然后转到您构建的路径aws-cloudtrail-processing-library-1.6.1.jar

  6. 选择 Apply,然后选择 OK 以完成将 .jar 添加到您的项目的过程。

配置 CloudTrail 处理库

您可以通过创建在运行时加载的类路径属性文件来配置 CloudTrail 处理库,也可以手动创建ClientConfiguration对象并设置选项。

提供属性文件

您可以编写向您的应用程序提供配置选项的类路径属性文件。以下示例文件显示了您可设置的选项:

# AWS access key. (Required) accessKey = your_access_key # AWS secret key. (Required) secretKey = your_secret_key # The SQS URL used to pull CloudTrail notification from. (Required) sqsUrl = your_sqs_queue_url # The SQS end point specific to a region. sqsRegion = us-east-1 # A period of time during which Amazon SQS prevents other consuming components # from receiving and processing that message. visibilityTimeout = 60 # The S3 region to use. s3Region = us-east-1 # Number of threads used to download S3 files in parallel. Callbacks can be # invoked from any thread. threadCount = 1 # The time allowed, in seconds, for threads to shut down after # AWSCloudTrailEventProcessingExecutor.stop() is called. If they are still # running beyond this time, they will be forcibly terminated. threadTerminationDelaySeconds = 60 # The maximum number of AWSCloudTrailClientEvents sent to a single invocation # of processEvents(). maxEventsPerEmit = 10 # Whether to include raw event information in CloudTrailDeliveryInfo. enableRawEventInfo = false # Whether to delete SQS message when the CloudTrail Processing Library is unable to process the notification. deleteMessageUponFailure = false

以下参数为必需参数:

  • sqsUrl— 提供URL从中提取 CloudTrail通知的方法。如果不指定此值,则 AWSCloudTrailProcessingExecutor 会引发 IllegalStateException

  • accessKey— 您的账户的唯一标识符,例如AKIAIOSFODNN7EXAMPLE。

  • secretKey— 您的账户的唯一标识符,例如 wJalr XUtnFEMI /K7MDENG/。bPxRfi CYEXAMPLEKEY

accessKeysecretKey参数提供您访问库的 AWS 凭据,以便库可以 AWS 代表您进行访问。

其他参数的默认值由库设置。有关更多信息,请参阅 AWS CloudTrail Processing Library 参考

创建一个 ClientConfiguration

您可以通过在 ClientConfiguration 对象上设置选项来提供针对 AWSCloudTrailProcessingExecutor 的选项,而不是在类路径属性中设置选项,如下面的示例所示:

ClientConfiguration basicConfig = new ClientConfiguration( "http://sqs.us-east-1.amazonaws.com/123456789012/queue2", new DefaultAWSCredentialsProviderChain()); basicConfig.setEnableRawEventInfo(true); basicConfig.setThreadCount(4); basicConfig.setnEventsPerEmit(20);

实施事件处理器

要处理 CloudTrail 日志,必须实现接收 CloudTrail 日志数据的。EventsProcessor以下是一个实施示例:

public class SampleEventsProcessor implements EventsProcessor { public void process(List<CloudTrailEvent> events) { int i = 0; for (CloudTrailEvent event : events) { System.out.println(String.format("Process event %d : %s", i++, event.getEventData())); } } }

在实现时EventsProcessor,您需要实现AWSCloudTrailProcessingExecutor用于向您发送 CloudTrail 事件的process()回调。事件在 CloudTrailClientEvent 对象的列表中提供。

CloudTrailClientEvent对象提供了一个CloudTrailEventCloudTrailEventMetadata,你可以用它来读取 CloudTrail 事件和交付信息。

该简单示例会输出传递到 SampleEventsProcessor 的每个事件的事件信息。在您自己的实现中,您可以按照所需方式处理日志。只要 AWSCloudTrailProcessingExecutor 有事件要发送并仍在运行,它就会继续将事件发送至您的 EventsProcessor

实例化和运行处理执行程序

在您编写EventsProcessor并设置 CloudTrail 处理库的配置值(在属性文件中或通过使用ClientConfiguration类)之后,您可以使用这些元素来初始化和使用AWSCloudTrailProcessingExecutor

用于AWSCloudTrailProcessingExecutor处理 CloudTrail 事件
  1. 实例化 AWSCloudTrailProcessingExecutor.Builder 对象。 Builder 的构造函数采用一个 EventsProcessor 对象和一个类路径属性文件名。

  2. 调用 Builderbuild() 工厂方法,以配置并获取 AWSCloudTrailProcessingExecutor 对象。

  3. 使用start()AWSCloudTrailProcessingExecutorstop()方法开始和结束 CloudTrail 事件处理。

public class SampleApp { public static void main(String[] args) throws InterruptedException { AWSCloudTrailProcessingExecutor executor = new AWSCloudTrailProcessingExecutor.Builder(new SampleEventsProcessor(), "/myproject/cloudtrailprocessing.properties").build(); executor.start(); Thread.sleep(24 * 60 * 60 * 1000); // let it run for a while (optional) executor.stop(); // optional } }

高级主题

筛选要处理的事件

默认情况下,Amazon SQS 队列的 S3 存储桶中的所有日志及其包含的所有事件都将发送到您的EventsProcessor。P CloudTrail rocessing Library 提供了可选接口,您可以实现这些接口来筛选用于获取 CloudTrail 日志的源并筛选您有兴趣处理的事件。

SourceFilter

您可以实施 SourceFilter 接口,以选择是否要处理来自提供的源的日志。SourceFilter 声明单个调用方法 filterSource(),该方法接收一个 CloudTrailSource 对象。要阻止源中的事件被处理,请从 false () 返回 filterSource()

CloudTrail 处理库会在库轮询 Amazon SQS 队列中的日志后调用该filterSource()方法。在库启动日志的事件筛选或处理之前会发生此事。

以下是一个实施示例:

public class SampleSourceFilter implements SourceFilter{ private static final int MAX_RECEIVED_COUNT = 3; private static List<String> accountIDs ; static { accountIDs = new ArrayList<>(); accountIDs.add("123456789012"); accountIDs.add("234567890123"); } @Override public boolean filterSource(CloudTrailSource source) throws CallbackException { source = (SQSBasedSource) source; Map<String, String> sourceAttributes = source.getSourceAttributes(); String accountId = sourceAttributes.get( SourceAttributeKeys.ACCOUNT_ID.getAttributeKey()); String receivedCount = sourceAttributes.get( SourceAttributeKeys.APPROXIMATE_RECEIVE_COUNT.getAttributeKey()); int approximateReceivedCount = Integer.parseInt(receivedCount); return approximateReceivedCount <= MAX_RECEIVED_COUNT && accountIDs.contains(accountId); } }

如果您未提供自己的 SourceFilter,则会使用 DefaultSourceFilter,这样将允许处理所有源 (它总是返回 true)。

EventFilter

您可以实现EventFilter接口来选择是否向您的发送 CloudTrail事件EventsProcessorEventFilter声明一个接收CloudTrailEvent对象的filterEvent()单个回调方法。要阻止事件被处理,请从 false () 返回 filterEvent()

CloudTrail 处理库会在库轮询 Amazon SQS 队列中的日志以及筛选源代码之后调用该filterEvent()方法。这在库开始处理日志的事件之前发生。

请参阅下面的实施示例:

public class SampleEventFilter implements EventFilter{ private static final String EC2_EVENTS = "ec2.amazonaws.com"; @Override public boolean filterEvent(CloudTrailClientEvent clientEvent) throws CallbackException { CloudTrailEvent event = clientEvent.getEvent(); String eventSource = event.getEventSource(); String eventName = event.getEventName(); return eventSource.equals(EC2_EVENTS) && eventName.startsWith("Delete"); } }

如果您未提供自己的 EventFilter,则会使用 DefaultEventFilter,这样将允许处理所有事件 (它总是返回 true)。

处理数据事件

CloudTrail 处理数据事件时,它会保留原始格式的数字,无论是整数 (int) 还是float(包含十进制的数字)。在数据事件字段中包含整数的事件中, CloudTrail 历史上会将这些数字作为浮点数进行处理。目前,通过保留这些字段的原始格式来 CloudTrail 处理这些字段中的数字。

作为最佳实践,为避免破坏自动化,请灵活使用任何用于处理或筛选 CloudTrail 数据事件的代码或自动化,并允许两者兼int而有float格式的数字。为获得最佳效果,请使用 CloudTrail 处理库 1.4.0 或更高版本。

以下示例代码段显示了 float 格式的数字 2.0,用于数据事件的 ResponseParameters数据块中的 desiredCount 参数。

"eventName": "CreateService", "awsRegion": "us-east-1", "sourceIPAddress": "000.00.00.00", "userAgent": "console.amazonaws.com", "requestParameters": { "clientToken": "EXAMPLE", "cluster": "default", "desiredCount": 2.0 ...

以下示例代码段显示了 int 格式的数字 2,用于数据事件的 ResponseParameters数据块中的 desiredCount 参数。

"eventName": "CreateService", "awsRegion": "us-east-1", "sourceIPAddress": "000.00.00.00", "userAgent": "console.amazonaws.com", "requestParameters": { "clientToken": "EXAMPLE", "cluster": "default", "desiredCount": 2 ...

报告进度

实现ProgressReporter接口以自定义 CloudTrail 处理库进度报告。 ProgressReporter声明了两个方法:reportStart()reportEnd(),它们在以下操作的开头和结尾处被调用:

  • 轮询来自亚马逊的消息 SQS

  • 正在解析来自亚马逊的消息 SQS

  • 正在处理 Amazon SQS 来源的 CloudTrail 日志

  • 从亚马逊删除消息 SQS

  • 下载 CloudTrail 日志文件

  • 处理 CloudTrail 日志文件

这两种方法均会收到一个 ProgressStatus 对象,该对象包含有关已执行操作的信息。progressState 成员包含标识当前操作的 ProgressState 枚举的一个成员。此成员可以包含 progressInfo 成员中的其他信息。此外,您从 reportStart() 返回的任何对象都会传递给 reportEnd(),以便您能够提供上下文信息,例如事件开始处理时的时间。

下面是一个实施示例,该示例提供了有关完成操作所需时间的信息:

public class SampleProgressReporter implements ProgressReporter { private static final Log logger = LogFactory.getLog(DefaultProgressReporter.class); @Override public Object reportStart(ProgressStatus status) { return new Date(); } @Override public void reportEnd(ProgressStatus status, Object startDate) { System.out.println(status.getProgressState().toString() + " is " + status.getProgressInfo().isSuccess() + " , and latency is " + Math.abs(((Date) startDate).getTime()-new Date().getTime()) + " milliseconds."); } }

如果您未实施自己的 ProgressReporter,则将改用 DefaultExceptionHandler (它会输出正在运行的状态的名称)。

处理错误

您可使用 ExceptionHandler 接口在日志处理期间发生异常时提供特殊处理。ExceptionHandler 声明单个调用方法 handleException(),该方法将接收一个带有关于已发生异常的上下文的 ProcessingLibraryException 对象。

您可以使用传入的 ProcessingLibraryExceptiongetStatus() 方法来查明发生异常时所执行的操作,并获取有关该操作的状态的附加信息。ProcessingLibraryException 派生自 Java 的标准 Exception 类,因此您也可以通过调用任一 Exception 方法来检索有关异常的信息。

请参阅下面的实施示例:

public class SampleExceptionHandler implements ExceptionHandler{ private static final Log logger = LogFactory.getLog(DefaultProgressReporter.class); @Override public void handleException(ProcessingLibraryException exception) { ProgressStatus status = exception.getStatus(); ProgressState state = status.getProgressState(); ProgressInfo info = status.getProgressInfo(); System.err.println(String.format( "Exception. Progress State: %s. Progress Information: %s.", state, info)); } }

如果您未提供自己的 ExceptionHandler,则改用 DefaultExceptionHandler (它会输出标准错误消息)。

注意

如果deleteMessageUponFailure参数为true,则 CloudTrail 处理库不区分一般异常和处理错误,并且可能会删除队列消息。

  1. 例如,使用 SourceFilter 按时间戳筛选消息。

  2. 但是,您没有访问接收 CloudTrail 日志文件的 S3 存储桶所需的权限。由于您没有所需的权限,因此会引发 AmazonServiceException。 CloudTrail 处理库将其封装在CallBackException.

  3. DefaultExceptionHandler 会将其记录为错误,但不会确定根本原因,这个原因就是您没有所需的权限。 CloudTrail 处理库将此视为处理错误并删除该消息,即使该消息包含有效的 CloudTrail 日志文件也是如此。

如果您要用 SourceFilter 来筛选消息,请验证您的 ExceptionHandler 是否可将服务异常与处理错误区分开。

其他 资源

有关 CloudTrail 处理库的更多信息,请参阅以下内容: