Amazon MQ で Lambda を使用する - AWS Lambda

Amazon MQ で Lambda を使用する

Amazon MQ は、Apache ActiveMQ および RabbitMQ 用のマネージドメッセージブローカーサービスです。メッセージブローカーを使用すると、ソフトウェアアプリケーションおよびコンポーネントは、さまざまなプログラミング言語、オペレーティングシステム、および、トピックまたはキューイベント送信先を介した正式なメッセージングプロトコルを使って、通信できるようになります。

また Amazon MQ は、ActiveMQ か RabbitMQ ブローカーをインストールすることにより、もしくは、異なるネットワークトポロジやその他のインフラストラクチャのニーズを提供することにより、ユーザーに代わって Amazon Elastic Compute Cloud (Amazon EC2)インスタンスを管理することもできます。

Lambda 関数を使用することで、Amazon MQ メッセージブローカーからのレコードを処理できます。Lambda は、ブローカーからメッセージを読み取り関数を同期的に呼び出す Lambda リソース、イベントソースマッピングによって関数を呼び出します。

Amazon MQ イベントソースマッピングには、次の設定制限があります。

  • クロスアカウント - Lambda はクロスアカウント処理をサポートしていません。Lambda を使用して、別の AWS アカウントにある Amazon MQ メッセージブローカーからのレコードを処理することはできません。

  • 認証 - ActiveMQ では、ActiveMQ SimpleAuthenticationPlugin のみサポートされています。RabbitMQ の場合、PLAIN 認証メカニズムのみサポートされています。ユーザーは、資格情報の管理には AWS Secrets Manager を使用します。ActiveMQ 認証の詳細については、Amazon MQ デベロッパーガイドIntegrating ActiveMQ brokers with LDAP を参照してください。

  • 接続クォータ - ブローカーは、ワイヤレベルプロトコルごとに最大の接続可能数を持っています。このクォータは、ブローカーインスタンスタイプに基づいています。これらの制限の詳細については、Amazon MQ デベロッパーガイドQuotas in Amazon MQBrokers のセクションを参照してください。

  • 接続 - ブローカーをパブリックまたはプライベートの Virtual Private Cloud (VPC) に作成できます。プライベート VPC の場合、Lambda 関数が VPC にアクセスしてメッセージを受信する必要があります。詳細については、このトピックで後述する「イベントソースマッピング API」を参照してください。

  • イベント送信先 - キューの送信先のみがサポートされます。ただし、仮想トピックを使用することができます。仮想トピックは、内部的にトピックとして動作し、キューとして Lambda と対話しながら動作します。詳細については、Apache ActiveMQ ウェブサイトの Virtual Destinations および RabbitMQ ウェブサイトの Virtual Hostsを参照してください。

  • ネットワークトポロジ - ActiveMQ の場合、イベントソースマッピングごとに、1つの単一インスタンスまたはスタンバイブローカーがサポートされます。RabbitMQ の場合、イベントソースマッピングごとに、単一インスタンスブローカーまたはクラスターデプロイメントがサポートされます。単一インスタンスブローカーには、フェイルオーバーエンドポイントが必要です。これらのブローカーデプロイメントモードの詳細については、Amazon MQ デベロッパーガイドActive MQ Broker Architecture および Rabbit MQ Broker Architecture を参照してください。

  • プロトコル — サポートされるプロトコルは、Amazon MQ の統合のタイプによって異なります。

    • ActiveMQ 統合の場合、Lambda は OpenWire/Java Message Service (JMS) プロトコルを使用してメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。JMS プロトコル内では、TextMessage および BytesMessage のみがサポートされています。OpenWire プロトコルの詳細については、Apache ActiveMQ ウェブサイトの OpenWire を参照してください。

    • RabbitMQ 統合の場合、Lambda は AMQP 0-9-1 プロトコルを使ってメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。RabbitMQ による AMQP 0-9-1 プロトコルの実装の詳細については、RabbitMQ ウェブサイトの AMQP 0-9-1 Complete Reference Guide を参照してください。

Lambda は、Amazon MQ がサポートする ActiveMQ および RabbitMQ の最新バージョンを自動的にサポートします。サポートされている最新バージョンについては、Amazon MQ デベロッパーガイドAmazon MQ リリースノートを参照してください。

注記

デフォルトでは、Amazon MQ には毎週、ブローカー用のメンテナンスウィンドウがあります。その期間中、ブローカーは利用できません。スタンバイのないブローカーの場合、Lambda はそのウィンドウ中にメッセージを処理できません。

Lambda コンシューマーグループ

