Amazon データFirehose 動的パーティショニング - Amazon Data Firehose

Amazon Data Firehose は、以前は Amazon Kinesis Data Firehose と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon データFirehose 動的パーティショニング

動的パーティショニングでは、データ内のキー (たとえば、customer_idまたはtransaction_id) を使用して Firehose 内のストリーミングデータを継続的にパーティション化し、これらのキーでグループ化されたデータを対応する Amazon Simple Storage Service (Amazon S3) プレフィックスに配信できます。これにより、Amazon Athena、Amazon EMR、Amazon Redshift Spectrum、Amazon などのさまざまなサービスを使用して、Amazon S3 のストリーミングデータに対して高性能でコスト効率の高い分析を簡単に実行できます。 QuickSightさらに、 AWS Glue は、動的に分割されたストリーミングデータが Amazon S3 に配信された後、追加の処理が必要なユースケースで、より高度な抽出、変換、読み込み(ETL)ジョブを実行できます。

データをパーティショニングすることで、スキャンされるデータ量が最小限に抑えられ、パフォーマンスが最適化され、Amazon S3 での分析クエリのコストが削減されます。また、データへのきめ細かいアクセスも向上します。Firehose ストリームは従来、データをキャプチャして Amazon S3 にロードするために使用されていました。Amazon S3 ベースの分析用にストリーミングデータセットをパーティショニングするには、データを分析に使用できるようにする前に、Amazon S3 バケット間でパーティショニングアプリケーションを実行する必要がありますが、これは複雑になるか、費用がかかる場合があります。

動的パーティショニングでは、Firehose は動的または静的に定義されたデータキーを使用して転送中のデータを継続的にグループ化し、データをキーごとに個々の Amazon S3 プレフィックスに配信します。これにより、数分または数時間短縮されます time-to-insight 。また、コストを削減し、アーキテクチャを簡素化します。

パーティショニングキー

動的パーティショニングでは、パーティショニングキーに基づいてデータをパーティショニングすることで、ストリーミング S3 データからターゲットデータセットを作成します。パーティショニングキーを使用すると、特定の値に基づいてストリーミングデータをフィルタリングできます。たとえば、顧客 ID と国に基づいてデータをフィルタリングする必要がある場合は、1 つのパーティショニングキーとして customer_id のデータフィールドを、また別のパーティショニングキーとして country のデータフィールドを指定できます。次に、(サポートされている形式を使用して) 式を指定し、動的にパーティショニングされたデータレコードの配信先となる S3 バケットプレフィックスを定義します。

パーティショニングキーの作成でサポートされているメソッドは次のとおりです。

  • インライン解析-このメソッドは、Firehose の組み込みサポートメカニズムである jq パーサーを使用して、JSON 形式のデータレコードからパーティショニング用のキーを抽出します。現在、サポートされているのはバージョンのみです。jq 1.6

  • AWS Lambda 関数-このメソッドは、指定された AWS Lambda 関数を使用して、パーティショニングに必要なデータフィールドを抽出して返します。

重要

動的パーティショニングを有効にする場合、データをパーティショニングするには、これらのメソッドの少なくとも 1 つを設定する必要があります。これらのメソッドのいずれかを設定して、パーティショニングキーを指定することも、両方を同時に指定することもできます。

インライン解析によるパーティショニングキーの作成

ストリーミングデータの動的パーティショニングメソッドとしてインライン解析を設定するには、パーティショニングキーとして使用するデータレコードパラメータを選択し、それぞれの指定したパーティショニングキーの値を提供する必要があります。

次のサンプルデータレコードを見て、インライン解析でそのデータレコードのパーティショニングキーを定義する方法を見てみましょう。

{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }

たとえば、customer_id パラメータまたは event_timestamp パラメータに基づいてデータをパーティショニングすることを選択できます。これは、レコードが配信される S3 プレフィックスの決定に使用される各レコードの customer_id パラメータまたは event_timestamp パラメータの値が必要であることを意味します。また、式 .type.device を用いた device のように、ネストされたパラメータを選択することもできます。動的パーティショニングロジックは、複数のパラメータに依存する可能性があります。

パーティショニングキーのデータパラメータを選択した後、各パラメータを有効な jq 式にマップします。次のテーブルに、jq 式へのパラメータのマッピングを示します。

パラメータ ip 式
customer_id .customer_id
device

.type.device

year

.event_timestamp| strftime("%Y")

month

.event_timestamp| strftime("%m")

day

.event_timestamp| strftime("%d")

hour

.event_timestamp| strftime("%H")

実行時に、Firehose は上の右側の列を使用して、各レコードのデータに基づいてパラメータを評価します。

AWS Lambda 関数を用いてパーティショニングキーを作成する

圧縮または暗号化されたデータレコード、または JSON 以外のファイル形式のデータの場合は、統合された AWS Lambda 関数と独自のカスタムコードを使用してレコードを解凍、復号化、または変換し、パーティショニングに必要なデータフィールドを抽出して返すことができます。これは、Firehose で現在利用できる既存のトランスフォーム Lambda 関数を拡張したものです。同じ Lambda 関数を使用して、動的パーティショニングに使用できるデータフィールドを変換、解析、および返すことができます。

