Amazon 數據 Firehose 中的動態分區 - Amazon 數據 Firehose

Amazon 數據 Firehose 以前被稱為 Amazon Kinesis Data Firehose

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Amazon 數據 Firehose 中的動態分區

動態磁碟分割可讓您使用資料中的金鑰 (例如,customer_idtransaction_id) 持續分割 Firehose 中的串流資料,然後將這些金鑰分組的資料交付到對應的 Amazon Simple Storage Service (Amazon S3) 前置詞。這可讓您使用各種服務 (例如 Amazon 雅典娜、Amazon EMR、亞馬遜 Amazon Redshift Spectrum 和亞馬遜) 對 Amazon S3 中的串流資料執行高效能且具成本效益的分析。 QuickSight此外,在需要額外處理的使用案例中, AWS Glue 可以在動態分區的串流資料交付到 Amazon S3 之後,執行更複雜的擷取、轉換和載入 (ETL) 任務。

分割資料可最大限度地減少掃描的資料量、最佳化性能,並降低 Amazon S3 上的分析查詢成本。它還可以增加對資料的精細存取。傳統上使用 Firehose 串流來擷取資料並將其載入 Amazon S3。若要分割 Amazon S3 分析的串流資料集,您需要在 Amazon S3 儲存貯體之間執行分割應用程式,然後才能將資料提供給分析,這可能會變得複雜或昂貴。

透過動態分割,Firehose 會使用動態或靜態定義的資料金鑰持續分組傳輸中的資料,並依金鑰將資料交付至個別的 Amazon S3 前置詞。這減少了幾 time-to-insight 分鐘或幾小時。它還可以降低成本並簡化架構。

分割索引鍵

透過動態分割,您可以根據分割索引鍵分割資料,從串流 S3 資料建立目標資料集。分割索引鍵可讓您根據特定值篩選串流資料。例如,如果您需要根據客戶 ID 和國家/地區篩選資料,可以將 customer_id 的資料欄位指定為一個分割索引鍵,而將 country 的資料欄位指定為另一個分割索引鍵。然後,您可以指定運算式 (使用支援的格式) 來定義要傳送動態分割資料記錄的 S3 儲存貯體字首。

以下是建立分割索引鍵的支援方法:

  • 聯解析-此方法使用 Firehose 內置的支持機制,一個 jq 解析器,用於從 JSON 格式的數據記錄中提取用於分區的密鑰。目前,我們僅支援jq 1.6版本。

  • AWS Lambda 函數-此方法使用指定的 AWS Lambda 函數來提取和返回分區所需的數據字段。

重要

啟用動態分割時,您必須至少設定下列一種方法,以分割您的資料。您可以設定其中一種方法來同時指定您的分割索引鍵,或同時指定兩個分割索引鍵。

使用內嵌剖析建立分割索引鍵

若要將內嵌剖析設定為串流資料的動態分割方法,您必須選擇要用作分割索引鍵的資料記錄參數,並為每個指定的分割索引鍵提供一個值。

讓我們來看看下面的示例數據記錄,看看如何使用內聯解析為它定義分區鍵:

{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }

例如,您可以選擇根據 customer_id 參數或 event_timestamp 參數對資料進行分割。這表示您希望在決定要交付記錄的 S3 字首時,使用每個記錄中的 customer_id 參數或 event_timestamp 參數值。您也可以選擇巢狀參數,就像使用運算式 .type.devicedevice 一樣。您的動態分割邏輯可以依賴於多個參數。

選取分割索引鍵的資料參數之後,您可以將每個參數映射至有效的 jq 運算式。下表顯示了參數到 jq 運算式的映射:

參數 jq 運算式
customer_id .customer_id
device

.type.device

year

.event_timestamp| strftime("%Y")

month

.event_timestamp| strftime("%m")

day

.event_timestamp| strftime("%d")

hour

.event_timestamp| strftime("%H")

在執行階段,Firehose 會根據每筆記錄中的資料,使用上方的右欄來評估參數。

使用 AWS Lambda 函數建立分割索引鍵

對於壓縮或加密的資料記錄,或 JSON 以外任何檔案格式的資料,您可以使用整合的 AWS Lambda 函數搭配您自己的自訂程式碼來解壓縮、解密或轉換記錄,以擷取和傳回分割所需的資料欄位。這是現有轉換 Lambda 函數的擴充功能,現在可透過 Firehose 使用。您可以轉換、剖析和傳回資料欄位,然後使用相同的 Lambda 函數來進行動態分割。

