トランザクションアウトボックスパターン - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

トランザクションアウトボックスパターン

Intent

トランザクションアウトボックスパターンは、単一オペレーションにデータベース書き込みオペレーションとメッセージまたはイベント通知の両方が含まれる場合に分散システムで発生する二重書き込みオペレーションの問題を解決します。二重書き込みオペレーションは、アプリケーションが 2 つの異なるシステムに書き込みを行う場合に発生します。例えば、マイクロサービスがデータをデータベースに保持し、メッセージを送信して他のシステムに通知する必要がある場合などです。これらのオペレーションのいずれかが失敗すると、データが不整合になる可能性があります。

導入する理由

データベースの更新後にマイクロサービスがイベント通知を送信する場合、データ整合性と信頼性を確保するために、これら 2 つのオペレーションはアトミックに実行する必要があります。

  • データベースの更新が成功してもイベント通知が失敗した場合、ダウンストリームのサービスはその変更を認識せず、システムは整合性のない状態になる可能性があります。

  • データベースの更新が失敗してもイベント通知が送信されると、データが破損し、システムの信頼性に影響する可能性があります。

適用対象

トランザクションアウトボックスパターンは以下の場合に使用します。

  • データベースの更新によってイベント通知が開始されるように、イベント駆動型アプリケーションを構築している場合。

  • 2 つのサービスが関与するオペレーションを確実にアトミックにしたい場合。

  • イベントソーシングパターンを実装したい場合。

問題点と考慮事項

  • 重複メッセージ: イベント処理サービスは重複したメッセージやイベントを送信する可能性があるため、処理されたメッセージを追跡して使用側サービスに冪等性があるようにすることをお勧めします。

  • 通知の順序: サービスがデータベースを更新するのと同じ順序でメッセージまたはイベントを送信します。これは、データストアの point-in-time 復旧にイベントストアを使用できるイベント調達パターンにとって重要です。順序が正しくないと、データの品質が低下する可能性があります。通知の順序が保持されないと、結果整合性とデータベースのロールバックが問題をさらに悪化させる可能性があります。

  • トランザクションロールバック: トランザクションがロールバックされた場合は、イベント通知を送信しないでください。

  • サービスレベルのトランザクション処理: トランザクションがデータストアの更新を必要とするサービスにまたがる場合は、Saga オーケストレーションパターンを使用してデータストア全体のデータの完全性を維持します。

実装

高レベルのアーキテクチャ

次のシーケンス図は、二重書き込みオペレーション中に発生するイベントの順序を示しています。

二重書き込みオペレーション中のイベントの順序
  1. フライトサービスはデータベースに書き込み、支払いサービスにイベント通知を送信します。

  2. メッセージブローカーはメッセージとイベントを支払いサービスに伝えます。メッセージブローカーに障害が発生すると、支払いサービスは更新を受信できなくなります。

フライトデータベースの更新に失敗しても通知が送信された場合、支払いサービスはイベント通知に基づいて支払いを処理します。これは、ダウンストリームのデータ不整合の原因になります。

AWS のサービスを使用した実装

シーケンス図のパターンを示すために、次の AWS 図に示すように、次のサービスを使用します。

AWS Lambda、Amazon RDS、および Amazon SQS を使用したトランザクションアウトボックスパターン

トランザクションをコミットした後にフライトサービスに障害が発生すると、イベント通知が送信されない可能性があります。

コミットオペレーション後のトランザクションの失敗

ただし、トランザクションが失敗してロールバックされたにもかかわらずイベント通知が送信され、支払いサービスが支払いを処理する場合があります。

コミットオペレーションとロールバック後のトランザクションの失敗

この問題に対処するには、アウトボックステーブルまたは変更データキャプチャ (CDC) を使用します。以下のセクションでは、これら 2 つのオプションと、AWS のサービスを使用してそれらを実装する方法について説明します。

アウトボックステーブルをリレーショナルデータベースで使用する

