

本文為英文版的機器翻譯版本，如內容有任何歧義或不一致之處，概以英文版為準。

# 搭配 使用 OpenSearch 擷取管道 AWS Lambda
<a name="configure-client-lambda"></a>

使用[AWS Lambda 處理器](https://opensearch.org/docs/latest/data-prepper/pipelines/configuration/processors/aws-lambda/)透過自訂程式碼，從 OpenSearch Ingestion 支援的任何來源或目的地豐富資料。使用 Lambda 處理器，您可以套用自己的資料轉換或擴充，然後將處理的事件傳回管道以進行進一步處理。此處理器可啟用自訂資料處理，並讓您完全控制資料在流經管道前的處理方式。

**注意**  
Lambda 處理器處理之單一事件的承載大小限制為 5 MB。此外，Lambda 處理器僅支援 JSON 陣列格式的回應。

## 先決條件
<a name="configure-clients-lambda-prereqs"></a>

使用 Lambda 處理器建立管道之前，請先建立下列資源：
+ 函數 AWS Lambda ，可豐富和轉換您的來源資料。如需說明，請參閱[建立您的第一個 Lambda 函數](https://docs.aws.amazon.com/lambda/latest/dg/getting-started.html)。
+ 將成為管道接收器的 OpenSearch Service 網域或 OpenSearch Serverless 集合。如需詳細資訊，請參閱[建立 OpenSearch Service 網域](createupdatedomains.md#createdomains)及[建立集合](serverless-create.md)。
+ 管道角色，其中包含寫入網域或集合目的地的許可。如需詳細資訊，請參閱[管道角色](pipeline-security-overview.md#pipeline-security-sink)。

  管道角色還需要附加的許可政策，允許它叫用管道組態中指定的 Lambda 函數。例如：

------
#### [ JSON ]

****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Sid": "allowinvokeFunction",
              "Effect": "Allow",
              "Action": [
                  "lambda:invokeFunction",
                  "lambda:InvokeAsync",
                  "lambda:ListFunctions"
              ],
              "Resource": "arn:aws:lambda:{{us-east-1}}:{{111122223333}}:function:{{function-name}}"
              
          }
      ]
  }
  ```

------

## 建立管道
<a name="configure-clients-security-lake-pipeline-role"></a>

若要使用 AWS Lambda 做為處理器，請設定 OpenSearch Ingestion 管道並指定 `aws_lambda`做為處理器。您也可以使用**AWS Lambda 自訂擴充**藍圖來建立管道。如需詳細資訊，請參閱[使用藍圖](pipeline-blueprint.md)。

下列範例管道會從 HTTP 來源接收資料、使用日期處理器和 AWS Lambda 處理器來充實資料，並將處理的資料擷取至 OpenSearch 網域。

```
version: "2"
lambda-processor-pipeline:
  source:
    http:
      path: "/${pipelineName}/logs"
  processor:
      - date:
        destination: "@timestamp"
        from_time_received: true
    - aws_lambda:
        function_name: "my-lambda-function"

        tags_on_failure: ["lambda_failure"]
        batch:
            key_name: "events"
        aws:
          region: {{region}}
  sink:
    - opensearch:
        hosts: [ "https://search-{{mydomain}}.{{us-east-1}}es.amazonaws.com" ]
        index: "table-index"
        aws:
          region: "{{region}}"
          serverless: false
```

下列範例 AWS Lambda 函數會將新的鍵/值對 (`"transformed": "true"`) 新增至所提供事件陣列中的每個元素，然後傳回修改後的版本，以轉換傳入的資料。

```
import json

def lambda_handler(event, context):
    input_array = event.get('events', [])
    output = []
    for input in input_array:
        input["transformed"] = "true";
        output.append(input)

    return output
```

## 批次處理
<a name="configure-clients-lambda-batching"></a>

管道會將批次事件傳送至 Lambda 處理器，並動態調整批次大小，以確保其保持在 5 MB 的限制以下。

以下是管道批次的範例：

```
batch:
    key_name: "events"

input_arrary = event.get('events', [])
```

**注意**  
當您建立管道時，請確定 Lambda 處理器組態中的 `key_name`選項符合 Lambda 處理常式中的事件金鑰。

## 條件式篩選
<a name="configure-clients-lambda-conditional-filtering"></a>

條件式篩選可讓您根據事件資料中的特定條件，控制 AWS Lambda 處理器調用 Lambda 函數的時間。當您想要選擇性地處理某些類型的事件，同時忽略其他事件時，此功能特別有用。

下列範例組態使用條件式篩選：

```
processors:
  - aws_lambda:
      function_name: "my-lambda-function"
      aws:
        region: "region"
      lambda_when: "/sourceIp == 10.10.10.10"
```