Amazon MQ と対話するため、Lambda は、Amazon MQ ブローカーから読み取ることができるコンシューマーグループを作成します。コンシューマーグループは、イベントソースマッピング UUID と同じ ID で作成されます。

Amazon MQ イベントソースの場合、Lambda はレコードをまとめてバッチ処理し、それらを単一のペイロードで関数に送信します。動作を制御するには、バッチ処理ウィンドウとバッチサイズを設定できます。Lambda は、最大 6 MB のペイロードサイズを処理する、バッチ処理ウィンドウの期限が切れる、またはレコード数が完全なバッチサイズに到達するまで、メッセージをプルします。詳しくは、「バッチ処理動作」を参照してください。

Lambda はバッチを単一のペイロードに変換してから、関数を呼び出します。メッセージは永続化も逆シリアル化もされません。その代わりに、コンシューマーグループはそれらをバイトの BLOB として取得してから、base64 でエンコードして JSON ペイロードにします。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

注記

Lambda は、関数を最大 14 分実行できます。関数のタイムアウトは 14 分以下に設定してください (デフォルトのタイムアウト値は 3 秒です)。Lambda は、14 分を超える呼び出しを再試行する場合があります。

Amazon CloudWatch の ConcurrentExecutions メトリクスを使用して、特定の関数の同時実行使用率を監視できます。同時実行の詳細については、「Lambda の予約済み同時実行数の管理」を参照してください。

例 Amazon MQ レコードイベント

ActiveMQ
{ "eventSource": "aws:amq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messages": [ { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 1, "correlationId": "myJMSCoID", "redelivered": false, "destination": { "physicalname": "testQueue" }, "data":"QUJDOkFBQUE=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 }, { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 2, "correlationId": "myJMSCoID1", "redelivered": false, "destination": { "physicalname": "testQueue" }, "data":"LQaGQ82S48k=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959 } ] }
RabbitMQ
{ "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "rmqMessagesByQueue": { "pizzaQueue::/": [ { "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ] } }
注記

RabbitMQ の例では、pizzaQueue は RabbitMQ キューの名前、/ は仮想ホストの名前です。メッセージを受信すると、イベントソースは pizzaQueue::/ の下にメッセージを一覧表示します。

実行ロールのアクセス許可

Amazon MQ ブローカーからレコードを読み取るには、Lambda 関数は次のアクセス許可を実行ロールに追加する必要があります。

注記

暗号化されたカスタマー管理キーを使用する場合は、kms:Decrypt 権限も追加します。

イベントソースとしてのブローカーの設定

イベントソースマッピングを作成し、Amazon MQ ブローカーから Lambda 関数にレコードを送信するよう Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の関数で同じデータを処理したり、単一の関数で複数のソースから項目を処理したりできます。

Amazon MQ から読み取るように関数を設定するには、Lambda コンソールで MQ トリガーを作成します。

トリガーを作成するには

  1. Lambda コンソールの [Functions] (関数) ページを開きます。

  2. 関数の名前を選択します。

  3. [機能の概要] で、[トリガーを追加] を選択します。

  4. トリガーのタイプを選択します。

  5. 必須のオプションを設定し、[Add] (追加) を選択します。

Lambda では、Amazon MQ イベントソースの以下のオプションがサポートされています。

  • MQ ブローカー — Amazon MQ ブローカーを選択します。

  • バッチサイズ - 単一のバッチで取得するメッセージの最大数を設定します。

  • キュー名 - 使用する Amazon MQ キューを入力します。

  • ソースアクセス設定 — 仮想ホスト情報とブローカーの認証情報を保存する Secrets Manager のシークレットを入力します。

  • トリガーの有効化 - レコードの処理を停止するトリガーを無効にします。

トリガーを有効または無効にする(または削除する)には、 MQ トリガーをデザイナーで選択します。トリガーを再設定するには、イベントソースマッピング API 操作を使用します。

イベントソースマッピング API

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API 操作を使用できます。

AWS Command Line Interface (AWS CLI) を使用してイベントソースマッピングを作成するには、create-event-source-mapping コマンドを使用します。

デフォルトではAmazon MQ、 ブローカーは PubliclyAccessible フラグを false に設定して作成されます。ブローカーにパブリック IP アドレスが与えられるのは、PubliclyAccessible が true に設定されている場合のみです。

イベントソースマッピングでフルアクセスする場合、ブローカーはパブリックエンドポイントを使用するか、VPC へアクセスを提供する必要があります。Amazon Virtual Private Cloud (Amazon VPC) のアクセス要件を満たすには、以下のいずれかを試します。

