使用 Step Functions 运行 Athena 查询 - AWS Step Functions

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

使用 Step Functions 运行 Athena 查询

您可以将 AWS Step Functions 与 Amazon Athena 集成,以启动和停止查询执行,并使用 Step Functions 获取查询结果。使用 Step Functions,您可以运行临时或计划数据查询,并检索针对 S3 数据湖的结果。Athena 是无服务器的,因此您无需设置或管理任何基础设施,且只需为您运行的查询付费。本页列出了支持的 Athena API,并提供了一个用于启动 Athena 查询的示例 Task 状态。

要了解如何在 Step Functions 中与 AWS 服务集成,请参阅“集成服务”和“在 Step Functions 中向服务 API 传递参数”。

Athena 优化集成的主要功能

要将 AWS Step Functions 与 Amazon Athena 集成,您可以使用所提供的 Athena 服务集成 API。

服务集成 API 与相应的 Athena API 相同。并非所有 API 都支持所有集成模式,如下表所示。

API 请求响应 运行作业 (.sync)
StartQueryExecution 支持 支持
StopQueryExecution 支持 不支持
GetQueryExecution 支持 不支持
GetQueryResults 支持 不支持

下面包含一个启动 Athena 查询作业的 Task 状态。

"Start an Athena query": { "Type": "Task", "Resource": "arn:aws:states:::athena:startQueryExecution.sync", "Parameters": { "QueryString": "SELECT * FROM \"myDatabase\".\"myTable\" limit 1", "WorkGroup": "primary", "ResultConfiguration": { "OutputLocation": "s3://amzn-s3-demo-bucket" } }, "Next": "Get results of the query" }

支持的 Amazon Athena API:StartQueryExecution、StopQueryExecution、GetQueryExecution、GetQueryResults。

注意

在 Step Functions 中,任务的最大输入或结果数据大小有一个配额。因此,当您向其他服务发送数据或从其他服务接收数据时,数据大小不能超过 256 KB(以 UTF-8 编码的字符串计)。请参阅“与状态机执行相关的配额”。

调用 Amazon Athena 的 IAM 策略

以下示例模板演示了 AWS Step Functions 如何根据状态机定义中的资源生成 IAM 策略。有关更多信息,请参阅“Step Functions 如何为集成服务生成 IAM 策略”和“在 Step Functions 中探索服务集成模式”。

StartQueryExecution

静态资源

Run a Job (.sync)
{ "Version": "2012-10-17", "Statement":[ { "Effect": "Allow", "Action": [ "athena:startQueryExecution", "athena:stopQueryExecution", "athena:getQueryExecution", "athena:getDataCatalog" ], "Resource": [ "arn:aws:athena:{{region}}:{{accountId}}:workgroup/[[workGroup]]", "arn:aws:athena:{{region}}:{{accountId}}:datacatalog/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:CreateBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::*" ] }, { "Effect": "Allow", "Action": [ "glue:CreateDatabase", "glue:GetDatabase", "glue:GetDatabases", "glue:UpdateDatabase", "glue:DeleteDatabase", "glue:CreateTable", "glue:UpdateTable", "glue:GetTable", "glue:GetTables", "glue:DeleteTable", "glue:BatchDeleteTable", "glue:BatchCreatePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions", "glue:BatchGetPartition", "glue:DeletePartition", "glue:BatchDeletePartition" ], "Resource": [ "arn:aws:glue:{{region}}:{{accountId}}:catalog", "arn:aws:glue:{{region}}:{{accountId}}:database/*", "arn:aws:glue:{{region}}:{{accountId}}:table/*", "arn:aws:glue:{{region}}:{{accountId}}:userDefinedFunction/*" ] }, { "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": [ "*" ] } ] }
Request Response
{ "Version": "2012-10-17", "Statement":[ { "Effect": "Allow", "Action": [ "athena:startQueryExecution", "athena:getDataCatalog" ], "Resource": [ "arn:aws:athena:{{region}}:{{accountId}}:workgroup/[[workGroup]]", "arn:aws:athena:{{region}}:{{accountId}}:datacatalog/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:CreateBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::*" ] }, { "Effect": "Allow", "Action": [ "glue:CreateDatabase", "glue:GetDatabase", "glue:GetDatabases", "glue:UpdateDatabase", "glue:DeleteDatabase", "glue:CreateTable", "glue:UpdateTable", "glue:GetTable", "glue:GetTables", "glue:DeleteTable", "glue:BatchDeleteTable", "glue:BatchCreatePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions", "glue:BatchGetPartition", "glue:DeletePartition", "glue:BatchDeletePartition" ], "Resource": [ "arn:aws:glue:{{region}}:{{accountId}}:catalog", "arn:aws:glue:{{region}}:{{accountId}}:database/*", "arn:aws:glue:{{region}}:{{accountId}}:table/*", "arn:aws:glue:{{region}}:{{accountId}}:userDefinedFunction/*" ] }, { "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": [ "*" ] } ] }

