Amazon Data Firehose での動的パーティショニング - Amazon Data Firehose

Amazon Data Firehose は、以前は Amazon Kinesis Data Firehose と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon Data Firehose での動的パーティショニング

動的パーティショニングを使用すると、データ内のキー ( customer_idや などtransaction_id) を使用して Firehose でストリーミングデータを継続的にパーティション化し、これらのキーでグループ化されたデータを対応する Amazon Simple Storage Service (Amazon S3) プレフィックスに配信できます。これにより、Amazon Athena、Amazon S3 のストリーミングデータに対して高性能でコスト効率の高い分析を簡単に実行できます QuickSight。さらに、 AWS Glue は、動的にパーティション分割されたストリーミングデータが Amazon S3 に配信された後に、追加の処理が必要なユースケースで、より高度な抽出、変換、ロード (ETL) ジョブを実行できます。

データをパーティショニングすることで、スキャンされるデータ量が最小限に抑えられ、パフォーマンスが最適化され、Amazon S3 での分析クエリのコストが削減されます。また、データへのきめ細かいアクセスも向上します。Firehose ストリームは、従来、データをキャプチャして Amazon S3 にロードするために使用されていました。Amazon S3 ベースの分析用にストリーミングデータセットをパーティショニングするには、データを分析に使用できるようにする前に、Amazon S3 バケット間でパーティショニングアプリケーションを実行する必要がありますが、これは複雑になるか、費用がかかる場合があります。

動的パーティショニングでは、Firehose は動的または静的に定義されたデータキーを使用して転送中のデータを継続的にグループ化し、キーごとに個々の Amazon S3 プレフィックスにデータを配信します。これにより、数分または数時間短縮 time-to-insight されます。また、コストを削減し、アーキテクチャを簡素化します。

パーティショニングキー

動的パーティショニングでは、パーティショニングキーに基づいてデータをパーティショニングすることで、ストリーミング S3 データからターゲットデータセットを作成します。パーティショニングキーを使用すると、特定の値に基づいてストリーミングデータをフィルタリングできます。たとえば、顧客 ID と国に基づいてデータをフィルタリングする必要がある場合は、1 つのパーティショニングキーとして customer_id のデータフィールドを、また別のパーティショニングキーとして country のデータフィールドを指定できます。次に、(サポートされている形式を使用して) 式を指定し、動的にパーティショニングされたデータレコードの配信先となる S3 バケットプレフィックスを定義します。

パーティショニングキーの作成でサポートされているメソッドは次のとおりです。

  • インライン解析 - このメソッドは、Firehose 組み込みサポートメカニズム jq パーサー を使用して、JSON 形式のデータレコードからパーティショニングするためのキーを抽出します。現在、 jq 1.6バージョンのみがサポートされています。

  • AWS Lambda 関数 - このメソッドは、指定された AWS Lambda 関数を使用して、パーティショニングに必要なデータフィールドを抽出して返します。

重要

動的パーティショニングを有効にする場合、データをパーティショニングするには、これらのメソッドの少なくとも 1 つを設定する必要があります。これらのメソッドのいずれかを設定して、パーティショニングキーを指定することも、両方を同時に指定することもできます。

インライン解析によるパーティショニングキーの作成

ストリーミングデータの動的パーティショニングメソッドとしてインライン解析を設定するには、パーティショニングキーとして使用するデータレコードパラメータを選択し、それぞれの指定したパーティショニングキーの値を提供する必要があります。

次のサンプルデータレコードは、インライン解析を使用してパーティションキーを定義する方法を示しています。データは Base64 形式でエンコードする必要があることに注意してください。CLI の例 を参照することもできます。

