本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
AWS AppSync 支援在單一區域中的一或多個資料表使用 Amazon DynamoDB 批次操作。支援的操作包括 BatchGetItem
、BatchPutItem
和 BatchDeleteItem
。透過在 中使用這些功能 AWS AppSync,您可以執行下列任務:
-
在單一查詢中傳遞金鑰清單,並從資料表傳回結果
-
從單一查詢中的一或多個資料表讀取記錄
-
將大量記錄寫入一或多個資料表
-
有條件在可能有關係的多個資料表中寫入或刪除記錄
中的批次操作與非批次操作 AWS AppSync 有兩個主要差異:
-
資料來源角色必須具有解析器將存取的所有資料表的許可。
-
解析器的資料表規格是請求物件的一部分。
單一資料表批次
警告
BatchPutItem
與衝突偵測和解決方案搭配使用時,BatchDeleteItem
不支援 和 。必須停用這些設定,以防止可能的錯誤。
若要開始使用,讓我們建立新的 GraphQL API。在 AWS AppSync 主控台中,選擇從頭開始建立 API、GraphQL APIs和設計 。 為您的 API 命名BatchTutorial API
,選擇下一步 ,然後在指定 GraphQL 資源步驟上,選擇稍後建立 GraphQL 資源,然後按一下下一步 。檢閱您的詳細資訊並建立 API。前往結構描述頁面並貼上下列結構描述,請注意,對於查詢,我們將傳遞至 清單IDs:
type Post {
id: ID!
title: String
}
input PostInput {
id: ID!
title: String
}
type Query {
batchGet(ids: [ID]): [Post]
}
type Mutation {
batchAdd(posts: [PostInput]): [Post]
batchDelete(ids: [ID]): [Post]
}
儲存您的結構描述,然後選擇頁面頂端的建立資源。選擇使用現有類型,然後選擇Post
類型。為您的資料表命名 Posts
。確定主金鑰設定為 id
,取消選取自動產生 GraphQL (您會提供自己的程式碼),然後選取建立 。若要開始使用, AWS AppSync 會建立新的 DynamoDB 資料表,以及連接至具有適當角色之資料表的資料來源。不過,您仍然需要將一些許可新增至角色。前往資料來源頁面,然後選擇新的資料來源。在選取現有角色 下,您會注意到已自動為資料表建立角色。記下角色 (看起來應該類似 appsync-ds-ddb-aaabbbcccddd-Posts
),然後前往IAM主控台 (https://console.aws.amazon.com/iam/+
」 (名稱應與角色名稱類似)。在政策出現時,選擇可摺疊頂端的編輯。您需要將批次許可新增至政策,特別是 dynamodb:BatchGetItem
和 dynamodb:BatchWriteItem
。看起來會像這樣:
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "dynamodb:DeleteItem", "dynamodb:GetItem", "dynamodb:PutItem", "dynamodb:Query", "dynamodb:Scan", "dynamodb:UpdateItem", "dynamodb:BatchWriteItem", "dynamodb:BatchGetItem" ], "Resource": [ "arn:aws:dynamodb:…", "arn:aws:dynamodb:…" ] } ] }
選擇下一個 ,然後儲存變更 。您的政策現在應該允許批次處理。
返回 AWS AppSync 主控台,前往結構描述頁面,然後選取Mutation.batchAdd
欄位旁的連接。使用Posts
資料表作為資料來源建立解析器。在程式碼編輯器中,將處理常式取代為下方的程式碼片段。此程式碼片段會自動取得 GraphQL input PostInput
類型中的每個項目,並建置 BatchPutItem
操作所需的映射:
import { util } from "@aws-appsync/utils";
export function request(ctx) {
return {
operation: "BatchPutItem",
tables: {
Posts: ctx.args.posts.map((post) => util.dynamodb.toMapValues(post)),
},
};
}
export function response(ctx) {
if (ctx.error) {
util.error(ctx.error.message, ctx.error.type);
}
return ctx.result.data.Posts;
}
導覽至 AWS AppSync 主控台的查詢頁面,並執行下列batchAdd
突變:
mutation add {
batchAdd(posts:[{
id: 1 title: "Running in the Park"},{
id: 2 title: "Playing fetch"
}]){
id
title
}
}
您應該會在畫面上看到列印的結果;這可以透過檢閱 DynamoDB 主控台來掃描寫入Posts
資料表的值來驗證。
接下來,重複連接解析器的程序,但使用Posts
資料表作為資料來源來連接 Query.batchGet
欄位。使用下列程式碼取代處理常式。這會自動擷取 GraphQL ids:[]
類型的每個項目,並建置 BatchGetItem
作業所需的對應圖:
import { util } from "@aws-appsync/utils";
export function request(ctx) {
return {
operation: "BatchGetItem",
tables: {
Posts: {
keys: ctx.args.ids.map((id) => util.dynamodb.toMapValues({ id })),
consistentRead: true,
},
},
};
}
export function response(ctx) {
if (ctx.error) {
util.error(ctx.error.message, ctx.error.type);
}
return ctx.result.data.Posts;
}
現在,返回 AWS AppSync 主控台的查詢頁面,並執行下列batchGet
查詢:
query get {
batchGet(ids:[1,2,3]){
id
title
}
}
這應該會針對您先前所新增的兩個 id
值,傳回其結果。請注意, 傳回null
的值id
值為 3
。這是因為Posts
資料表中尚無具有該值的記錄。另請注意, 會以與傳遞給查詢的金鑰相同的順序 AWS AppSync 傳回結果,這是代表您 AWS AppSync 執行的額外功能。因此,如果您切換到 batchGet(ids:[1,3,2])
,您會看到訂單已變更。您也會知道哪個 id
傳回 null
值。
最後,使用Posts
資料表作為資料來源,將另一個解析器連接到 Mutation.batchDelete
欄位。使用下列程式碼取代處理常式。這會自動擷取 GraphQL ids:[]
類型的每個項目,並建置 BatchGetItem
作業所需的對應圖:
import { util } from "@aws-appsync/utils";
export function request(ctx) {
return {
operation: "BatchDeleteItem",
tables: {
Posts: ctx.args.ids.map((id) => util.dynamodb.toMapValues({ id })),
},
};
}
export function response(ctx) {
if (ctx.error) {
util.error(ctx.error.message, ctx.error.type);
}
return ctx.result.data.Posts;
}
現在,返回 AWS AppSync 主控台的查詢頁面,並執行下列batchDelete
突變:
mutation delete {
batchDelete(ids:[1,2]){ id }
}
id
1
和 2
的記錄現在應已刪除。如果重新執行先前的 batchGet()
查詢,這些應該會傳回 null
。
多資料表批次
警告
BatchPutItem
與衝突偵測和解決方案搭配使用時,BatchDeleteItem
不支援 和 。必須停用這些設定,以防止可能的錯誤。
AWS AppSync 也可讓您跨資料表執行批次操作。來試試建置更複雜的應用程式。假設我們正在建置寵物運作狀態應用程式,其中感應器會報告寵物的位置和體溫。感應器是由電池供電,而且每隔幾分鐘就會試著連線到網路。當感應器建立連線時,它會將其讀數傳送到我們的 AWS AppSync API。觸發條件接著就會分析資料,然後將儀表板呈現給寵物的主人。讓我們著重介紹感應器與後端資料存放區之間的互動。
在 AWS AppSync 主控台中,選擇從頭開始建立 API、GraphQL APIs和設計 。 為您的 API 命名MultiBatchTutorial API
,選擇下一步 ,然後在指定 GraphQL 資源步驟上,選擇稍後建立 GraphQL 資源,然後按一下下一步 。檢閱您的詳細資訊並建立 API。前往結構描述頁面,貼上並儲存下列結構描述:
type Mutation {
# Register a batch of readings
recordReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult
# Delete a batch of readings
deleteReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult
}
type Query {
# Retrieve all possible readings recorded by a sensor at a specific time
getReadings(sensorId: ID!, timestamp: String!): [SensorReading]
}
type RecordResult {
temperatureReadings: [TemperatureReading]
locationReadings: [LocationReading]
}
interface SensorReading {
sensorId: ID!
timestamp: String!
}
# Sensor reading representing the sensor temperature (in Fahrenheit)
type TemperatureReading implements SensorReading {
sensorId: ID!
timestamp: String!
value: Float
}
# Sensor reading representing the sensor location (lat,long)
type LocationReading implements SensorReading {
sensorId: ID!
timestamp: String!
lat: Float
long: Float
}
input TemperatureReadingInput {
sensorId: ID!
timestamp: String
value: Float
}
input LocationReadingInput {
sensorId: ID!
timestamp: String
lat: Float
long: Float
}
我們需要建立兩個 DynamoDB 資料表:
-
locationReadings
將儲存感應器位置讀數。 -
temperatureReadings
將存放感應器溫度讀數。
兩個資料表將共用相同的主金鑰結構: sensorId (String)
與分割區金鑰和排序金鑰timestamp (String)
相同。
選擇頁面頂端的建立資源。選擇使用現有類型,然後選擇locationReadings
類型。為您的資料表命名 locationReadings
。確定主金鑰設定為 sensorId
,排序金鑰設定為 timestamp
。取消選取自動產生 GraphQL (您將提供自己的程式碼),然後選取建立 。temperatureReadings
使用 temperatureReadings
作為類型和資料表名稱,重複此程序。使用與上述相同的金鑰。
您的新資料表將包含自動產生的角色。您仍然需要將一些許可新增至這些角色。前往資料來源頁面,然後選擇 locationReadings
。在選取現有角色 下,您可以看到角色。記下角色 (看起來應該類似 appsync-ds-ddb-aaabbbcccddd-locationReadings
),然後前往IAM主控台 (https://console.aws.amazon.com/iam/+
」 (名稱應與角色名稱類似)。在政策出現時,選擇可摺疊頂端的編輯。您需要將許可新增至此政策。看起來會像這樣:
{
"Version": "2012-10-17",
"Statement": [
{
"Effect": "Allow",
"Action": [
"dynamodb:DeleteItem",
"dynamodb:GetItem",
"dynamodb:PutItem",
"dynamodb:Query",
"dynamodb:Scan",
"dynamodb:UpdateItem",
"dynamodb:BatchGetItem",
"dynamodb:BatchWriteItem"
],
"Resource": [
"arn:aws:dynamodb:region:account:table/locationReadings",
"arn:aws:dynamodb:region:account:table/locationReadings/*",
"arn:aws:dynamodb:region:account:table/temperatureReadings",
"arn:aws:dynamodb:region:account:table/temperatureReadings/*"
]
}
]
}
選擇下一個 ,然後儲存變更 。使用上述相同的政策程式碼片段為temperatureReadings
資料來源重複此程序。
BatchPutItem - 記錄感測器讀數
感應器必須能夠在連線到網際網路之後傳送其讀數。GraphQL 欄位Mutation.recordReadings
是API他們將用來執行此操作的 欄位。我們需要將解析器新增至此欄位。
在 AWS AppSync 主控台的結構描述頁面中,選取Mutation.recordReadings
欄位旁的連接。在下一個畫面上,使用locationReadings
資料表作為資料來源來建立解析器。
建立解析器後,請在編輯器中將處理常式取代為下列程式碼。BatchPutItem
此操作允許我們指定多個資料表:
import { util } from '@aws-appsync/utils'
export function request(ctx) {
const { locReadings, tempReadings } = ctx.args
const locationReadings = locReadings.map((loc) => util.dynamodb.toMapValues(loc))
const temperatureReadings = tempReadings.map((tmp) => util.dynamodb.toMapValues(tmp))
return {
operation: 'BatchPutItem',
tables: {
locationReadings,
temperatureReadings,
},
}
}
export function response(ctx) {
if (ctx.error) {
util.appendError(ctx.error.message, ctx.error.type)
}
return ctx.result.data
}
進行批次操作時,呼叫可能會同時傳回錯誤和結果。在這種情況中,我們可以隨意進行一些額外的錯誤處理。
注意
使用 utils.appendError()
類似於 util.error()
,主要區別在於它不會中斷請求或回應處理常式的評估。相反地,它表示 欄位發生錯誤,但允許評估處理常式,並將資料傳回給呼叫者。當您的應用程式需要傳回部分結果utils.appendError()
時,建議您使用 。
儲存解析器並導覽至 AWS AppSync 主控台中的查詢頁面。現在可以傳送一些感應器讀數。
執行下列的變動:
mutation sendReadings {
recordReadings(
tempReadings: [
{sensorId: 1, value: 85.5, timestamp: "2018-02-01T17:21:05.000+08:00"},
{sensorId: 1, value: 85.7, timestamp: "2018-02-01T17:21:06.000+08:00"},
{sensorId: 1, value: 85.8, timestamp: "2018-02-01T17:21:07.000+08:00"},
{sensorId: 1, value: 84.2, timestamp: "2018-02-01T17:21:08.000+08:00"},
{sensorId: 1, value: 81.5, timestamp: "2018-02-01T17:21:09.000+08:00"}
]
locReadings: [
{sensorId: 1, lat: 47.615063, long: -122.333551, timestamp: "2018-02-01T17:21:05.000+08:00"},
{sensorId: 1, lat: 47.615163, long: -122.333552, timestamp: "2018-02-01T17:21:06.000+08:00"},
{sensorId: 1, lat: 47.615263, long: -122.333553, timestamp: "2018-02-01T17:21:07.000+08:00"},
{sensorId: 1, lat: 47.615363, long: -122.333554, timestamp: "2018-02-01T17:21:08.000+08:00"},
{sensorId: 1, lat: 47.615463, long: -122.333555, timestamp: "2018-02-01T17:21:09.000+08:00"}
]) {
locationReadings {
sensorId
timestamp
lat
long
}
temperatureReadings {
sensorId
timestamp
value
}
}
}
我們在一個突變中傳送了十個感應器讀數,讀數分為兩個資料表。使用 DynamoDB 主控台來驗證資料是否同時顯示在 locationReadings
和 temperatureReadings
資料表中。
BatchDeleteItem - 刪除感應器讀數
同樣地,我們也需要能夠刪除感應器讀數的批次。讓我們使用 Mutation.deleteReadings
GraphQL 欄位來進行這項動作。在 AWS AppSync 主控台的結構描述頁面中,選取Mutation.deleteReadings
欄位旁的連接。在下一個畫面上,使用locationReadings
資料表作為資料來源來建立您的解析器。
建立解析器後,請將程式碼編輯器中的處理常式取代為下方的程式碼片段。在此解析程式中,我們使用協助程式函數映射器,從提供的輸入timestamp
中擷取 sensorId
和 。
import { util } from '@aws-appsync/utils'
export function request(ctx) {
const { locReadings, tempReadings } = ctx.args
const mapper = ({ sensorId, timestamp }) => util.dynamodb.toMapValues({ sensorId, timestamp })
return {
operation: 'BatchDeleteItem',
tables: {
locationReadings: locReadings.map(mapper),
temperatureReadings: tempReadings.map(mapper),
},
}
}
export function response(ctx) {
if (ctx.error) {
util.appendError(ctx.error.message, ctx.error.type)
}
return ctx.result.data
}
儲存解析程式並導覽至 AWS AppSync 主控台中的查詢頁面。現在,讓我們刪除幾個感應器讀數。
執行下列的變動:
mutation deleteReadings {
# Let's delete the first two readings we recorded
deleteReadings(
tempReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}]
locReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}]) {
locationReadings {
sensorId
timestamp
lat
long
}
temperatureReadings {
sensorId
timestamp
value
}
}
}
注意
不同於 DeleteItem
操作,回應中未傳回完整刪除的項目。只會傳回已傳遞的索引鍵。若要進一步了解,請參閱 BatchDeleteItem DynamoDB 的 JavaScript 解析器函數參考。
透過 DynamoDB 主控台驗證這兩個讀數是否已從 locationReadings
和 temperatureReadings
資料表中刪除。
BatchGetItem - 擷取讀數
我們應用程式的另一個常見操作是擷取特定時間點的感應器讀數。試試將解析程式連接到結構描述中的 Query.getReadings
GraphQL 欄位。在 AWS AppSync 主控台的結構描述頁面中,選取Query.getReadings
欄位旁的連接。在下一個畫面上,使用locationReadings
資料表作為資料來源來建立您的解析器。
讓我們使用下列程式碼:
import { util } from '@aws-appsync/utils'
export function request(ctx) {
const keys = [util.dynamodb.toMapValues(ctx.args)]
const consistentRead = true
return {
operation: 'BatchGetItem',
tables: {
locationReadings: { keys, consistentRead },
temperatureReadings: { keys, consistentRead },
},
}
}
export function response(ctx) {
if (ctx.error) {
util.appendError(ctx.error.message, ctx.error.type)
}
const { locationReadings: locs, temperatureReadings: temps } = ctx.result.data
return [
...locs.map((l) => ({ ...l, __typename: 'LocationReading' })),
...temps.map((t) => ({ ...t, __typename: 'TemperatureReading' })),
]
}
儲存解析器並導覽至 AWS AppSync 主控台中的查詢頁面。現在,讓我們擷取感測器讀數。
執行下列的查詢:
query getReadingsForSensorAndTime {
# Let's retrieve the very first two readings
getReadings(sensorId: 1, timestamp: "2018-02-01T17:21:06.000+08:00") {
sensorId
timestamp
...on TemperatureReading {
value
}
...on LocationReading {
lat
long
}
}
}
我們已成功使用 示範 DynamoDB 批次操作的使用 AWS AppSync。
錯誤處理
在 中 AWS AppSync,資料來源操作有時可能會傳回部分結果。我們將會使用部分結果這個詞彙,來表示操作的輸出包含一些資料和錯誤的情況。由於錯誤處理本質上是應用程式特定的, AWS AppSync 因此可讓您有機會處理回應處理常式中的錯誤。如果發生解析程式呼叫錯誤,在文字內容中會出現 ctx.error
。呼叫錯誤一律包含訊息和類型,可做為 ctx.error.message
和 ctx.error.type
屬性存取。在回應處理常式中,您可以透過三種方式處理部分結果:
-
只傳回資料,以取代叫用錯誤。
-
透過停止處理常式評估來提出錯誤 (使用
util.error(...)
),這不會傳回任何資料。 -
附加錯誤 (使用
util.appendError(...)
) 並傳回資料。
讓我們使用 DynamoDB 批次操作來示範上述三個重點。
DynamoDB 批次操作
使用 DynamoDB 批次操作時,批次作業就有可能部分完成。也就是說,請求的項目或索引鍵可以有一些尚未處理完成。如果 AWS AppSync 無法完成批次,則將在內容中設定未處理的項目和調用錯誤。
我們將會使用 Query.getReadings
欄位組態 (取自於本教學課程先前區段所說明的 BatchGetItem
操作) 來建置錯誤處理功能。這次,讓我們假設在執行 Query.getReadings
欄位時,temperatureReadings
DynamoDB 資料表用盡了佈建的輸送量。DynamoDB 在第二次嘗試ProvisionedThroughputExceededException
期間透過 來處理批次中的其餘元素 AWS AppSync ,進而引發 。
下列JSON代表 DynamoDB 批次調用之後,但在呼叫回應處理常式之前序列化的內容:
{
"arguments": {
"sensorId": "1",
"timestamp": "2018-02-01T17:21:05.000+08:00"
},
"source": null,
"result": {
"data": {
"temperatureReadings": [
null
],
"locationReadings": [
{
"lat": 47.615063,
"long": -122.333551,
"sensorId": "1",
"timestamp": "2018-02-01T17:21:05.000+08:00"
}
]
},
"unprocessedKeys": {
"temperatureReadings": [
{
"sensorId": "1",
"timestamp": "2018-02-01T17:21:05.000+08:00"
}
],
"locationReadings": []
}
},
"error": {
"type": "DynamoDB:ProvisionedThroughputExceededException",
"message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)"
},
"outErrors": []
}
對於內容的幾個注意事項:
-
已在 的內容上設定叫用錯誤
ctx.error
AWS AppSync,且錯誤類型已設定為DynamoDB:ProvisionedThroughputExceededException
。 -
ctx.result.data
即使存在錯誤,結果仍會在 下對應每個資料表。 -
未處理的金鑰可在 取得
ctx.result.data.unprocessedKeys
。在這裡, AWS AppSync 由於資料表輸送量不足,無法擷取具有金鑰的項目 (sensorId:1,時間戳記:2018-02-01T17:21:05.000+08:00)。
注意
如果是 BatchPutItem
,則為 ctx.result.data.unprocessedItems
。如果是 BatchDeleteItem
,則為 ctx.result.data.unprocessedKeys
。
讓我們用三種不同的方式來處理這項錯誤。
1. 抑制呼叫錯誤
傳回資料而不處理呼叫錯誤,可有效地抑制錯誤,讓指定 GraphQL 欄位的結果一律成功。
我們編寫的程式碼很熟悉,只著重於結果資料。
回應處理常式
export function response(ctx) {
return ctx.result.data
}
GraphQL 回應
{
"data": {
"getReadings": [
{
"sensorId": "1",
"timestamp": "2018-02-01T17:21:05.000+08:00",
"lat": 47.615063,
"long": -122.333551
},
{
"sensorId": "1",
"timestamp": "2018-02-01T17:21:05.000+08:00",
"value": 85.5
}
]
}
}
錯誤回應中不會加入任何錯誤,因為只處理了資料。
2. 提高錯誤以中止回應處理常式執行
當部分失敗應視為從用戶端的角度來看完全失敗時,您可以中止回應處理常式執行,以防止傳回資料。util.error(...)
公用程式方法可確實完成這項動作。
回應處理常式程式碼
export function response(ctx) {
if (ctx.error) {
util.error(ctx.error.message, ctx.error.type, null, ctx.result.data.unprocessedKeys);
}
return ctx.result.data;
}
GraphQL 回應
{
"data": {
"getReadings": null
},
"errors": [
{
"path": [
"getReadings"
],
"data": null,
"errorType": "DynamoDB:ProvisionedThroughputExceededException",
"errorInfo": {
"temperatureReadings": [
{
"sensorId": "1",
"timestamp": "2018-02-01T17:21:05.000+08:00"
}
],
"locationReadings": []
},
"locations": [
{
"line": 58,
"column": 3
}
],
"message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)"
}
]
}
即使 DynamoDB 批次作業可能已經傳回一些結果,我們還是選擇丟出錯誤,如此 getReadings
GraphQL 欄位為 null,而錯誤也會加入到 GraphQL 回應的 errors (錯誤) 區塊。
3. 附加錯誤,以同時傳回資料和錯誤
在某些情況中,為提供更好的使用者體驗,應用程式可以傳回部分結果,並通知其用戶端有未處理的項目。用戶端可以決定是否要重試,或是轉譯錯誤後傳回給最終使用者。util.appendError(...)
是公用程式方法,可讓應用程式設計工具在內容上附加錯誤,而不會干擾回應處理常式的評估,從而啟用此行為。評估回應處理常式後, AWS AppSync 將透過將任何內容錯誤附加至 GraphQL 回應的錯誤區塊來處理這些錯誤。
回應處理常式程式碼
export function response(ctx) {
if (ctx.error) {
util.appendError(ctx.error.message, ctx.error.type, null, ctx.result.data.unprocessedKeys);
}
return ctx.result.data;
}
我們轉送了 GraphQL 回應錯誤區塊內的調用錯誤和unprocessedKeys
元素。getReadings
欄位也會傳回locationReadings
資料表中的部分資料,如下方回應所示。
GraphQL 回應
{
"data": {
"getReadings": [
null,
{
"sensorId": "1",
"timestamp": "2018-02-01T17:21:05.000+08:00",
"value": 85.5
}
]
},
"errors": [
{
"path": [
"getReadings"
],
"data": null,
"errorType": "DynamoDB:ProvisionedThroughputExceededException",
"errorInfo": {
"temperatureReadings": [
{
"sensorId": "1",
"timestamp": "2018-02-01T17:21:05.000+08:00"
}
],
"locationReadings": []
},
"locations": [
{
"line": 58,
"column": 3
}
],
"message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)"
}
]
}