以下は、Python の Firehose ストリーム処理 Lambda 関数の例です。この関数は、すべての読み取りレコードを入力から出力まで再生し、レコードからパーティションキーを抽出します。

from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "date": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output

以下は、Go の Firehose ストリーム処理 Lambda 関数の例で、すべての読み取りレコードを入力から出力まで再生し、レコードからパーティションキーを抽出します。

package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }

動的パーティショニングの Amazon S3 バケットプレフィックス

Amazon S3 をデスティネーションとして使用する Firehose ストリームを作成するときは、Firehose がデータを配信する Amazon S3 バケットを指定する必要があります。Amazon S3 バケットプレフィックスを使用して、S3 バケットに保存するデータを整理できます。Amazon S3 バケットプレフィックスは、類似オブジェクトをグループ化できるディレクトリと同様のものです。

動的パーティショニングでは、パーティショニングされたデータは、指定された Amazon S3 プレフィックスに配信されます。動的パーティショニングを有効にしない場合、Firehose ストリームの S3 バケットプレフィックスの指定はオプションです。ただし、動的パーティショニングを有効にする場合は、Firehose がパーティションデータを配信する S3 バケットプレフィックスを指定する必要があります。

動的パーティショニングを有効にするすべての Firehose ストリームでは、S3 バケットプレフィックス値は、その配信ストリームに指定されたパーティショニングキーに基づく式で構成されます。上記のデータレコードの例を再度使用して、上記で定義したパーティショニングキーに基づく式で構成される次の S3 プレフィックス値を構築できます。

"ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }

Firehose は実行時に上記の式を評価します。同じ評価された S3 プレフィックス式に一致するレコードを 1 つのデータセットにグループ化します。その後、Firehose は各データセットを評価済みの S3 プレフィックスに配信します。S3 へのデータセットの配信頻度は、Firehose ストリームバッファ設定によって決まります。その結果、この例のレコードは次の S3 オブジェクトキーに配信されます。

s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa

動的パーティショニングでは、S3 バケットプレフィックスで次の式形式を使用する必要があります: !{namespace:value}。ここで、名前空間は partitionKeyFromQuery または partitionKeyFromLambda、またはその両方です。インライン解析を使用してソースデータのパーティショニングキーを作成している場合は、次の形式で指定された式で構成される S3 バケットプレフィクス値を指定する必要があります: "partitionKeyFromQuery:keyID"。 AWS Lambda 関数を使用してソースデータのパーティショニングキーを作成している場合は、次の形式で指定された式で構成される S3 バケットプレフィックス値を指定する必要があります。"partitionKeyFromLambda:keyID"

注記

S3 バケットプレフィックス値は、customer_id=! のようにハイブスタイルの形式で指定することもできます。 {クエリ:顧客_ID}。partitionKeyFrom

詳細については、「Amazon Firehose ストリームと Amazon S3 オブジェクトのカスタムプレフィックスの作成」の「送信先として Amazon S3 を選択する」を参照してください。

集約データの動的パーティショニング

動的パーティショニングを集約データ (たとえば、複数のイベント、ログ、または単一の PutRecord および PutRecordBatch API コールに集約されたレコードなど) に適用できますが、このデータはまず集約解除する必要があります。マルチレコードのデアグリゲーション (Firehose ストリームのレコードを解析して分離するプロセス) を有効にすることで、データを非集計できます。

複数レコードの集約解除には、連続する JSON JSON オブジェクトに基づいてレコードを分離するタイプのものがあります。デアグリゲーションにはDelimited、指定したカスタム区切り文字に基づいてレコードの分離を行うタイプのものもあります。このカスタム区切り文字は base-64 でエンコードされた文字列である必要があります。たとえば、次の文字列をカスタム区切り文字として使用する場合は####、Base-64 エンコード形式で指定する必要があります。これにより、に変換されます。IyMjIw==

注記

JSON レコードを非集計するときは、入力がまだサポートされている JSON 形式で表示されていることを確認してください。JSON オブジェクトは、区切り文字なしで 1 行にするか、改行区切り (JSONL) のみにする必要があります。JSON オブジェクトの配列は有効な入力ではありません。

正しい入力の例は次のとおりです。{"a":1}{"a":2} and {"a":1}\n{"a":2}

以下は誤った入力の例です。[{"a":1}, {"a":2}]

集約データでは、動的パーティショニングを有効にすると、Firehose はレコードを解析し、指定された複数レコードのデアグリゲーションタイプに基づいて、各 API 呼び出し内の有効な JSON オブジェクトまたは区切り付きレコードを検索します。

重要

データが集約されている場合、動的パーティショニングは、データが最初に集約解除された場合にのみ適用できます。

重要

Firehose のデータ変換機能を使用すると、データ変換の前に集約解除が適用されます。Firehose に入力されるデータは、「デアグリゲーション」→「Lambda によるデータ変換」→「パーティショニングキー」の順に処理されます。

S3 にデータを配信するときに新しい行区切り文字を追加する