{ "type": { "device": "mobile", "event": "user_clicked_submit_button" }, "customer_id": "1234567890", "event_timestamp": 1565382027, #epoch timestamp "region": "sample_region" }

たとえば、customer_id パラメータまたは event_timestamp パラメータに基づいてデータをパーティショニングすることを選択できます。これは、レコードが配信される S3 プレフィックスの決定に使用される各レコードの customer_id パラメータまたは event_timestamp パラメータの値が必要であることを意味します。また、式 .type.device を用いた device のように、ネストされたパラメータを選択することもできます。動的パーティショニングロジックは、複数のパラメータに依存する可能性があります。

パーティショニングキーのデータパラメータを選択した後、各パラメータを有効な jq 式にマップします。次のテーブルに、jq 式へのパラメータのマッピングを示します。

パラメータ ip 式
customer_id .customer_id
device

.type.device

year

.event_timestamp| strftime("%Y")

month

.event_timestamp| strftime("%m")

day

.event_timestamp| strftime("%d")

hour

.event_timestamp| strftime("%H")

実行時に、Firehose は上記の右側の列を使用して、各レコードのデータに基づいてパラメータを評価します。

AWS Lambda 関数を用いてパーティショニングキーを作成する

圧縮または暗号化されたデータレコード、または JSON 以外のファイル形式のデータの場合、統合された AWS Lambda 関数と独自のカスタムコードを使用してレコードを解凍、復号、または変換し、パーティショニングに必要なデータフィールドを抽出して返すことができます。これは、Firehose で現在利用可能な既存の変換 Lambda 関数の拡張です。同じ Lambda 関数を使用して、動的パーティショニングに使用できるデータフィールドを変換、解析、および返すことができます。

以下は、入力から出力へのすべての読み取りレコードを再生し、レコードからパーティショニングキーを抽出する、Python で Lambda 関数を処理する Firehose ストリームの例です。

from __future__ import print_function import base64 import json import datetime # Signature for all Lambda functions that user must implement def lambda_handler(firehose_records_input, context): print("Received records for processing from DeliveryStream: " + firehose_records_input['deliveryStreamArn'] + ", Region: " + firehose_records_input['region'] + ", and InvocationId: " + firehose_records_input['invocationId']) # Create return value. firehose_records_output = {'records': []} # Create result object. # Go through records and process them for firehose_record_input in firehose_records_input['records']: # Get user payload payload = base64.b64decode(firehose_record_input['data']) json_value = json.loads(payload) print("Record that was received") print(json_value) print("\n") # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {} event_timestamp = datetime.datetime.fromtimestamp(json_value['eventTimestamp']) partition_keys = {"customerId": json_value['customerId'], "year": event_timestamp.strftime('%Y'), "month": event_timestamp.strftime('%m'), "date": event_timestamp.strftime('%d'), "hour": event_timestamp.strftime('%H'), "minute": event_timestamp.strftime('%M') } # Create output Firehose record and add modified payload and record ID to it. firehose_record_output = {'recordId': firehose_record_input['recordId'], 'data': firehose_record_input['data'], 'result': 'Ok', 'metadata': { 'partitionKeys': partition_keys }} # Must set proper record ID # Add the record to the list of output records. firehose_records_output['records'].append(firehose_record_output) # At the end return processed records return firehose_records_output

以下は、入力から出力へのすべての読み取りレコードを再生し、レコードからパーティショニングキーを抽出する、Go で Lambda 関数を処理する Firehose ストリームの例です。

package main import ( "fmt" "encoding/json" "time" "strconv" "github.com/aws/aws-lambda-go/events" "github.com/aws/aws-lambda-go/lambda" ) type DataFirehoseEventRecordData struct { CustomerId string `json:"customerId"` } func handleRequest(evnt events.DataFirehoseEvent) (events.DataFirehoseResponse, error) { fmt.Printf("InvocationID: %s\n", evnt.InvocationID) fmt.Printf("DeliveryStreamArn: %s\n", evnt.DeliveryStreamArn) fmt.Printf("Region: %s\n", evnt.Region) var response events.DataFirehoseResponse for _, record := range evnt.Records { fmt.Printf("RecordID: %s\n", record.RecordID) fmt.Printf("ApproximateArrivalTimestamp: %s\n", record.ApproximateArrivalTimestamp) var transformedRecord events.DataFirehoseResponseRecord transformedRecord.RecordID = record.RecordID transformedRecord.Result = events.DataFirehoseTransformedStateOk transformedRecord.Data = record.Data var metaData events.DataFirehoseResponseRecordMetadata var recordData DataFirehoseEventRecordData partitionKeys := make(map[string]string) currentTime := time.Now() json.Unmarshal(record.Data, &recordData) partitionKeys["customerId"] = recordData.CustomerId partitionKeys["year"] = strconv.Itoa(currentTime.Year()) partitionKeys["month"] = strconv.Itoa(int(currentTime.Month())) partitionKeys["date"] = strconv.Itoa(currentTime.Day()) partitionKeys["hour"] = strconv.Itoa(currentTime.Hour()) partitionKeys["minute"] = strconv.Itoa(currentTime.Minute()) metaData.PartitionKeys = partitionKeys transformedRecord.Metadata = metaData response.Records = append(response.Records, transformedRecord) } return response, nil } func main() { lambda.Start(handleRequest) }

動的パーティショニングの Amazon S3 バケットプレフィックス

Amazon S3 を送信先として使用する Firehose ストリームを作成するときは、Firehose がデータを配信する Amazon S3 バケットを指定する必要があります。Amazon S3 バケットプレフィックスを使用して、S3 バケットに保存するデータを整理できます。Amazon S3 バケットプレフィックスは、類似オブジェクトをグループ化できるディレクトリと同様のものです。

動的パーティショニングでは、パーティショニングされたデータは、指定された Amazon S3 プレフィックスに配信されます。動的パーティショニングを有効にしない場合、Firehose ストリームの S3 バケットプレフィックスの指定はオプションです。ただし、動的パーティショニングを有効にする場合は、Firehose がパーティション化されたデータを配信する S3 バケットプレフィックスを指定する必要があります。

動的パーティショニングを有効にするすべての Firehose ストリームでは、S3 バケットプレフィックス値は、その Firehose ストリームの指定されたパーティショニングキーに基づく式で構成されます。上記のデータレコードの例を再度使用して、上記で定義したパーティショニングキーに基づく式で構成される次の S3 プレフィックス値を構築できます。

"ExtendedS3DestinationConfiguration": { "BucketARN": "arn:aws:s3:::my-logs-prod", "Prefix": "customer_id=!{partitionKeyFromQuery:customer_id}/ device=!{partitionKeyFromQuery:device}/ year=!{partitionKeyFromQuery:year}/ month=!{partitionKeyFromQuery:month}/ day=!{partitionKeyFromQuery:day}/ hour=!{partitionKeyFromQuery:hour}/" }

Firehose は、実行時に上記の式を評価します。同じ評価された S3 プレフィックス式に一致するレコードを 1 つのデータセットにグループ化します。次に、Firehose は各データセットを評価済みの S3 プレフィックスに配信します。S3 へのデータセット配信の頻度は、Firehose ストリームバッファ設定によって決まります。その結果、この例のレコードは次の S3 オブジェクトキーに配信されます。

s3://my-logs-prod/customer_id=1234567890/device=mobile/year=2019/month=08/day=09/hour=20/my-delivery-stream-2019-08-09-23-55-09-a9fa96af-e4e4-409f-bac3-1f804714faaa

動的パーティショニングでは、S3 バケットプレフィックスで次の式形式を使用する必要があります: !{namespace:value}。ここで、名前空間は partitionKeyFromQuery または partitionKeyFromLambda、またはその両方です。インライン解析を使用してソースデータのパーティショニングキーを作成している場合は、次の形式で指定された式で構成される S3 バケットプレフィクス値を指定する必要があります: "partitionKeyFromQuery:keyID"。 AWS Lambda 関数を使用してソースデータのパーティショニングキーを作成している場合は、次の形式で指定された式で構成される S3 バケットプレフィックス値を指定する必要があります。"partitionKeyFromLambda:keyID"

注記

また、hive スタイル形式を使用して S3 バケットプレフィックス値を指定することもできます。例: customer_id=!{partitionKeyFromクエリ:customer_id}。

詳細については、Amazon S3 Firehose ストリームの作成」の「送信先に Amazon S3 を選択する」とAmazon S3 オブジェクトのカスタムプレフィックス」を参照してください。

集約データの動的パーティショニング

動的パーティショニングを集約データ (たとえば、複数のイベント、ログ、または単一の PutRecord および PutRecordBatch API コールに集約されたレコードなど) に適用できますが、このデータはまず集約解除する必要があります。マルチレコードの集約解除を有効にすることで、データを集約解除できます。これは、Firehose ストリーム内のレコードを解析して分離するプロセスです。

マルチレコードの集約解除は、JSONタイプ のいずれかになります。つまり、レコードの分離は連続する JSON オブジェクトに基づいています。デアグリゲーションは タイプでもかまいません。つまりDelimited、レコードの分離は、指定されたカスタム区切り文字に基づいて実行されます。このカスタム区切り文字は base-64 でエンコードされた文字列である必要があります。例えば、次の文字列をカスタム区切り文字 として使用する場合は####、base-64 でエンコードされた形式で指定し、 に変換する必要がありますIyMjIw==

注記

JSON レコードを集約解除するときは、入力がサポートされている JSON 形式でまだ表示されていることを確認してください。JSON オブジェクトは、区切り文字や改行区切り (JSONL) のない 1 行にする必要があります。JSON オブジェクトの配列は有効な入力ではありません。

正しい入力の例を次に示します。 {"a":1}{"a":2} and {"a":1}\n{"a":2}

正しくない入力の例を次に示します。 [{"a":1}, {"a":2}]

集約データでは、動的パーティショニングを有効にすると、Firehose はレコードを解析し、指定されたマルチレコードの集約解除タイプに基づいて、有効な JSON オブジェクトまたは各 API コール内の区切りレコードを検索します。

重要

データが集約されている場合、動的パーティショニングは、データが最初に集約解除された場合にのみ適用できます。

重要

Firehose でデータ変換機能を使用すると、データ変換の前に集約解除が適用されます。Firehose に送信されるデータは、集約解除 → Lambda によるデータ変換 → パーティショニングキーの順に処理されます。

S3 にデータを配信するときに新しい行区切り文字を追加する

改行区切り文字を有効にして、Amazon Amazon S3 に配信されるオブジェクト内のレコード間に新しい改行区切り文字を追加できます。これは、Amazon S3 のオブジェクトの解析に役立ちます。これは、マルチレコードの集約解除 (動的にパーティション化する前に集約データに適用する必要がある) が解析プロセスの一環としてレコードから新しい行を削除するため、動的パーティショニングが集約データに適用される場合にも特に便利です。

動的パーティショニングを有効にする方法

Firehose ストリームの動的パーティショニングは、Amazon Data Firehose マネジメントコンソール、CLI、または APIsを使用して設定できます。

重要

動的パーティショニングは、新しい Firehose ストリームを作成する場合にのみ有効にできます。動的パーティショニングが有効になっていない既存の Firehose ストリームに対して動的パーティショニングを有効にすることはできません。

新しい Firehose ストリームの作成中に Firehose マネジメントコンソールを使用して動的パーティショニングを有効にして設定する方法の詳細な手順については、「Amazon Firehose ストリームの作成」を参照してください。Firehose ストリームの送信先を指定するタスクに到達するときは、送信先として Amazon S3 を使用する Firehose ストリームでのみ動的パーティショニングがサポートされるため、「送信先として Amazon S3 を選択」セクションのステップに従ってください。

アクティブな Firehose ストリームで動的パーティショニングを有効にすると、新しいパーティショニングキーと S3 プレフィックス式を追加、削除、または更新することで、設定を更新できます。更新すると、Firehose は新しいキーと新しい S3 プレフィックス式の使用を開始します。

重要

Firehose ストリームで動的パーティショニングを有効にすると、この Firehose ストリームで無効にすることはできません。

動的パーティショニングのエラー処理

Amazon Data Firehose が Firehose ストリーム内のデータレコードを解析できない場合、または指定されたパーティショニングキーの抽出に失敗した場合、または S3 プレフィックス値に含まれる式を評価する場合、これらのデータレコードは S3 エラーバケットプレフィックスに配信されます。このプレフィックスは、動的パーティショニングを有効にする Firehose ストリームの作成時に指定する必要があります。S3 エラーバケットプレフィックスには、Firehose が指定された S3 送信先に配信できないすべてのレコードが含まれます。これらのレコードは、エラータイプに基づいて整理されます。レコードとともに、配信されたオブジェクトには、エラーの理解と解決に役立つエラーに関する情報も含まれます。

この Firehose ストリームの動的パーティショニングを有効にする場合は、Firehose ストリームの S3 エラーバケットプレフィックスを指定する必要があります。Firehose ストリームの動的パーティショニングを有効にしない場合は、S3 エラーバケットプレフィックスの指定はオプションです。

データバッファリングと動的パーティショニング

Amazon Data Firehose は、受信ストリーミングデータを特定のサイズにバッファしてから、指定された宛先に配信します。新しい Firehose ストリームの作成時にバッファサイズとバッファ間隔を設定したり、既存の Firehose ストリームのバッファサイズとバッファ間隔を更新したりできます。バッファサイズは MB 単位で、バッファ間隔は秒単位です。

動的パーティショニングが有効になっている場合、Firehose は、設定されたバッファリングヒント (サイズと時間) に基づいて、特定のパーティションに属するレコードを内部的にバッファしてから、これらのレコードを Amazon S3 バケットに配信します。最大サイズのオブジェクトを配信するために、Firehose は内部的にマルチステージバッファリングを使用します。したがって、レコードのバッチの end-to-end 遅延は、設定されたバッファリングヒント時間の 1.5 倍になる可能性があります。これは、Firehose ストリームのデータ鮮度に影響します。

アクティブパーティション数は、配信バッファ内のアクティブパーティションの総数です。たとえば、動的パーティショニングクエリが 1 秒あたり 3 つのパーティションを構築し、60 秒ごとに配信をトリガーするバッファのヒント設定がある場合、平均して 180 個のアクティブパーティションが作成されます。Firehose がパーティション内のデータを宛先に配信できない場合、このパーティションは配信できるようになるまで配信バッファでアクティブとしてカウントされます。

レコードデータフィールドと S3 プレフィックス式に基づいて S3 プレフィクスが新しい値に評価されると、新しいパーティションが作成されます。アクティブパーティションごとに新しいバッファが作成されます。同じ評価された S3 プレフィックスを持つ後続のすべてのレコードが、そのバッファに配信されます。

バッファがバッファサイズ制限またはバッファ時間間隔に達すると、Firehose はバッファデータを含むオブジェクトを作成し、指定された Amazon S3 プレフィックスに配信します。オブジェクトが配信されると、そのパーティションのバッファとパーティション自体が削除され、アクティブなパーティション数から削除されます。

Firehose は、各パーティションのバッファサイズまたは間隔が個別に満たされると、各バッファデータを 1 つのオブジェクトとして配信します。アクティブなパーティションの数が Firehose ストリームあたり 500 の制限に達すると、Firehose ストリーム内の残りのレコードは指定された S3 エラーバケットプレフィックス () に配信されますactivePartitionExceeded。Amazon Data Firehose の制限フォームを使用して、特定の Firehose ストリームごとに最大 5000 個のアクティブなパーティションのクォータの引き上げをリクエストできます。さらに多くのパーティションが必要な場合は、Firehose ストリームをさらに作成し、アクティブなパーティションをそれらに分散できます。