Amazon MQ で Lambda を使用する
注記
Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。
Amazon MQ は、Apache ActiveMQ
また Amazon MQ は、ActiveMQ か RabbitMQ ブローカーをインストールすることにより、もしくは、異なるネットワークトポロジやその他のインフラストラクチャのニーズを提供することにより、ユーザーに代わって Amazon Elastic Compute Cloud (Amazon EC2)インスタンスを管理することもできます。
Lambda 関数を使用することで、Amazon MQ メッセージブローカーからのレコードを処理できます。Lambda は、ブローカーからメッセージを読み取り関数を同期的に呼び出す Lambda リソース、イベントソースマッピングによって関数を呼び出します。
Amazon MQ イベントソースマッピングには、次の設定制限があります。
クロスアカウント - Lambda はクロスアカウント処理をサポートしていません。Lambda を使用して、別の AWS アカウントにある Amazon MQ メッセージブローカーからのレコードを処理することはできません。
-
認証 - ActiveMQ では、ActiveMQ SimpleAuthenticationPlugin
のみサポートされています。RabbitMQ の場合、PLAIN 認証メカニズムのみサポートされています。ユーザーは、資格情報の管理には AWS Secrets Manager を使用します。ActiveMQ 認証の詳細については、Amazon MQ デベロッパーガイドの Integrating ActiveMQ brokers with LDAP を参照してください。 -
接続クォータ - ブローカーは、ワイヤレベルプロトコルごとに最大の接続可能数を持っています。このクォータは、ブローカーインスタンスタイプに基づいています。これらの制限の詳細については、Amazon MQ デベロッパーガイドの Quotas in Amazon MQ の Brokers のセクションを参照してください。
-
接続 - ブローカーをパブリックまたはプライベートの Virtual Private Cloud (VPC) に作成できます。プライベート VPC の場合、Lambda 関数が VPC にアクセスしてメッセージを受信する必要があります。詳細については、このトピックで後述する「イベントソースマッピング API」を参照してください。
-
イベント送信先 - キューの送信先のみがサポートされます。ただし、仮想トピックを使用することができます。仮想トピックは、内部的にトピックとして動作し、キューとして Lambda と対話しながら動作します。詳細については、Apache ActiveMQ ウェブサイトの Virtual Destinations
および RabbitMQ ウェブサイトの Virtual Hosts を参照してください。 -
ネットワークトポロジ - ActiveMQ の場合、イベントソースマッピングごとに、1つの単一インスタンスまたはスタンバイブローカーがサポートされます。RabbitMQ の場合、イベントソースマッピングごとに、単一インスタンスブローカーまたはクラスターデプロイメントがサポートされます。単一インスタンスブローカーには、フェイルオーバーエンドポイントが必要です。これらのブローカーデプロイメントモードの詳細については、Amazon MQ デベロッパーガイドの Active MQ Broker Architecture および Rabbit MQ Broker Architecture を参照してください。
-
プロトコル — サポートされるプロトコルは、Amazon MQ の統合のタイプによって異なります。
ActiveMQ 統合の場合、Lambda は OpenWire/Java Message Service (JMS) プロトコルを使用してメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。JMS プロトコル内では、
TextMessage
および BytesMessage
のみがサポートされています。Lambda は、JMS カスタムプロパティもサポートしています。OpenWire プロトコルの詳細については、Apache ActiveMQ ウェブサイトの OpenWire を参照してください。 RabbitMQ 統合の場合、Lambda は AMQP 0-9-1 プロトコルを使ってメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。RabbitMQ による AMQP 0-9-1 プロトコルの実装の詳細については、RabbitMQ ウェブサイトの AMQP 0-9-1 Complete Reference Guide
を参照してください。
Lambda は、Amazon MQ がサポートする ActiveMQ および RabbitMQ の最新バージョンを自動的にサポートします。サポートされている最新バージョンについては、Amazon MQ デベロッパーガイドの Amazon MQ リリースノートを参照してください。
注記
デフォルトでは、Amazon MQ には毎週、ブローカー用のメンテナンスウィンドウがあります。その期間中、ブローカーは利用できません。スタンバイのないブローカーの場合、Lambda はそのウィンドウ中にメッセージを処理できません。
セクション
Lambda コンシューマーグループ
Amazon MQ と対話するため、Lambda は、Amazon MQ ブローカーから読み取ることができるコンシューマーグループを作成します。コンシューマーグループは、イベントソースマッピング UUID と同じ ID で作成されます。
Amazon MQ イベントソースの場合、Lambda はレコードをまとめてバッチ処理し、それらを単一のペイロードで関数に送信します。動作を制御するには、バッチ処理ウィンドウとバッチサイズを設定できます。Lambda は、最大 6 MB のペイロードサイズを処理する、バッチ処理ウィンドウの期限が切れる、またはレコード数が完全なバッチサイズに到達するまで、メッセージをプルします。詳細については、「バッチ処理動作」を参照してください。
コンシューマーグループは、メッセージをバイトの BLOB として取得し、それらを base64 でエンコードして単一の JSON ペイロードに変換してから、関数を呼び出します。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。
注記
Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、および ActiveMQ と RabbitMQ の Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。この制約により、イベントソースマッピングは関数エラーと再試行を適切に処理できます。
Amazon CloudWatch の ConcurrentExecutions
メトリクスを使用して、特定の関数の同時実行使用率を監視できます。同時実行の詳細については、「予約済同時実行数の設定」を参照してください。
例 Amazon MQ レコードイベント
注記
RabbitMQ の例では、pizzaQueue
は RabbitMQ キューの名前、/
は仮想ホストの名前です。メッセージを受信すると、イベントソースは pizzaQueue::/
の下にメッセージを一覧表示します。
実行ロールのアクセス許可
Amazon MQ ブローカーからレコードを読み取るには、Lambda 関数は次のアクセス許可を実行ロールに追加する必要があります。
注記
暗号化されたカスタマー管理キーを使用する場合は、kms:Decrypt
権限も追加します。
イベントソースとしてのブローカーの設定
イベントソースマッピングを作成し、Amazon MQ ブローカーから Lambda 関数にレコードを送信するよう Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の関数で同じデータを処理したり、単一の関数で複数のソースから項目を処理したりできます。
Amazon MQ から読み取るように関数を設定するには、Lambda コンソールで MQ トリガーを作成します。
トリガーを作成するには
Lambda コンソールの [Functions]
(関数) ページを開きます。 -
関数の名前を選択します。
-
[機能の概要] で、[トリガーを追加] を選択します。
-
トリガーのタイプを選択します。
-
必須のオプションを設定し、[Add] (追加) を選択します。
Lambda では、Amazon MQ イベントソースの以下のオプションがサポートされています。
-
MQ ブローカー — Amazon MQ ブローカーを選択します。
-
バッチサイズ - 単一のバッチで取得するメッセージの最大数を設定します。
-
キュー名 - 使用する Amazon MQ キューを入力します。
-
ソースアクセス設定 — 仮想ホスト情報とブローカーの認証情報を保存する Secrets Manager のシークレットを入力します。
-
トリガーの有効化 - レコードの処理を停止するトリガーを無効にします。
トリガーを有効または無効にする(または削除する)には、 MQ トリガーをデザイナーで選択します。トリガーを再設定するには、イベントソースマッピング API 操作を使用します。
イベントソースマッピング API
AWS Command Line Interface (AWS CLI) または AWS SDK
注記
Amazon MQ のイベントソースマッピングを更新、無効化、または削除すると、変更が有効になるまでに最大 15 分かかる場合があります。この期間が経過する前に、イベントソースマッピングは引き続きイベントを処理し、以前の設定を使用して関数を呼び出す場合があります。これは、コンソールで表示されるイベントソースマッピングのステータスが、変更が適用されたことを示している場合にも当てはまります。
AWS Command Line Interface (AWS CLI) を使用してイベントソースマッピングを作成するには、create-event-source-mapping
コマンドを使用します。
デフォルトではAmazon MQ、 ブローカーは PubliclyAccessible
フラグを false に設定して作成されます。ブローカーにパブリック IP アドレスが与えられるのは、PubliclyAccessible
が true に設定されている場合のみです。
イベントソースマッピングでフルアクセスする場合、ブローカーはパブリックエンドポイントを使用するか、VPC へアクセスを提供する必要があります。Amazon MQ をトリガーとして追加すると、Lambda は Lambda 関数用の VPC 設定ではなく、Amazon MQ ブローカー用の VPC 設定を引き受けることに注意してください。Amazon Virtual Private Cloud (Amazon VPC) のアクセス要件を満たすには、以下のいずれかを試します。
パブリックサブネットごとに 1 つの NAT ゲートウェイを設定します。詳細については、「VPC に接続した関数のインターネットアクセスとサービスアクセス」を参照してください。
Amazon VPC と Lambda との間に接続を作成します。Amazon VPC は、AWS Security Token Service(AWS STS) およびSecrets Manager のエンドポイントにも接続する必要があります。詳細については、「Lambda のインターフェイス VPC エンドポイントの設定」を参照してください。
設定する Amazon VPC セキュリティグループルールには、少なくとも以下の設定が必要です。
-
インバウンドルール - パブリックアクセシビリティのないブローカーの場合、ソースとして指定されたセキュリティグループのすべてのポートですべてのトラフィックを許可します。パブリックアクセシビリティを持つブローカーの場合、すべての送信先のすべてのポート上のすべてのトラフィックを許可します。
-
アウトバウンドルール - すべての送信先について、すべてのポート上のすべてのトラフィックを許可します。
Amazon VPC 設定は、Amazon MQ API を通じて検出が可能であり、create-event-source-mapping
のセットアップで定義する必要はありません。
次の例では AWS CLI コマンドが、MQ-Example-Function
という名前の Lambda 関数を ExampleMQBroker
という名前の Amazon MQ RabbitMQ ベースのブローカーにマッピングするイベントソースを作成します。このコマンドは仮想ホストの名前と、ブローカーの認証情報を保存する Secrets Manager のシークレット ARN も提供します。
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-24cacbb4-b295-49b7-8543-7ce7ce9dfb98
\
--function-name arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function
\
--queues ExampleQueue
\
--source-access-configuration Type=VIRTUAL_HOST,URI="/" Type=BASIC_AUTH,URI=arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt
\
次のような出力が表示されます。
{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 100, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601927898.741, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "USER_INITIATED", "Queues": [ "ExampleQueue" ], "SourceAccessConfigurations": [ { "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt" } ] }
update-event-source-mapping
コマンドを使用することで、Lambda によるバッチの処理方法や、処理不可能なレコードの破棄時期を指定する方法など、その他のオプションを設定できます。次のコマンド例では、イベントソースマッピングを更新して、バッチサイズを 2 にします。
aws lambda update-event-source-mapping \
--uuid 91eaeb7e-c976-1234-9451-8709db01f137
\
--batch-size 2
次のような出力が表示されます。
{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Updating", "StateTransitionReason": "USER_INITIATED" }
Lambda は、これらの設定を非同期的に更新します。このプロセスが完了するまで、出力には変更は反映されません。get-event-source-mapping
コマンドを使用して、リソースの現在のステータスを表示します。
aws lambda get-event-source-mapping \
--uuid 91eaeb7e-c976-4939-9451-8709db01f137
次のような出力が表示されます。
{ "UUID": "91eaeb7e-c976-4939-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "USER_INITIATED" }
イベントソースマッピングエラー
Lambda 関数で回復不可能なエラーが発生すると、Amazon MQ コンシューマーはレコードの処理を停止します。他のコンシューマーは、同じエラーが発生しない限り、処理を続行できます。コンシューマーが停止した原因を特定するには、StateTransitionReason
から返された詳細の EventSourceMapping
フィールドをチェックし、以下のいずれかのコードを探します。
ESM_CONFIG_NOT_VALID
-
イベントソースマッピングの設定が無効です。
EVENT_SOURCE_AUTHN_ERROR
-
Lambda がイベントソースの認証に失敗しました。
EVENT_SOURCE_AUTHZ_ERROR
-
Lambda にイベントソースへのアクセス許可がありません。
FUNCTION_CONFIG_NOT_VALID
-
関数の設定が無効です。
レコードは、サイズが原因で Lambda が削除した場合も、未処理になります。Lambda レコードのサイズ上限は 6 MB です。関数エラー時にメッセージを再配信するには、デッドレターキュー(DLQ)を使用します。詳細については、Apache ActiveMQ ウェブサイトの Message Redelivery and DLQ Handling
注記
Lambda はカスタム再配信ポリシーをサポートしていません。代わりに Lambda は、Apache ActiveMQ ウェブサイトの Redelivery PolicymaximumRedeliveries
は 5 に設定します。
Amazon MQ と RabbitMQ の設定パラメータ
すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon MQ と RabbitMQ に適用されるのは一部のパラメータのみです。
Amazon MQ と RabbitMQ に適用されるイベントソースパラメータ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
パラメータ | 必須 | デフォルト | メモ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
BatchSize |
N |
100 |
最大: 10,000 |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
有効 |
N |
true |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FunctionName |
Y |
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
FilterCriteria |
N |
|
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
MaximumBatchingWindowInSeconds |
N |
500 ミリ秒 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
キュー |
N |
消費する Amazon MQ ブローカーの送信先キューの名前。 |
|||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
SourceAccessConfigurations |
N |
ActiveMQ の場合、BASIC_AUTH 認証情報です。RabbitMQ の場合、BASIC_AUTH 認証情報と VIRTUAL_HOST 情報の両方を含めることができます。 |