設定する Amazon VPC セキュリティグループルールには、少なくとも以下の設定が必要です。

  • インバウンドルール - パブリックアクセシビリティのないブローカーの場合、ソースとして指定されたセキュリティグループのすべてのポートですべてのトラフィックを許可します。パブリックアクセシビリティを持つブローカーの場合、すべての送信先のすべてのポート上のすべてのトラフィックを許可します。

  • アウトバウンドルール - すべての送信先について、すべてのポート上のすべてのトラフィックを許可します。

Amazon VPC 設定は、Amazon MQ API を通じて検出が可能であり、create-event-source-mapping のセットアップで定義する必要はありません。

次の例では AWS CLI コマンドが、MQ-Example-Function という名前の Lambda 関数を ExampleMQBroker という名前の Amazon MQ RabbitMQ ベースのブローカーにマッピングするイベントソースを作成します。このコマンドは仮想ホストの名前と、ブローカーの認証情報を保存する Secrets Manager のシークレット ARN も提供します。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-24cacbb4-b295-49b7-8543-7ce7ce9dfb98 \ --function-name arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function \ --queues ExampleQueue \ --source-access-configuration Type=VIRTUAL_HOST,URI="/" Type=BASIC_AUTH,URI=arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt \

次のような出力が表示されます。

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 100, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601927898.741, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "USER_INITIATED", "Queues": [ "ExampleQueue" ], "SourceAccessConfigurations": [ { "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt" } ] }

update-event-source-mapping コマンドを使用することで、Lambda によるバッチの処理方法や、処理不可能なレコードの破棄時期を指定する方法など、その他のオプションを設定できます。次のコマンド例では、イベントソースマッピングを更新して、バッチサイズを 2 にします。

aws lambda update-event-source-mapping \ --uuid 91eaeb7e-c976-1234-9451-8709db01f137 \ --batch-size 2

次のような出力が表示されます。

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Updating", "StateTransitionReason": "USER_INITIATED" }

Lambda は、これらの設定を非同期的に更新します。このプロセスが完了するまで、出力には変更は反映されません。get-event-source-mapping コマンドを使用して、リソースの現在のステータスを表示します。

aws lambda get-event-source-mapping \ --uuid 91eaeb7e-c976-4939-9451-8709db01f137

次のような出力が表示されます。

{ "UUID": "91eaeb7e-c976-4939-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "USER_INITIATED" }

イベントソースマッピングエラー

Lambda 関数で回復不可能なエラーが発生すると、Amazon MQ コンシューマーはレコードの処理を停止します。他のコンシューマーは、同じエラーが発生しない限り、処理を続行できます。コンシューマーが停止した原因を特定するには、StateTransitionReason から返された詳細の EventSourceMapping フィールドをチェックし、以下のいずれかのコードを探します。

ESM_CONFIG_NOT_VALID

イベントソースマッピングの設定が無効です。

EVENT_SOURCE_AUTHN_ERROR

Lambda がイベントソースの認証に失敗しました。

EVENT_SOURCE_AUTHZ_ERROR

Lambda にイベントソースへのアクセス許可がありません。

FUNCTION_CONFIG_NOT_VALID

関数の設定が無効です。

レコードは、サイズが原因で Lambda が削除した場合も、未処理になります。Lambda レコードのサイズ上限は 6 MB です。関数エラー時にメッセージを再配信するには、デッドレターキュー(DLQ)を使用します。詳細については、Apache ActiveMQ ウェブサイトの Message Redelivery and DLQ Handling および RabbitMQ ウェブサイトの Reliability Guide を参照してください。

注記

Lambda はカスタム再配信ポリシーをサポートしていません。代わりに Lambda は、Apache ActiveMQ ウェブサイトの Redelivery Policy ページのデフォルト値のポリシーを使用し、maximumRedeliveries は 5 に設定します。

Amazon MQ と RabbitMQ の設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon MQ と RabbitMQ に適用されるのは一部のパラメータのみです。

Amazon MQ と RabbitMQ に適用されるイベントソースパラメータ
[Parameter] (パラメータ) [Required] (必須) デフォルト メモ

BatchSize

N

100

最大: 10,000

有効

N

true

FunctionName

Y

キュー

N

消費する Amazon MQ ブローカーの送信先キューの名前。

SourceAccessConfigurations

N

ActiveMQ の場合、BASIC_AUTH 認証情報です。RabbitMQ の場合、BASIC_AUTH 認証情報と VIRTUAL_HOST 情報の両方を含めることができます。