改行区切り文字を有効にすると、Amazon S3 に配信されるオブジェクトのレコード間に新しい行区切り文字を追加できます。これは、Amazon S3 のオブジェクトの解析に役立ちます。これは、集約データに動的パーティショニングを適用する場合にも特に便利です。複数レコードの非集約化 (集約されたデータを動的に分割する前に適用する必要がある) では、解析プロセスの一環としてレコードから新しい行が削除されるためです。

動的パーティショニングを有効にする方法

Firehose ストリームの動的パーティショニングは、Amazon データ Firehose マネジメントコンソール、CLI、または API を使用して設定できます。

重要

動的パーティショニングは、新しい Firehose ストリームを作成する場合にのみ有効にできます。動的パーティショニングがまだ有効になっていない既存の Firehose ストリームでは、動的パーティショニングを有効にすることはできません。

新しい Firehose ストリームを作成するときに Firehose マネジメントコンソールから動的パーティショニングを有効にして設定する方法の詳細な手順については、「Amazon Firehose ストリームの作成」を参照してください。現在、動的パーティショニングは Amazon S3 を宛先として使用する Firehose ストリームでのみサポートされているため、Firehose ストリームの宛先を指定する作業に取り掛かったら、必ず「送信先として Amazon S3 を選択」セクションの手順に従ってください。

アクティブな Firehose ストリームの動的パーティショニングが有効になったら、新しいパーティショニングキーを追加するか、既存のパーティショニングキーと S3 プレフィックス式を削除または更新することで、設定を更新できます。更新すると、Firehose は新しいキーと新しい S3 プレフィックス式の使用を開始します。

重要

Firehose ストリームで動的パーティショニングを有効にすると、この Firehose ストリームで無効にすることはできません。

動的パーティショニングのエラー処理

Amazon Data Firehose が Firehose ストリーム内のデータレコードを解析できない、指定されたパーティショニングキーを抽出できない、または S3 プレフィックス値に含まれる式を評価できない場合、これらのデータレコードは S3 エラーバケットプレフィックスに配信されます。このプレフィックスは、動的パーティショニングを有効にする Firehose ストリームの作成時に指定する必要があります。S3 エラーバケットプレフィックスには、Firehose が指定された S3 宛先に配信できないすべてのレコードが含まれます。これらのレコードは、エラータイプに基づいて整理されます。レコードとともに、配信されたオブジェクトには、エラーの理解と解決に役立つエラーに関する情報も含まれます。

この Firehose ストリームの動的パーティショニングを有効にする場合は、Firehose ストリームの S3 エラーバケットプレフィックスを指定する必要があります。Firehose ストリームの動的パーティショニングを有効にしたくない場合は、S3 エラーバケットプレフィックスの指定はオプションです。

データバッファリングと動的パーティショニング

Amazon Data Firehose は、受信ストリーミングデータを特定のサイズで一定期間バッファしてから、指定された宛先に配信します。新しい Firehose ストリームを作成する際にバッファサイズとバッファ間隔を設定するか、既存の Firehose ストリームのバッファサイズとバッファ間隔を更新できます。バッファサイズは MB 単位で、バッファ間隔は秒単位です。

動的パーティショニングが有効になっている場合、Firehose は、設定されたバッファリングヒント(サイズと時間)に基づいて特定のパーティションに属するレコードを内部的にバッファしてから、これらのレコードを Amazon S3 バケットに配信します。最大サイズのオブジェクトを配信するために、Firehose は内部で多段階バッファリングを使用しています。そのため、 end-to-end レコードのバッチの遅延は、設定したバッファリングヒント時間の 1.5 倍になる可能性があります。これは Firehose ストリームのデータの鮮度に影響します。

アクティブパーティション数は、配信バッファ内のアクティブパーティションの総数です。たとえば、動的パーティショニングクエリが 1 秒あたり 3 つのパーティションを構築し、60 秒ごとに配信をトリガーするバッファのヒント設定がある場合、平均して 180 個のアクティブパーティションが作成されます。Firehose がパーティション内のデータを宛先に配信できない場合、このパーティションは配信可能になるまで配信バッファ内でアクティブとしてカウントされます。

レコードデータフィールドと S3 プレフィックス式に基づいて S3 プレフィクスが新しい値に評価されると、新しいパーティションが作成されます。アクティブパーティションごとに新しいバッファが作成されます。同じ評価された S3 プレフィックスを持つ後続のすべてのレコードが、そのバッファに配信されます。バッファがバッファサイズ制限またはバッファ時間間隔を満たすと、Firehose はバッファデータを含むオブジェクトを作成し、指定された Amazon S3 プレフィックスに配信します。オブジェクトが配信されると、そのパーティションのバッファとパーティション自体が削除され、アクティブなパーティション数から除外されます。Firehose は、各パーティションのバッファサイズまたは間隔が個別に満たされると、各バッファデータを単一のオブジェクトとして配信します。アクティブなパーティションの数が配信ストリームあたりの上限である500に達すると、Firehose ストリームの残りのレコードは、指定されたS3エラーバケットプレフィックスに配信されます。