アウトボックステーブルには、フライトサービスからのすべてのイベントがタイムスタンプおよびシーケンス番号とともに保存されます。

フライトテーブルが更新されると、同じトランザクションでアウトボックステーブルも更新されます。別のサービス (イベント処理サービスなど) がアウトボックステーブルから読み取り、Amazon SQS にイベントを送信します。Amazon SQS は、さらに処理するためにイベントに関するメッセージを支払いサービスに送信します。Amazon SQS スタンダードキューは、メッセージが少なくとも 1 回配信され、メッセージが失われないことを保証します。ただし、Amazon SQS スタンダードキューを使用する場合、同じメッセージまたはイベントが複数回配信される可能性があるため、イベント通知サービスの冪等性 (つまり、同じメッセージを複数回処理しても悪影響が無いこと) を確認する必要があります。メッセージをメッセージ順序付きで 1 回だけ配信する必要がある場合は、Amazon SQS 先入れ先出し (FIFO) キューを使用できます。

フライトテーブルの更新またはアウトボックステーブルの更新が失敗した場合、トランザクション全体がロールバックされるため、ダウンストリームのデータに不整合が生じることはありません。

ダウンストリームのデータ不整合がないロールバック

以下の図では、トランザクションアウトボックスアーキテクチャは Amazon RDS データベースを使用して実装されています。イベント処理サービスがアウトボックステーブルを読み取ると、コミットされた (成功した) トランザクションに含まれる行のみを認識し、イベントのメッセージを SQS キューに格納します。SQS キューは支払いサービスによって読み取られ、さらに処理されます。この設計では、タイムスタンプとシーケンス番号を使用して二重書き込みオペレーションの問題が解決され、メッセージとイベントの順序が保持されます。

二重書き込みオペレーションの問題を解決する設計

変更データキャプチャ (CDC) の使用

一部のデータベースでは、変更されたデータをキャプチャするためのアイテムレベルの変更の発行がサポートされています。変更された項目を特定し、それに応じてイベント通知を送信できます。これにより、更新を追跡するテーブルをもう 1 つ作成する手間が省けます。フライトサービスによって開始されたイベントは、同じアイテムの別の属性に保存されます。

Amazon DynamoDB は CDC 更新をサポートするキー値 NoSQL データベースです。次のシーケンス図では、DynamoDB はアイテムレベルの変更を Amazon DynamoDB Streams に発行しています。イベント処理サービスはストリームから読み取り、さらに処理するためにイベント通知を支払いサービスに発行します。

DynamoDB と DynamoDB Streams を備えたトランザクションアウトボックス

DynamoDB Streams は、時系列シーケンスを使用して DynamoDB テーブル内のアイテムレベルの変更に関連する情報の流れをキャプチャします。

DynamoDB テーブルでストリームを有効にすることで、トランザクションアウトボックスパターンを実装できます。イベント処理サービスの Lambda 関数は、これらのストリームに関連付けられています。

  • フライトテーブルが更新されると、変更されたデータが DynamoDB Streams によってキャプチャされ、イベント処理サービスがストリームをポーリングして新しいレコードを探します。

  • 新しいストリームレコードが使用可能になると、Lambda 関数はイベントのメッセージを同期的に SQS キューに配置し、さらに処理します。DynamoDB アイテムに属性を追加して、必要に応じてタイムスタンプとシーケンス番号をキャプチャし、実装の堅牢性を高めることができます。

CDC を備えたトランザクションアウトボックス

「サンプルコード」

アウトボックステーブルの使用

このセクションのサンプルコードは、アウトボックステーブルを使用してトランザクションアウトボックスパターンを実装する方法を示しています。完全なコードを表示するには、この例のGitHubリポジトリを参照してください。

次のコードスニペットは、データベース内の Flight エンティティと Flight イベントを 1 回のトランザクションでそれぞれのテーブルに保存します。