以下是 Python 中的 Firehose 流處理 Lambda 函數的示例,該函數會重播從輸入到輸出的每個讀取記錄,並從記錄中提取分區鍵。

from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "date": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output

以下是 Go 中的 Firehose 串流處理 Lambda 函式的範例,它會重播從輸入到輸出的每個讀取記錄,並從記錄擷取分割金鑰。

package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }

用於動態分割的 Amazon S3 儲存貯體字首

當您建立使用 Amazon S3 做為目的地的 Firehose 串流時,您必須指定一個 Amazon S3 儲存貯體,其中 Firehose 要交付您的資料。Amazon S3 儲存貯體字首用於整理儲存在 Amazon S3 儲存貯體中的資料。Amazon S3 儲存貯體字首類似於目錄,可讓您在儲存貯體中對類似物件進行分組。

透過動態分割,您的分割資料會交付到指定的 Amazon S3 字首。如果您未啟用動態磁碟分割,則為 Firehose 串流指定 S3 儲存貯體前綴是可選的。但是,如果您選擇啟用動態分割,則必須指定 Firehose 向其提供分區資料的 S3 儲存貯體前綴。

在您啟用動態磁碟分割的每個 Firehose 串流中,S3 儲存貯體前綴值是由以該交付串流指定的分割金鑰為基礎的運算式所組成。再次使用上述資料記錄範例,您可以建立下列 S3 字首值,該值由以上定義的分割索引鍵為基礎的運算式組成:

"ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }

Firehose 在運行時評估上述表達式。它會將符合相同評估後 S3 字首運算式的記錄分組到單一資料集中。Firehose 接著會將每個資料集傳送至評估後的 S3 前置詞。資料集傳遞至 S3 的頻率取決於 Firehose 串流緩衝區設定。因此,本範例中的記錄會交付至下列 S3 物件金鑰:

s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa

對於動態分割,您必須在 S3 儲存貯體字首中使用下列運算式格式:!{namespace:value},其中命名空間可以是 partitionKeyFromQuery 和/或 partitionKeyFromLambda。如果您使用內嵌剖析來建立來源資料的分割索引鍵,則必須指定由以下格式指定之運算式所組成的 S3 儲存貯體字首值:"partitionKeyFromQuery:keyID"。如果您使用 AWS Lambda 函數為來源資料建立分割索引鍵,則必須指定由以下格式指定之運算式所組成的 S3 儲存貯體字首值:"partitionKeyFromLambda:keyID"

注意

您也可以使用配置單元樣式格式指定 S3 存儲桶前綴值,例如客戶 _id=! {partitionKeyFrom查詢:客戶 ID}。

如需詳細資訊,請參閱建立 Amazon Firehose 串流和 Amazon S3 物件的自訂首碼中的「為您的目的地選擇 Amazon S3」。

彙整資料的動態分割

您可以將動態分割套用至彙整資料 (例如,將多個事件、日誌或彙整成單一 PutRecordPutRecordBatch API 呼叫的記錄),但必須先將此資料取消彙整。您可以通過啟用多記錄解彙總來分解數據-通過 Firehose 流中的記錄進行解析並將其分離的過程。多記錄取消彙整可以是 JSON 類型,這意味著記錄的分隔基於有效的 JSON 來執行。或者它可以是 Delimited 類型,這意味著根據指定的自定義分隔符執行記錄的分隔。此自訂分隔符號必須是 base-64 編碼字串。例如,如果您想要使用下列字串做為自訂分隔符號####,您必須以 base-64 編碼格式指定它,並將其轉換為。IyMjIw==

使用彙總資料時,當您啟用動態磁碟分割時,Firehose 會剖析記錄,並根據指定的多重記錄解彙總類型,在每個 API 呼叫中尋找有效的 JSON 物件或分隔記錄。

重要

如果您的資料已彙整,則只有在您的資料第一次取消彙整時,才能套用動態分割。

重要

當您在 Firehose 中使用資料轉換功能時,會在資料轉換之前套用解彙總。進入 Firehose 的資料將依下列順序處理:解彙總 → 透過 Lambda 進行資料轉換 → 分割金鑰。

在將資料傳送至 S3 時新增新的行分隔符號