动态资源

Run a Job (.sync)
{ "Version": "2012-10-17", "Statement":[ { "Effect": "Allow", "Action": [ "athena:startQueryExecution", "athena:stopQueryExecution", "athena:getQueryExecution", "athena:getDataCatalog" ], "Resource": [ "arn:aws:athena:{{region}}:{{accountId}}:workgroup/*", "arn:aws:athena:{{region}}:{{accountId}}:datacatalog/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:CreateBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::*" ] }, { "Effect": "Allow", "Action": [ "glue:CreateDatabase", "glue:GetDatabase", "glue:GetDatabases", "glue:UpdateDatabase", "glue:DeleteDatabase", "glue:CreateTable", "glue:UpdateTable", "glue:GetTable", "glue:GetTables", "glue:DeleteTable", "glue:BatchDeleteTable", "glue:BatchCreatePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions", "glue:BatchGetPartition", "glue:DeletePartition", "glue:BatchDeletePartition" ], "Resource": [ "arn:aws:glue:{{region}}:{{accountId}}:catalog", "arn:aws:glue:{{region}}:{{accountId}}:database/*", "arn:aws:glue:{{region}}:{{accountId}}:table/*", "arn:aws:glue:{{region}}:{{accountId}}:userDefinedFunction/*" ] }, { "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": [ "*" ] } ] }
Request Response
{ "Version": "2012-10-17", "Statement":[ { "Effect": "Allow", "Action": [ "athena:startQueryExecution", "athena:getDataCatalog" ], "Resource": [ "arn:aws:athena:{{region}}:{{accountId}}:workgroup/*", "arn:aws:athena:{{region}}:{{accountId}}:datacatalog/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetBucketLocation", "s3:GetObject", "s3:ListBucket", "s3:ListBucketMultipartUploads", "s3:ListMultipartUploadParts", "s3:AbortMultipartUpload", "s3:CreateBucket", "s3:PutObject" ], "Resource": [ "arn:aws:s3:::*" ] }, { "Effect": "Allow", "Action": [ "glue:CreateDatabase", "glue:GetDatabase", "glue:GetDatabases", "glue:UpdateDatabase", "glue:DeleteDatabase", "glue:CreateTable", "glue:UpdateTable", "glue:GetTable", "glue:GetTables", "glue:DeleteTable", "glue:BatchDeleteTable", "glue:BatchCreatePartition", "glue:CreatePartition", "glue:UpdatePartition", "glue:GetPartition", "glue:GetPartitions", "glue:BatchGetPartition", "glue:DeletePartition", "glue:BatchDeletePartition" ], "Resource": [ "arn:aws:glue:{{region}}:{{accountId}}:catalog", "arn:aws:glue:{{region}}:{{accountId}}:database/*", "arn:aws:glue:{{region}}:{{accountId}}:table/*", "arn:aws:glue:{{region}}:{{accountId}}:userDefinedFunction/*" ] }, { "Effect": "Allow", "Action": [ "lakeformation:GetDataAccess" ], "Resource": [ "*" ] } ] }

StopQueryExecution

资源

{ "Version": "2012-10-17", "Statement":[ { "Effect": "Allow", "Action": [ "athena:stopQueryExecution" ], "Resource": [ "arn:aws:athena:{{region}}:{{accountId}}:workgroup/*" ] } ] }

GetQueryExecution

资源

{ "Version": "2012-10-17", "Statement":[ { "Effect": "Allow", "Action": [ "athena:getQueryExecution" ], "Resource": [ "arn:aws:athena:{{region}}:{{accountId}}:workgroup/*" ] } ] }

GetQueryResults

资源

{ "Version": "2012-10-17", "Statement":[ { "Effect": "Allow", "Action": [ "athena:getQueryResults" ], "Resource": [ "arn:aws:athena:{{region}}:{{accountId}}:workgroup/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::*" ] } ] }