@PostMapping("/flights") @Transactional public Flight createFlight(@Valid @RequestBody Flight flight) { Flight savedFlight = flightRepository.save(flight); JsonNode flightPayload = objectMapper.convertValue(flight, JsonNode.class); FlightOutbox outboxEvent = new FlightOutbox(flight.getId().toString(), FlightOutbox.EventType.FLIGHT_BOOKED, flightPayload); outboxRepository.save(outboxEvent); return savedFlight; }

別のサービスが定期的にアウトボックステーブルをスキャンして新しいイベントがないか確認し、Amazon SQS に送信し、Amazon SQS が正常に応答した場合はテーブルから削除します。ポーリングレートは application.properties ファイルで設定できます。

@Scheduled(fixedDelayString = "${sqs.polling_ms}") public void forwardEventsToSQS() { List<FlightOutbox> entities = outboxRepository.findAllByOrderByIdAsc(Pageable.ofSize(batchSize)).toList(); if (!entities.isEmpty()) { GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(sqsQueueName) .build(); String queueUrl = this.sqsClient.getQueueUrl(getQueueRequest).queueUrl(); List<SendMessageBatchRequestEntry> messageEntries = new ArrayList<>(); entities.forEach(entity -> messageEntries.add(SendMessageBatchRequestEntry.builder() .id(entity.getId().toString()) .messageGroupId(entity.getAggregateId()) .messageDeduplicationId(entity.getId().toString()) .messageBody(entity.getPayload().toString()) .build()) ); SendMessageBatchRequest sendMessageBatchRequest = SendMessageBatchRequest.builder() .queueUrl(queueUrl) .entries(messageEntries) .build(); sqsClient.sendMessageBatch(sendMessageBatchRequest); outboxRepository.deleteAllInBatch(entities); } }

変更データキャプチャ (CDC) の使用

このセクションのサンプルコードは、DynamoDB の変更データキャプチャ (CDC) 機能を使用してトランザクションアウトボックスパターンを実装する方法を示しています。完全なコードを表示するには、この例のGitHubリポジトリを参照してください。

次の AWS Cloud Development Kit (AWS CDK) コードスニペットは、DynamoDB のフライトテーブルと Amazon Kinesis データストリーム (cdcStream) を作成し、すべての更新をストリームに送信するようにフライトテーブルを設定します。

Const cdcStream = new kinesis.Stream(this, 'flightsCDCStream', { streamName: 'flightsCDCStream' }) const flightTable = new dynamodb.Table(this, 'flight', { tableName: 'flight', kinesisStream: cdcStream, partitionKey: { name: 'id', type: dynamodb.AttributeType.STRING, } });

次のコードスニペットと設定は、Kinesis ストリーム内の更新を取得し、これらのイベントを SQS キューに転送してさらに処理するSpound cloud stream 関数を定義します。

applications.properties spring.cloud.stream.bindings.sendToSQS-in-0.destination=${kinesisstreamname} spring.cloud.stream.bindings.sendToSQS-in-0.content-type=application/ddb QueueService.java @Bean public Consumer<Flight> sendToSQS() { return this::forwardEventsToSQS; } public void forwardEventsToSQS(Flight flight) { GetQueueUrlRequest getQueueRequest = GetQueueUrlRequest.builder() .queueName(sqsQueueName) .build(); String queueUrl = this.sqsClient.getQueueUrl(getQueueRequest).queueUrl(); try { SendMessageRequest send_msg_request = SendMessageRequest.builder() .queueUrl(queueUrl) .messageBody(objectMapper.writeValueAsString(flight)) .messageGroupId("1") .messageDeduplicationId(flight.getId().toString()) .build(); sqsClient.sendMessage(send_msg_request); } catch (IOException | AmazonServiceException e) { logger.error("Error sending message to SQS", e); } }

GitHub リポジトリ

このパターンのサンプルアーキテクチャの完全な実装については、https://github.com/aws-samples/transactional-outbox-pattern の GitHub リポジトリを参照してください。