Amazon MQ で Lambda を使用する - AWS Lambda

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon MQ で Lambda を使用する

注記

Lambda 関数以外のターゲットにデータを送信したい、または送信する前にデータをエンリッチしたいという場合は、「Amazon EventBridge Pipes」を参照してください。

Amazon MQ は、Apache ActiveMQ および RabbitMQ 用のマネージドメッセージブローカーサービスです。メッセージブローカーを使用すると、ソフトウェアアプリケーションおよびコンポーネントは、さまざまなプログラミング言語、オペレーティングシステム、および、トピックまたはキューイベント送信先を介した正式なメッセージングプロトコルを使って、通信できるようになります。

また Amazon MQ は、ActiveMQ か RabbitMQ ブローカーをインストールすることにより、もしくは、異なるネットワークトポロジやその他のインフラストラクチャのニーズを提供することにより、ユーザーに代わって Amazon Elastic Compute Cloud (Amazon EC2)インスタンスを管理することもできます。

Lambda 関数を使用することで、Amazon MQ メッセージブローカーからのレコードを処理できます。Lambda は、ブローカーからメッセージを読み取り関数を同期的に呼び出す Lambda リソース、イベントソースマッピングによって関数を呼び出します。

Amazon MQ イベントソースマッピングには、次の設定制限があります。

  • 同時実行数 — Amazon MQ イベントソースマッピングを使用する Lambda 関数には、デフォルトの最大同時実行数設定があります。ActiveMQ の場合、Lambda サービスは同時実行環境の数を 5 つに制限します。RabbitMQ の場合、同時実行環境の数は 1 つに制限されます。関数の予約またはプロビジョニングされる同時実行数設定を変更しても、Lambda サービスはこれ以上実行環境を利用できるようにしません。デフォルトの最大同時実行数の増加をリクエストするには、AWS Support にお問い合わせください。

  • クロスアカウント - Lambda はクロスアカウント処理をサポートしていません。Lambda を使用して、別の AWS アカウント にある Amazon MQ メッセージブローカーからのレコードを処理することはできません。

  • 認証 - ActiveMQ では、ActiveMQ SimpleAuthenticationPlugin のみサポートされています。RabbitMQ の場合、PLAIN 認証メカニズムのみサポートされています。ユーザーは、資格情報の管理には AWS Secrets Manager を使用します。ActiveMQ 認証の詳細については、Amazon MQ デベロッパーガイドIntegrating ActiveMQ brokers with LDAP を参照してください。

  • 接続クォータ - ブローカーは、ワイヤレベルプロトコルごとに最大の接続可能数を持っています。このクォータは、ブローカーインスタンスタイプに基づいています。これらの制限の詳細については、Amazon MQ デベロッパーガイドQuotas in Amazon MQBrokers のセクションを参照してください。

  • 接続 - ブローカーをパブリックまたはプライベートの Virtual Private Cloud (VPC) に作成できます。プライベート VPC の場合、Lambda 関数が VPC にアクセスしてメッセージを受信する必要があります。詳細については、このトピックで後述する「ネットワーク構成」を参照してください。

  • イベント送信先 - キューの送信先のみがサポートされます。ただし、仮想トピックを使用することができます。仮想トピックは、内部的にトピックとして動作し、キューとして Lambda と対話しながら動作します。詳細については、Apache ActiveMQ ウェブサイトの Virtual Destinations および RabbitMQ ウェブサイトの Virtual Hostsを参照してください。

  • ネットワークトポロジ - ActiveMQ の場合、イベントソースマッピングごとに、1つの単一インスタンスまたはスタンバイブローカーがサポートされます。RabbitMQ の場合、イベントソースマッピングごとに、単一インスタンスブローカーまたはクラスターデプロイメントがサポートされます。単一インスタンスブローカーには、フェイルオーバーエンドポイントが必要です。これらのブローカーデプロイメントモードの詳細については、Amazon MQ デベロッパーガイドActive MQ Broker Architecture および Rabbit MQ Broker Architecture を参照してください。

  • プロトコル — サポートされるプロトコルは、Amazon MQ の統合のタイプによって異なります。

    • ActiveMQ 統合の場合、Lambda は OpenWire/Java Message Service (JMS) プロトコルを使用してメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。JMS プロトコル内では、TextMessage および BytesMessage のみがサポートされています。Lambda は、JMS カスタムプロパティもサポートしています。OpenWire プロトコルの詳細については、Apache ActiveMQ ウェブサイトの OpenWire を参照してください。

    • RabbitMQ 統合の場合、Lambda は AMQP 0-9-1 プロトコルを使ってメッセージを使用します。その他のプロトコルは、メッセージの使用をサポートしていません。RabbitMQ による AMQP 0-9-1 プロトコルの実装の詳細については、RabbitMQ ウェブサイトの AMQP 0-9-1 Complete Reference Guide を参照してください。