您可以啟用「新行分隔符號」,在交付到 Amazon S3 的物件中的記錄之間新增行分隔符號。這對於在 Amazon S3 中剖析物件很有幫助。當動態磁碟分割套用至彙總資料時,這也特別有用,因為多重記錄解彙總 (必須先套用至彙總資料,才能動態分割資料) 會在剖析程序中從記錄中移除新行。

如何啟用動態分割

您可以透過 Amazon 資料防火管管理主控台、CLI 或 API 為您的 Firehose 串流設定動態分割。

重要

只有在建立新 Firehose 串流時,才能啟用動態磁碟分割。您無法為尚未啟用動態磁碟分割的現有 Firehose 串流啟用動態磁碟分割。

如需如何在建立新 Firehose 串流時透過 Firehose 管理主控台啟用和設定動態磁碟分割的詳細步驟,請參閱建立 Amazon Fire hose 串流。當您完成為 Firehose 串流指定目的地的任務時,請務必遵循「為您的目的地選擇 Amazon S3」一節中的步驟,因為目前只有使用 Amazon S3 做為目的地的 Firehose 串流才支援動態分割。

啟用作用中 Firehose 串流上的動態分割後,您可以透過新增或移除或更新現有的分割金鑰和 S3 前置碼運算式來更新組態。一旦更新,Firehose 就會開始使用新的金鑰和新的 S3 前置詞運算式。

重要

一旦您在 Firehose 串流上啟用動態磁碟分割,就無法在此 Firehose 串流中停用該磁碟分割。

動態分割錯誤處理

如果 Amazon Data Firehose 無法剖析 Firehose 串流中的資料記錄,或無法擷取指定的分區金鑰,或評估 S3 前置字元值中包含的運算式,則這些資料記錄會傳送至您在其中啟用動態分割的 Firehose 串流時必須指定的 S3 錯誤儲存貯體前綴。S3 錯誤儲存貯體前綴包含 Firehose 無法傳遞至指定 S3 目的地的所有記錄。這些記錄根據錯誤類型進行組織。除了記錄之外,傳送的物件還包括錯誤的相關資訊,以協助了解並解決錯誤。

如果您要為此 Firehose 串流啟用動態磁碟分割,則必須為 Firehose 串流指定 S3 錯誤儲存貯體前置詞。如果您不想為 Firehose 串流啟用動態分割,則可選擇指定 S3 錯誤儲存貯體前置詞。

資料緩衝和動態分割

Amazon Data Firehose 會將傳入的串流資料緩衝為特定大小並保持一段特定時間,然後再將其傳送到指定的目的地。您可以在建立新 Firehose 串流時設定緩衝區大小和緩衝區間隔,或更新現有 Firehose 串流的緩衝區大小和緩衝區間隔。緩衝區大小以 MB 為測量單位,緩衝間隔以秒為測量單位。

啟用動態磁碟分割後,Firehose 會根據設定的緩衝提示 (大小和時間),在內部緩衝屬於指定分割區的記錄,然後再將這些記錄傳送到 Amazon S3 儲存貯體。為了提供最大尺寸的對象,Firehose 在內部使用多級緩衝。因此,一批記錄的 end-to-end 延遲可能是配置緩衝提示時間的 1.5 倍。這會影響 Firehose 串流的資料新鮮度。

作用中分割區計數是交付緩衝區內的作用中分割區總數。例如,如果動態分割查詢每秒建構 3 個分割區,而且您的緩衝區提示組態會每 60 秒觸發交付,則平均而言,您就會有 180 個作用中分割區。如果 Firehose 無法將分割區中的資料傳送到目的地,則此分割區會在傳遞緩衝區中計為作用中,直到可以傳送為止。

根據記錄資料欄位和 S3 字首運算式將 S3 字首評估為新值時,會建立新的分割。會為每個作用中的分割建立新的緩衝區。具有相同評估 S3 字首的每個後續記錄都會交付到該緩衝區。一旦緩衝區達到緩衝區大小限制或緩衝時間間隔,Firehose 就會建立包含緩衝區資料的物件,並將其傳送到指定的 Amazon S3 前綴。一旦物件交付,該分割區和分割本身的緩衝區就會被刪除,並從作用中的分割計數中移除。一旦分別滿足每個分區的緩衝區大小或間隔,Firehose 將每個緩衝區數據作為單個對象提供。一旦作用中分割區數量達到每個交付串流 500 個上限,Firehose 串流中的其餘記錄就會傳送到指定的 S3 錯誤儲存貯體前綴。