教學課程:Batch 解析器 - AWS AppSync

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學課程:Batch 解析器

注意

我們現在主要支援 APPSYNC_JS 執行階段及其說明文件。請考慮在此處使用 APPSYNC_JS 執行階段及其指南。

AWS AppSync 支援在單一區域中的一個或多個表格中使用 Amazon DynamoDB 批次操作。支援的操作包括 BatchGetItemBatchPutItemBatchDeleteItem。您可以在 AWS AppSync 中使用這些功能來執行工作,例如:

  • 在單次查詢中傳遞索引鍵清單,並從資料表傳回結果

  • 在單次查詢中讀取一個或多個資料表的記錄

  • 將大量的記錄寫入一個或多個資料表

  • 在可能具有關聯的多個資料表中,有條件地寫入或刪除記錄

在 DynamoDB 中使用批次作業AWS AppSync 是一項進階技術,需要對後端作業和表格結構的額外思考和知識。此外,在 AWS AppSync 中的批次操作和非批次操作有兩項主要的差異:

  • 對於解析程式將會存取的所有資料表,資料來源角色都必須具有權限。

  • 解析程式資料表規格包含於映射範本中。

許可

如同其他的解析程式,您需要在 AWS AppSync 中建立資料來源,然後建立角色或使用現有的角色。由於批次操作需要 DynamoDB 表上的不同權限,因此您需要授與已設定的角色權限以進行讀取或寫入動作:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Effect": "Allow", "Resource": [ "arn:aws:dynamodb:region:account:table/TABLENAME", "arn:aws:dynamodb:region:account:table/TABLENAME/*" ] } ] }

注意:在 AWS AppSync 中,角色會連結至資料來源,而會針對資料來源叫用欄位上的解析程式。設定為針對 DynamoDB 擷取的資料來源只會指定一個表格,以保持組態簡單。因此,在單一解析程式中針對多個資料表執行批次作業時 (這是一項更為進階的工作),您必須授予權限給該資料來源上的角色,以存取解析程式將會與其互動的所有資料表。這會在上述 IAM 政策的 Resource (資源) 欄位中完成。針對批次操作所要呼叫的資料表進行設定,這項動作是在解析程式範本中完成,我們將會在下列內容中說明此範本。

資料來源

為簡單起見,我們將針對本教學課程中使用的所有解析程式,使用相同的資料來源。在資料來源索引標籤上,建立新 DynamoDB 資料來源並為其命名。BatchTutorial資料表可以使用任何名稱,因為資料表名稱在批次作業中是指定為請求映射範本的一部分。我們會將資料表命名為 empty

在本教學課程中,具備下列內嵌政策的任何角色皆可使用:

{ "Version": "2012-10-17", "Statement": [ { "Action": [ "dynamodb:BatchGetItem", "dynamodb:BatchWriteItem" ], "Effect": "Allow", "Resource": [ "arn:aws:dynamodb:region:account:table/Posts", "arn:aws:dynamodb:region:account:table/Posts/*", "arn:aws:dynamodb:region:account:table/locationReadings", "arn:aws:dynamodb:region:account:table/locationReadings/*", "arn:aws:dynamodb:region:account:table/temperatureReadings", "arn:aws:dynamodb:region:account:table/temperatureReadings/*" ] } ] }

單一資料表批次

例如,假設有一個名為 Posts 的資料表,您想要利用批次操作來新增和移除此資料表的項目。使用下列的結構描述,請注意,我們會將 ID 清單傳入查詢:

type Post { id: ID! title: String } input PostInput { id: ID! title: String } type Query { batchGet(ids: [ID]): [Post] } type Mutation { batchAdd(posts: [PostInput]): [Post] batchDelete(ids: [ID]): [Post] } schema { query: Query mutation: Mutation }

使用下列「請求對應範本」將解析器附加至batchAdd()欄位。這會自動擷取 GraphQL input PostInput 類型的每個項目,並建置 BatchPutItem 作業所需的對應圖:

#set($postsdata = []) #foreach($item in ${ctx.args.posts}) $util.qr($postsdata.add($util.dynamodb.toMapValues($item))) #end { "version" : "2018-05-29", "operation" : "BatchPutItem", "tables" : { "Posts": $utils.toJson($postsdata) } }

在此情況中的回應映射範本只是簡單的直接傳遞,但是該資料表名稱會以 ..data.Posts 的形式附加至內容物件,如下所示:

$util.toJson($ctx.result.data.Posts)

現在請瀏覽至 主控台的 QueriesAWS AppSync (查詢) 頁面,並執行下列的 batchAdd 變動動作:

mutation add { batchAdd(posts:[{ id: 1 title: "Running in the Park"},{ id: 2 title: "Playing fetch" }]){ id title } }

您應該會看到列印在畫面上的結果,並且可以透過 DynamoDB 主控台獨立驗證這兩個值都寫入貼文表。

接下來,使用下列「請求對應範本」將解析器附加至batchGet()欄位。這會自動擷取 GraphQL ids:[] 類型的每個項目,並建置 BatchGetItem 作業所需的對應圖:

#set($ids = []) #foreach($id in ${ctx.args.ids}) #set($map = {}) $util.qr($map.put("id", $util.dynamodb.toString($id))) $util.qr($ids.add($map)) #end { "version" : "2018-05-29", "operation" : "BatchGetItem", "tables" : { "Posts": { "keys": $util.toJson($ids), "consistentRead": true, "projection" : { "expression" : "#id, title", "expressionNames" : { "#id" : "id"} } } } }

回應映射範本仍然只是簡單的直接傳遞,該資料表名稱也會再次地以 ..data.Posts 的形式附加至內容物件:

$util.toJson($ctx.result.data.Posts)

現在請返回 主控台的 QueriesAWS AppSync (查詢) 頁面,並執行下列的 batchGet 查詢動作:

query get { batchGet(ids:[1,2,3]){ id title } }

這應該會針對您先前所新增的兩個 id 值,傳回其結果。請注意,值為 nullid 傳回了 3 值。這是因為在 Posts 資料表中,尚未有任何記錄具有該值。另請注意,AWS AppSync 會依索引鍵傳入查詢的相同順序傳回結果,這是 AWS AppSync 代表您執行的一項額外功能。因此,如果切換到 batchGet(ids:[1,3,2),順序將會變更。您也會知道哪個 id 傳回 null 值。

最後,使用以下「請求映射模板」將解析器附加到batchDelete()字段中。這會自動擷取 GraphQL ids:[] 類型的每個項目,並建置 BatchGetItem 作業所需的對應圖:

#set($ids = []) #foreach($id in ${ctx.args.ids}) #set($map = {}) $util.qr($map.put("id", $util.dynamodb.toString($id))) $util.qr($ids.add($map)) #end { "version" : "2018-05-29", "operation" : "BatchDeleteItem", "tables" : { "Posts": $util.toJson($ids) } }

回應映射範本仍然只是簡單的直接傳遞,該資料表名稱也會再次地以 ..data.Posts 的形式附加至內容物件:

$util.toJson($ctx.result.data.Posts)

現在請返回 主控台的 QueriesAWS AppSync (查詢) 頁面,並執行下列的 batchDelete 變動動作:

mutation delete { batchDelete(ids:[1,2]){ id } }

id 12 的記錄現在應已刪除。如果重新執行先前的 batchGet() 查詢,這些應該會傳回 null

多重資料表批次

AWS AppSync 還可讓您跨表執行批次作業。來試試建置更複雜的應用程式。試想建置一個寵物健康狀態 (Pet Health) 應用程式,其中會有感應器呈報寵物的位置和體溫。感應器是由電池供電,而且每隔幾分鐘就會試著連線到網路。當感應器建立連線時,會將其讀數傳送到我們的 AWS AppSync API。觸發條件接著就會分析資料,然後將儀表板呈現給寵物的主人。讓我們著重介紹感應器與後端資料存放區之間的互動。

先決條件是,讓我們先建立兩個 DynamoDB 表;locationReadings 會儲存感應器位置讀數,而 temperatureReadings 則會儲存感測器溫度讀數。這兩個資料表碰巧擁有相同的主索引鍵結構:sensorId (String) 為分割區索引鍵、timestamp (String) 為排序索引鍵。

讓我們使用下列 GraphQL 結構描述:

type Mutation { # Register a batch of readings recordReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult # Delete a batch of readings deleteReadings(tempReadings: [TemperatureReadingInput], locReadings: [LocationReadingInput]): RecordResult } type Query { # Retrieve all possible readings recorded by a sensor at a specific time getReadings(sensorId: ID!, timestamp: String!): [SensorReading] } type RecordResult { temperatureReadings: [TemperatureReading] locationReadings: [LocationReading] } interface SensorReading { sensorId: ID! timestamp: String! } # Sensor reading representing the sensor temperature (in Fahrenheit) type TemperatureReading implements SensorReading { sensorId: ID! timestamp: String! value: Float } # Sensor reading representing the sensor location (lat,long) type LocationReading implements SensorReading { sensorId: ID! timestamp: String! lat: Float long: Float } input TemperatureReadingInput { sensorId: ID! timestamp: String value: Float } input LocationReadingInput { sensorId: ID! timestamp: String lat: Float long: Float }

BatchPutItem -記錄傳感器讀數

感應器必須能夠在連線到網際網路之後傳送其讀數。感應器將使用 GraphQL 欄位 Mutation.recordReadings 這個 API 來執行此項動作。讓我們連結解析程式,讓 API 能夠開始運作。

選取欄位旁邊的「附加Mutation.recordReadings」。在下一個畫面中,選取在教學課程開始時所建立的同一個 BatchTutorial 資料來源。

讓我們來新增下列請求映射範本:

請求映射範本

## Convert tempReadings arguments to DynamoDB objects #set($tempReadings = []) #foreach($reading in ${ctx.args.tempReadings}) $util.qr($tempReadings.add($util.dynamodb.toMapValues($reading))) #end ## Convert locReadings arguments to DynamoDB objects #set($locReadings = []) #foreach($reading in ${ctx.args.locReadings}) $util.qr($locReadings.add($util.dynamodb.toMapValues($reading))) #end { "version" : "2018-05-29", "operation" : "BatchPutItem", "tables" : { "locationReadings": $utils.toJson($locReadings), "temperatureReadings": $utils.toJson($tempReadings) } }

如您所見,BatchPutItem 操作可讓我們指定多個資料表。

讓我們使用下列回應映射範本。

回應映射範本

## If there was an error with the invocation ## there might have been partial results #if($ctx.error) ## Append a GraphQL error for that field in the GraphQL response $utils.appendError($ctx.error.message, $ctx.error.message) #end ## Also returns data for the field in the GraphQL response $utils.toJson($ctx.result.data)

進行批次操作時,呼叫可能會同時傳回錯誤和結果。在這種情況中,我們可以隨意進行一些額外的錯誤處理。

注意$utils.appendError() 的使用類似於 $util.error(),主要差別在於前者不會中斷映射範本的評估,而是在欄位出現錯誤時發出訊號,但允許範本的評估,進而將資料傳回給呼叫程式。如果應用程式需要傳回部分結果,建議您使用 $utils.appendError()

儲存解析程式,然後瀏覽至 主控台的 QueriesAWS AppSync (查詢) 頁面。讓我們開始傳送一些感應器讀數吧!

執行下列的變動:

mutation sendReadings { recordReadings( tempReadings: [ {sensorId: 1, value: 85.5, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, value: 85.7, timestamp: "2018-02-01T17:21:06.000+08:00"}, {sensorId: 1, value: 85.8, timestamp: "2018-02-01T17:21:07.000+08:00"}, {sensorId: 1, value: 84.2, timestamp: "2018-02-01T17:21:08.000+08:00"}, {sensorId: 1, value: 81.5, timestamp: "2018-02-01T17:21:09.000+08:00"} ] locReadings: [ {sensorId: 1, lat: 47.615063, long: -122.333551, timestamp: "2018-02-01T17:21:05.000+08:00"}, {sensorId: 1, lat: 47.615163, long: -122.333552, timestamp: "2018-02-01T17:21:06.000+08:00"} {sensorId: 1, lat: 47.615263, long: -122.333553, timestamp: "2018-02-01T17:21:07.000+08:00"} {sensorId: 1, lat: 47.615363, long: -122.333554, timestamp: "2018-02-01T17:21:08.000+08:00"} {sensorId: 1, lat: 47.615463, long: -122.333555, timestamp: "2018-02-01T17:21:09.000+08:00"} ]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

我們會在一次的變動中傳送 10 個感應器讀數,讀數的資料會分置於兩個資料表。使用 DynamoDB 主控台驗證資料是否同時顯示在「locationReadings」和「temperatureReadings」表中。

BatchDeleteItem -刪除傳感器讀數

同樣地,我們也需要刪除分批的感應器讀數。讓我們使用 Mutation.deleteReadings GraphQL 欄位來進行這項動作。選取欄位旁邊的「附加Mutation.recordReadings」。在下一個畫面中,選取在教學課程開始時所建立的同一個 BatchTutorial 資料來源。

讓我們使用下列請求映射範本。

請求映射範本

## Convert tempReadings arguments to DynamoDB primary keys #set($tempReadings = []) #foreach($reading in ${ctx.args.tempReadings}) #set($pkey = {}) $util.qr($pkey.put("sensorId", $reading.sensorId)) $util.qr($pkey.put("timestamp", $reading.timestamp)) $util.qr($tempReadings.add($util.dynamodb.toMapValues($pkey))) #end ## Convert locReadings arguments to DynamoDB primary keys #set($locReadings = []) #foreach($reading in ${ctx.args.locReadings}) #set($pkey = {}) $util.qr($pkey.put("sensorId", $reading.sensorId)) $util.qr($pkey.put("timestamp", $reading.timestamp)) $util.qr($locReadings.add($util.dynamodb.toMapValues($pkey))) #end { "version" : "2018-05-29", "operation" : "BatchDeleteItem", "tables" : { "locationReadings": $utils.toJson($locReadings), "temperatureReadings": $utils.toJson($tempReadings) } }

這個回應映射範本和我們用於 Mutation.recordReadings 的相同。

回應映射範本

## If there was an error with the invocation ## there might have been partial results #if($ctx.error) ## Append a GraphQL error for that field in the GraphQL response $utils.appendError($ctx.error.message, $ctx.error.message) #end ## Also return data for the field in the GraphQL response $utils.toJson($ctx.result.data)

儲存解析程式,然後瀏覽至 主控台的 QueriesAWS AppSync (查詢) 頁面。現在,讓我們刪除一些感應器讀數吧!

執行下列的變動:

mutation deleteReadings { # Let's delete the first two readings we recorded deleteReadings( tempReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}] locReadings: [{sensorId: 1, timestamp: "2018-02-01T17:21:05.000+08:00"}]) { locationReadings { sensorId timestamp lat long } temperatureReadings { sensorId timestamp value } } }

透過 DynamoDB 主控台驗證這兩個讀數是否已從「locationReadings」和「temperatureReadings 讀數」表格中刪除。

BatchGetItem -檢索讀數

我們寵物健康狀態 (Pet Health) 應用程式另一個常用的操作,就是在特定的時間點擷取感應器的讀數。試試將解析程式連接到結構描述中的 Query.getReadings GraphQL 欄位。選取 Attach (附加),然後在下一個畫面上,選取在教學課程開始時所建立的同一個 BatchTutorial 資料來源。

讓我們來新增下列請求映射範本。

請求映射範本

## Build a single DynamoDB primary key, ## as both locationReadings and tempReadings tables ## share the same primary key structure #set($pkey = {}) $util.qr($pkey.put("sensorId", $ctx.args.sensorId)) $util.qr($pkey.put("timestamp", $ctx.args.timestamp)) { "version" : "2018-05-29", "operation" : "BatchGetItem", "tables" : { "locationReadings": { "keys": [$util.dynamodb.toMapValuesJson($pkey)], "consistentRead": true }, "temperatureReadings": { "keys": [$util.dynamodb.toMapValuesJson($pkey)], "consistentRead": true } } }

請注意,我們現在正在使用該BatchGetItem操作。

我們的回應映射範本將會有些微的不同,因為我們選擇傳回 SensorReading 清單。讓我們將叫用結果映射到所需的結構。

回應映射範本

## Merge locationReadings and temperatureReadings ## into a single list ## __typename needed as schema uses an interface #set($sensorReadings = []) #foreach($locReading in $ctx.result.data.locationReadings) $util.qr($locReading.put("__typename", "LocationReading")) $util.qr($sensorReadings.add($locReading)) #end #foreach($tempReading in $ctx.result.data.temperatureReadings) $util.qr($tempReading.put("__typename", "TemperatureReading")) $util.qr($sensorReadings.add($tempReading)) #end $util.toJson($sensorReadings)

儲存解析程式,然後瀏覽至 主控台的 QueriesAWS AppSync (查詢) 頁面。現在,讓我們擷取感應器讀數吧!

執行下列的查詢:

query getReadingsForSensorAndTime { # Let's retrieve the very first two readings getReadings(sensorId: 1, timestamp: "2018-02-01T17:21:06.000+08:00") { sensorId timestamp ...on TemperatureReading { value } ...on LocationReading { lat long } } }

我們已經成功示範了使用. AWS AppSync

錯誤處理

在 AWS AppSync 中,資料來源的操作有時可能會傳回部分結果。我們將會使用部分結果這個詞彙,來表示操作的輸出包含一些資料和錯誤的情況。因為錯誤處理原本就是隨應用程式而有不同,AWS AppSync 提供了機會,讓您在回應映射範本中處理錯誤。如果發生解析程式呼叫錯誤,在文字內容中會出現 $ctx.error。呼叫錯誤一律包含訊息和類型,可做為 $ctx.error.message$ctx.error.type 屬性存取。在回應映射範本呼叫期間,您可以用三種方式來處理部分結果:

  1. 藉由只傳回資料來抑制呼叫錯誤

  2. 藉由停止進行回應映射範本的評估 (這不會傳回任何資料),來產生錯誤 (使用 $util.error(...))。

  3. 附加錯誤 (使用 $util.appendError(...)),而且也傳回資料

讓我們透過 DynamoDB 的批次操作來示範上述三點!

DynamoDB 批次操作

使用 DynamoDB 批次操作時,批次作業就有可能部分完成。也就是說,請求的項目或索引鍵可以有一些尚未處理完成。如果 AWS AppSync 無法完成批次作業,在文字內容中將會顯示未處理的項目和呼叫錯誤。

我們將會使用 Query.getReadings 欄位組態 (取自於本教學課程先前區段所說明的 BatchGetItem 操作) 來建置錯誤處理功能。這次,讓我們假設在執行 Query.getReadings 欄位時,temperatureReadings DynamoDB 資料表用盡了佈建的輸送量。DynamoDB 在第二次嘗試處理批次中的AWS AppSync 剩餘元素ProvisionedThroughputExceededException時提出了一個。

下列的 JSON 呈現了在 DynamoDB 批次呼叫之後、回應映射範本評估之前的序列化內容。

{ "arguments": { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" }, "source": null, "result": { "data": { "temperatureReadings": [ null ], "locationReadings": [ { "lat": 47.615063, "long": -122.333551, "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ] }, "unprocessedKeys": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] } }, "error": { "type": "DynamoDB:ProvisionedThroughputExceededException", "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" }, "outErrors": [] }

對於內容的幾個注意事項:

  • 呼叫錯誤已在$ctx.error上下文中設定 AWS AppSync,且錯誤類型已設定為 DynamoDB:。ProvisionedThroughputExceededException

  • 即使出現錯誤,也會針對每個資料表對應結果 ($ctx.result.data)

  • 未處理的索引鍵可透過 $ctx.result.data.unprocessedKeys 取得。在這裡,AWS AppSync 無法利用索引鍵 (sensorId:1、時間戳記:2018-02-01T17:21:05.000+08:00) 來擷取項目,因為資料表輸送量不足。

注意:對於 BatchPutItem,則為 $ctx.result.data.unprocessedItems。如果是 BatchDeleteItem,則為 $ctx.result.data.unprocessedKeys

讓我們用三種不同的方式來處理這項錯誤。

1. 抑制呼叫錯誤

傳回資料而不處理呼叫錯誤,可有效地抑制錯誤,讓指定 GraphQL 欄位的結果一律成功。

我們所編寫的回應映射範本,只熟悉和著重於結果的資料。

回應映射範本:

$util.toJson($ctx.result.data)

GraphQL 回應:

{ "data": { "getReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "lat": 47.615063, "long": -122.333551 }, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] } }

錯誤回應中不會加入任何錯誤,因為只處理了資料。

2. 丟出錯誤來中止範本的執行

如果從用戶端的角度,部分故障應視為完全故障,則您可以中止範本的執行,來防止傳回資料。$util.error(...) 公用程式方法可確實完成這項動作。

回應映射範本:

## there was an error let's mark the entire field ## as failed and do not return any data back in the response #if ($ctx.error) $util.error($ctx.error.message, $ctx.error.type, null, $ctx.result.data.unprocessedKeys) #end $util.toJson($ctx.result.data)

GraphQL 回應:

{ "data": { "getReadings": null }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }

即使 DynamoDB 批次作業可能已經傳回一些結果,我們還是選擇丟出錯誤,如此 getReadings GraphQL 欄位為 null,而錯誤也會加入到 GraphQL 回應的 errors (錯誤) 區塊。

3. 附加錯誤,以同時傳回資料和錯誤

在某些情況中,為提供更好的使用者體驗,應用程式可以傳回部分結果,並通知其用戶端有未處理的項目。用戶端可以決定是否要重試,或是轉譯錯誤後傳回給最終使用者。$util.appendError(...) 是一項公用程式方法,可讓應用程式的設計人員將錯誤附加於文字內容,但不會干擾到範本的評估,進而完成前述的動作。在評估範本之後,AWS AppSync 會處理所有的內容錯誤,方式是將這些錯誤附加到 GraphQL 回應的錯誤區塊。

回應映射範本:

#if ($ctx.error) ## pass the unprocessed keys back to the caller via the `errorInfo` field $util.appendError($ctx.error.message, $ctx.error.type, null, $ctx.result.data.unprocessedKeys) #end $util.toJson($ctx.result.data)

我們會同時轉傳呼叫錯誤,以及 GraphQL 回應錯誤區塊中的 unprocessedKeys (未處理的索引鍵) 項目。getReadings 欄位也會傳回來自 locationReadings 資料表的部分資料,如下列的回應中所示。

GraphQL 回應:

{ "data": { "getReadings": [ null, { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00", "value": 85.5 } ] }, "errors": [ { "path": [ "getReadings" ], "data": null, "errorType": "DynamoDB:ProvisionedThroughputExceededException", "errorInfo": { "temperatureReadings": [ { "sensorId": "1", "timestamp": "2018-02-01T17:21:05.000+08:00" } ], "locationReadings": [] }, "locations": [ { "line": 58, "column": 3 } ], "message": "You exceeded your maximum allowed provisioned throughput for a table or for one or more global secondary indexes. (...)" } ] }