Lambda は、Amazon MQ がサポートする ActiveMQ および RabbitMQ の最新バージョンを自動的にサポートします。サポートされている最新バージョンについては、Amazon MQ デベロッパーガイドAmazon MQ リリースノートを参照してください。

注記

デフォルトでは、Amazon MQ には毎週、ブローカー用のメンテナンスウィンドウがあります。その期間中、ブローカーは利用できません。スタンバイのないブローカーの場合、Lambda はそのウィンドウ中にメッセージを処理できません。

Lambda コンシューマーグループ

Amazon MQ と対話するため、Lambda は、Amazon MQ ブローカーから読み取ることができるコンシューマーグループを作成します。コンシューマーグループは、イベントソースマッピング UUID と同じ ID で作成されます。

Amazon MQ イベントソースの場合、Lambda はレコードをまとめてバッチ処理し、それらを単一のペイロードで関数に送信します。動作を制御するには、バッチ処理ウィンドウとバッチサイズを設定できます。Lambda は、最大 6 MB のペイロードサイズを処理する、バッチ処理ウィンドウの期限が切れる、またはレコード数が完全なバッチサイズに到達するまで、メッセージをプルします。詳細については、「バッチ処理動作」を参照してください。

コンシューマーグループは、メッセージをバイトの BLOB として取得し、それらを base64 でエンコードして単一の JSON ペイロードに変換してから、関数を呼び出します。関数がバッチ内のいずれかのメッセージに対してエラーを返すと、Lambda は、処理が成功するかメッセージが期限切れになるまでメッセージのバッチ全体を再試行します。

注記

Lambda 関数の最大タイムアウト制限は通常 15 分ですが、Amazon MSK、自己管理型 Apache Kafka、Amazon DocumentDB、および ActiveMQ と RabbitMQ 向け Amazon MQ のイベントソースマッピングでは、最大タイムアウト制限が 14 分の関数のみがサポートされます。この制約により、イベントソースマッピングは関数エラーと再試行を適切に処理できます。

Amazon CloudWatch の ConcurrentExecutions メトリクスを使用して、特定の関数の同時実行使用率を監視できます。同時実行の詳細については、「予約済同時実行数の設定」を参照してください。

例 Amazon MQ レコードイベント
ActiveMQ
{ "eventSource": "aws:mq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:test:b-9bcfa592-423a-4942-879d-eb284b418fc8", "messages": [ { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/text-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 1, "correlationId": "myJMSCoID", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"QUJDOkFBQUE=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } }, { "messageID": "ID:b-9bcfa592-423a-4942-879d-eb284b418fc8-1.mq.us-west-2.amazonaws.com-37557-1234520418293-4:1:1:1:1", "messageType": "jms/bytes-message", "deliveryMode": 1, "replyTo": null, "type": null, "expiration": "60000", "priority": 2, "correlationId": "myJMSCoID1", "redelivered": false, "destination": { "physicalName": "testQueue" }, "data":"LQaGQ82S48k=", "timestamp": 1598827811958, "brokerInTime": 1598827811958, "brokerOutTime": 1598827811959, "properties": { "index": "1", "doAlarm": "false", "myCustomProperty": "value" } } ] }
RabbitMQ
{ "eventSource": "aws:rmq", "eventSourceArn": "arn:aws:mq:us-west-2:111122223333:broker:pizzaBroker:b-9bcfa592-423a-4942-879d-eb284b418fc8", "rmqMessagesByQueue": { "pizzaQueue::/": [ { "basicProperties": { "contentType": "text/plain", "contentEncoding": null, "headers": { "header1": { "bytes": [ 118, 97, 108, 117, 101, 49 ] }, "header2": { "bytes": [ 118, 97, 108, 117, 101, 50 ] }, "numberInHeader": 10 }, "deliveryMode": 1, "priority": 34, "correlationId": null, "replyTo": null, "expiration": "60000", "messageId": null, "timestamp": "Jan 1, 1970, 12:33:41 AM", "type": null, "userId": "AIDACKCEVSQ6C2EXAMPLE", "appId": null, "clusterId": null, "bodySize": 80 }, "redelivered": false, "data": "eyJ0aW1lb3V0IjowLCJkYXRhIjoiQ1pybWYwR3c4T3Y0YnFMUXhENEUifQ==" } ] } }
注記

RabbitMQ の例では、pizzaQueue は RabbitMQ キューの名前、/ は仮想ホストの名前です。メッセージを受信すると、イベントソースは pizzaQueue::/ の下にメッセージを一覧表示します。

実行ロールのアクセス許可

Amazon MQ ブローカーからレコードを読み取るには、Lambda 関数は次のアクセス許可を実行ロールに追加する必要があります。

注記

暗号化されたカスタマー管理キーを使用する場合は、kms:Decrypt 権限も追加します。

ネットワーク構成

デフォルトでは、Amazon MQ ブローカーを作成すると、PubliclyAccessible フラグは false に設定されます。ブローカーがパブリック IP アドレスを受け取るには、PubliclyAccessible フラグを true にする必要があります。

イベントソースマッピングを通じて Lambda にブローカーへのフルアクセスを許可するには、ブローカーがパブリックエンドポイント (パブリック IP アドレス) を使用するか、ブローカーを作成した Amazon VPC へのアクセスを提供する必要があります。Amazon MQ を Lambda で使用する場合のベストプラクティスは、プライベートエンドポイントを使用し、Lambda 関数にブローカーの VPC へのアクセスを許可することです。詳細については、「Amazon MQ 開発者ガイド」の「パブリックアクセスのないブローカーを優先する」を参照してください。

Amazon MQ をトリガーとして追加すると、Lambda は Lambda 関数用の VPC 設定ではなく、Amazon MQ ブローカー用の VPC 設定を引き受けることに注意してください。Lambda 関数が Amazon MQ ブローカーのある VPC にアクセスするには、次のいずれかを実行します。

また、VPC 上で次の Amazon VPC セキュリティグループルールを設定する必要もあります。

  • インバウンドルール – イベントソースに指定されたセキュリティグループの、独自のセキュリティグループ内からのブローカーポート上のすべてのトラフィックを許可します。ActiveMQ はデフォルトでポート 61617 を使用し、RabbitMQ はデフォルトでポート 5671 を使用します。

  • アウトバウンドルール – すべての送信先に対して、ポート 443 上のすべてのトラフィックを許可します。ブローカーポートのすべてのトラフィックを独自のセキュリティグループ内で許可します。ActiveMQ はデフォルトでポート 61617 を使用し、RabbitMQ はデフォルトでポート 5671 を使用します。

NAT ゲートウェイの代わりに VPC エンドポイントを使用する場合は、その VPC エンドポイントに関連付けられたセキュリティグループが、イベントソースのセキュリティグループからのポート 443 上のすべてのインバウンドトラフィックを許可する必要があります。

アクセス許可を追加し、イベントソースマッピングを作成するには

イベントソースマッピングを作成し、Amazon MQ ブローカーから Lambda 関数にレコードを送信するよう Lambda に通知します。複数のイベントソースマッピングを作成することで、複数の関数で同じデータを処理したり、単一の関数で複数のソースから項目を処理したりできます。

Amazon MQ から読み取るように関数を設定するには、必要なアクセス許可を追加し Lambda コンソールで MQ トリガーを作成します。

アクセス許可を追加してトリガーを作成するには
  1. Lambda コンソールの関数ページを開きます。

  2. 関数の名前を選択します。

  3. [Configuration] (設定) タブを開き、次に [Permissions] (アクセス許可) をクリックします。

  4. [実行ロール] で、実行ロールのリンクを選択します。このリンクを選択すると、IAM コンソールでロールが開きます。

    
            実行ロールのリンク
  5. アクセス許可を追加、インラインポリシーを作成の順に選択します。

    
            IAMコンソールでインラインポリシーを作成する
  6. [ポリシーエディター] で、[JSON] を選択します。以下のポリシーを入力します。Amazon MQ ブローカーから読み取るには、関数にこれらのアクセス許可が必要です。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "mq:DescribeBroker", "secretsmanager:GetSecretValue", "ec2:CreateNetworkInterface", "ec2:DeleteNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }
    注記

    暗号化されたカスタマー管理キーを使用する場合は、kms:Decrypt アクセス許可も追加する必要があります。

  7. [Next] を選択します。ポリシー名を入力し、[ポリシーの作成] を選択します。

  8. Lambda コンソールの関数に戻ります。[関数の概要] で [トリガーを追加] をクリックします。

    
            Lambda コンソールの関数の概要セクション
  9. MQトリガータイプを選択します。

  10. 必須のオプションを設定し、[追加] を選択します。

Lambda では、Amazon MQ イベントソースの以下のオプションがサポートされています。

  • MQ ブローカー — Amazon MQ ブローカーを選択します。

  • バッチサイズ - 単一のバッチで取得するメッセージの最大数を設定します。

  • キュー名 - 使用する Amazon MQ キューを入力します。

  • ソースアクセス設定 — 仮想ホスト情報とブローカーの認証情報を保存する Secrets Manager のシークレットを入力します。

  • トリガーの有効化 - レコードの処理を停止するトリガーを無効にします。

トリガーを有効または無効にする(または削除する)には、 MQ トリガーをデザイナーで選択します。トリガーを再設定するには、イベントソースマッピング API 操作を使用します。

イベントソースマッピング API

AWS Command Line Interface (AWS CLI) または AWS SDK を使用してイベントソースを管理するには、以下の API 操作を使用できます。

注記

Amazon MQ のイベントソースマッピングを更新、無効化、または削除すると、変更が有効になるまでに最大 15 分かかる場合があります。この期間が経過する前に、イベントソースマッピングは引き続きイベントを処理し、以前の設定を使用して関数を呼び出す場合があります。これは、コンソールで表示されるイベントソースマッピングのステータスが、変更が適用されたことを示している場合にも当てはまります。

AWS Command Line Interface (AWS CLI) を使用してイベントソースマッピングを作成するには、create-event-source-mapping コマンドを使用します。

次の例では AWS CLI コマンドが、MQ-Example-Function という名前の Lambda 関数を ExampleMQBroker という名前の Amazon MQ RabbitMQ ベースのブローカーにマッピングするイベントソースを作成します。このコマンドは仮想ホストの名前と、ブローカーの認証情報を保存する Secrets Manager のシークレット ARN も提供します。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-24cacbb4-b295-49b7-8543-7ce7ce9dfb98 \ --function-name arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function \ --queues ExampleQueue \ --source-access-configuration Type=VIRTUAL_HOST,URI="/" Type=BASIC_AUTH,URI=arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt \

次のような出力が表示されます。

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 100, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601927898.741, "LastProcessingResult": "No records processed", "State": "Creating", "StateTransitionReason": "USER_INITIATED", "Queues": [ "ExampleQueue" ], "SourceAccessConfigurations": [ { "Type": "BASIC_AUTH", "URI": "arn:aws:secretsmanager:us-east-1:123456789012:secret:ExampleMQBrokerUserPassword-xPBMTt" } ] }

update-event-source-mapping コマンドを使用することで、Lambda によるバッチの処理方法や、処理不可能なレコードの破棄時期を指定する方法など、その他のオプションを設定できます。次のコマンド例では、イベントソースマッピングを更新して、バッチサイズを 2 にします。

aws lambda update-event-source-mapping \ --uuid 91eaeb7e-c976-1234-9451-8709db01f137 \ --batch-size 2

次のような出力が表示されます。

{ "UUID": "91eaeb7e-c976-1234-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Updating", "StateTransitionReason": "USER_INITIATED" }

Lambda は、これらの設定を非同期的に更新します。このプロセスが完了するまで、出力には変更は反映されません。get-event-source-mapping コマンドを使用して、リソースの現在のステータスを表示します。

aws lambda get-event-source-mapping \ --uuid 91eaeb7e-c976-4939-9451-8709db01f137

次のような出力が表示されます。

{ "UUID": "91eaeb7e-c976-4939-9451-8709db01f137", "BatchSize": 2, "EventSourceArn": "arn:aws:mq:us-east-1:123456789012:broker:ExampleMQBroker:b-b4d492ef-bdc3-45e3-a781-cd1a3102ecca", "FunctionArn": "arn:aws:lambda:us-east-1:123456789012:function:MQ-Example-Function", "LastModified": 1601928393.531, "LastProcessingResult": "No records processed", "State": "Enabled", "StateTransitionReason": "USER_INITIATED" }

イベントソースマッピングエラー

Lambda 関数で回復不可能なエラーが発生すると、Amazon MQ コンシューマーはレコードの処理を停止します。他のコンシューマーは、同じエラーが発生しない限り、処理を続行できます。コンシューマーが停止した原因を特定するには、StateTransitionReason から返された詳細の EventSourceMapping フィールドをチェックし、以下のいずれかのコードを探します。

ESM_CONFIG_NOT_VALID

イベントソースマッピングの設定が無効です。

EVENT_SOURCE_AUTHN_ERROR

Lambda がイベントソースの認証に失敗しました。

EVENT_SOURCE_AUTHZ_ERROR

Lambda にイベントソースへのアクセス許可がありません。

FUNCTION_CONFIG_NOT_VALID

関数の設定が無効です。

レコードは、サイズが原因で Lambda が削除した場合も、未処理になります。Lambda レコードのサイズ上限は 6 MB です。関数エラー時にメッセージを再配信するには、デッドレターキュー(DLQ)を使用します。詳細については、Apache ActiveMQ ウェブサイトの Message Redelivery and DLQ Handling および RabbitMQ ウェブサイトの Reliability Guide を参照してください。

注記

Lambda はカスタム再配信ポリシーをサポートしていません。代わりに Lambda は、Apache ActiveMQ ウェブサイトの Redelivery Policy ページのデフォルト値のポリシーを使用し、maximumRedeliveries は 6 に設定します。

Amazon MQ と RabbitMQ の設定パラメータ

すべての Lambda イベントソースタイプは、同じCreateEventSourceMapping および UpdateEventSourceMapping API オペレーションを共有しています。ただし、Amazon MQ と RabbitMQ に適用されるのは一部のパラメータのみです。

Amazon MQ と RabbitMQ に適用されるイベントソースパラメータ
[Parameter] (パラメータ) 必須 デフォルト メモ

BatchSize

N

100

最大: 10,000

有効

N

true

FunctionName

Y

FilterCriteria

N

Lambda のイベントフィルタリング

MaximumBatchingWindowInSeconds

N

500 ミリ秒

バッチ処理動作

キュー

N

消費する Amazon MQ ブローカーの送信先キューの名前。

SourceAccessConfigurations

N

ActiveMQ の場合、BASIC_AUTH 認証情報です。RabbitMQ の場合、BASIC_AUTH 認証情報と VIRTUAL_HOST 情報の両方を含めることができます。