

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menggunakan Lambda untuk memproses catatan dari Amazon Kinesis Data Streams
<a name="with-kinesis"></a>

Anda dapat menggunakan fungsi Lambda untuk memproses rekaman dalam aliran data [Amazon Kinesis](https://docs.aws.amazon.com/streams/latest/dev/introduction.html). [Anda dapat memetakan fungsi Lambda ke konsumen throughput bersama Kinesis Data Streams (iterator standar), atau ke konsumen throughput khusus dengan fan-out yang ditingkatkan.](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html) Untuk iterator standar, Lambda melakukan polling rekaman di setiap shard dalam aliran Kinesis Anda dengan menggunakan protokol HTTP. Pemetaan sumber kejadian berbagi throughput baca dengan konsumen lain dari shard tersebut.

 Untuk perincian tentang aliran data Kinesis, lihat [Data Pembacaan dari Amazon Kinesis Data Streams](https://docs.aws.amazon.com/kinesis/latest/dev/building-consumers.html).

**catatan**  
Kinesis mengenakan biaya untuk setiap shard dan, untuk keluaran yang ditingkatkan, pembacaan data dari aliran. Untuk perincian harga, lihat [harga Amazon Kinesis](https://aws.amazon.com/kinesis/data-streams/pricing).

## Polling dan batching stream
<a name="kinesis-polling-and-batching"></a>

Lambda membaca rekaman dari aliran data dan memanggil fungsi Anda [secara sinkron](invocation-sync.md) dengan kejadian yang berisi rekaman aliran. Lambda membaca rekaman dalam batch dan memanggil fungsi Anda untuk memproses rekaman dari batch. Setiap batch berisi catatan dari satu shard/data aliran.

Fungsi Lambda Anda adalah aplikasi konsumen untuk aliran data Anda. Itu memproses satu batch rekaman pada satu waktu dari setiap shard. Anda dapat memetakan fungsi Lambda ke konsumen dengan throughput bersama (iterator standar), atau ke konsumen dengan throughput khusus dengan [keluaran yang ditingkatkan](https://docs.aws.amazon.com/kinesis/latest/dev/enhanced-consumers.html).
+ **Iterator standar:** Lambda polling setiap pecahan dalam aliran Kinesis Anda untuk catatan dengan kecepatan dasar sekali per detik. Saat tersedia lebih banyak rekaman, Lambda melanjutkan pemrosesan batch sampai fungsi dapat menyusul aliran. Pemetaan sumber kejadian berbagi throughput baca dengan konsumen lain dari shard tersebut.
+ **Peningkatan fan-out:** [Untuk meminimalkan latensi dan memaksimalkan throughput baca, buat konsumen aliran data dengan fan-out yang disempurnakan.](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html) Konsumen dengan keluaran yang ditingkatkan mendapatkan koneksi khusus ke setiap shard yang tidak memengaruhi pembacaan aplikasi lain dari aliran tersebut. Konsumen aliran menggunakan HTTP/2 untuk mengurangi latensi dengan mendorong rekaman ke Lambda melalui koneksi yang sudah berlangsung lama dan dengan mengompresi header permintaan. Anda dapat membuat konsumen aliran dengan API Kinesis [RegisterStreamConsumer](https://docs.aws.amazon.com/kinesis/latest/APIReference/API_RegisterStreamConsumer.html).

```
aws kinesis register-stream-consumer \
--consumer-name con1 \
--stream-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream
```

Anda akan melihat output berikut:

```
{
    "Consumer": {
        "ConsumerName": "con1",
        "ConsumerARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream/consumer/con1:1540591608",
        "ConsumerStatus": "CREATING",
        "ConsumerCreationTimestamp": 1540591608.0
    }
}
```

Untuk meningkatkan kecepatan proses fungsi Anda merekam, [tambahkan pecahan ke aliran data Anda](https://repost.aws/knowledge-center/kinesis-data-streams-open-shards). Lambda memproses rekaman dalam setiap shard secara berurutan. Lambda menghentikan pemrosesan rekaman tambahan dalam shard jika fungsi Anda mengembalikan kesalahan. Makin banyak shard, makin banyak batch yang diproses sekaligus sehingga mengurangi dampak kesalahan terhadap konkurensi.

Apabila fungsi Anda tidak dapat menskalakan naik untuk menangani jumlah total batch yang bersamaan, [mintalah kenaikan kuota](https://docs.aws.amazon.com/servicequotas/latest/userguide/request-quota-increase.html) atau [cadangkan konkurensi](configuration-concurrency.md) untuk fungsi Anda.

Secara default, Lambda memanggil fungsi Anda segera setelah catatan tersedia. Jika batch yang dibaca Lambda dari sumber peristiwa hanya memiliki satu catatan di dalamnya, Lambda hanya mengirimkan satu catatan ke fungsi tersebut. *Untuk menghindari menjalankan fungsi dengan sejumlah kecil catatan, Anda dapat memberi tahu sumber acara untuk menyangga catatan hingga 5 menit dengan mengonfigurasi jendela batching.* Sebelum menjalankan fungsi, Lambda terus membaca catatan dari sumber acara hingga mengumpulkan batch penuh, jendela batching kedaluwarsa, atau batch mencapai batas muatan 6 MB. Untuk informasi selengkapnya, lihat [Perilaku batching](invocation-eventsourcemapping.md#invocation-eventsourcemapping-batching).

**Awas**  
Pemetaan sumber peristiwa Lambda memproses setiap peristiwa setidaknya sekali, dan pemrosesan duplikat catatan dapat terjadi. Untuk menghindari potensi masalah yang terkait dengan duplikat peristiwa, kami sangat menyarankan agar Anda membuat kode fungsi Anda idempoten. Untuk mempelajari lebih lanjut, lihat [Bagaimana cara membuat fungsi Lambda saya idempoten](https://repost.aws/knowledge-center/lambda-function-idempotent) di Pusat Pengetahuan. AWS 

Lambda tidak menunggu [ekstensi](lambda-extensions.md) yang dikonfigurasi selesai sebelum mengirim batch berikutnya untuk diproses. Dengan kata lain, ekstensi Anda dapat terus berjalan saat Lambda memproses kumpulan catatan berikutnya. Hal ini dapat menyebabkan masalah pembatasan jika Anda melanggar pengaturan atau [batasan konkurensi](lambda-concurrency.md) akun Anda. Untuk mendeteksi apakah ini merupakan masalah potensial, pantau fungsi Anda dan periksa apakah Anda melihat [metrik konkurensi](monitoring-concurrency.md#general-concurrency-metrics) yang lebih tinggi dari yang diharapkan untuk pemetaan sumber peristiwa Anda. Karena waktu yang singkat di antara pemanggilan, Lambda mungkin secara singkat melaporkan penggunaan konkurensi yang lebih tinggi daripada jumlah pecahan. Ini bisa benar bahkan untuk fungsi Lambda tanpa ekstensi.

Konfigurasikan [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor)pengaturan untuk memproses satu pecahan aliran data Kinesis dengan lebih dari satu pemanggilan Lambda secara bersamaan. Anda dapat menentukan jumlah batch bersamaan yang polling-nya dibuat Lambda dari shard melalui faktor paralelisasi mulai dari 1 (default) hingga 10. Misalnya, saat Anda menyetel `ParallelizationFactor` ke 2, Anda dapat memiliki maksimum 200 pemanggilan Lambda bersamaan untuk memproses 100 pecahan data Kinesis (meskipun dalam praktiknya, Anda mungkin melihat nilai yang berbeda untuk metrik). `ConcurrentExecutions` Hal ini membantu meningkatkan skala throughput pemrosesan ketika volume data tidak stabil dan `IteratorAge` tinggi. Saat Anda meningkatkan jumlah batch bersamaan per pecahan, Lambda masih memastikan pemrosesan in-order pada tingkat kunci partisi.

Anda juga dapat menggunakan `ParallelizationFactor` dengan agregasi Kinesis. Perilaku pemetaan sumber acara bergantung pada apakah Anda menggunakan [fan-out yang disempurnakan](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html):
+ **Tanpa fan-out yang ditingkatkan**: Semua peristiwa di dalam acara agregat harus memiliki kunci partisi yang sama. Kunci partisi juga harus cocok dengan peristiwa agregat. Jika peristiwa di dalam peristiwa agregat memiliki kunci partisi yang berbeda, Lambda tidak dapat menjamin pemrosesan peristiwa secara berurutan dengan kunci partisi.
+ **Dengan fan-out yang disempurnakan**: Pertama, Lambda menerjemahkan peristiwa agregat ke dalam acara individualnya. Acara agregat dapat memiliki kunci partisi yang berbeda dari peristiwa yang dikandungnya. Namun, peristiwa yang tidak sesuai dengan kunci partisi [dijatuhkan dan hilang](https://github.com/awslabs/kinesis-aggregation/blob/master/potential_data_loss.md). Lambda tidak memproses peristiwa ini, dan tidak mengirimnya ke tujuan kegagalan yang dikonfigurasi.

## Contoh peristiwa
<a name="services-kinesis-event-example"></a>

**Example**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1545084650.987
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        },
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692540925702759324208523137515618",
                "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=",
                "approximateArrivalTimestamp": 1545084711.166
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
            "awsRegion": "us-east-2",
            "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
        }
    ]
}
```

# Memproses rekaman Amazon Kinesis Data Streams dengan Lambda
<a name="services-kinesis-create"></a>

Untuk memproses rekaman Amazon Kinesis Data Streams dengan Lambda, buat pemetaan sumber peristiwa Lambda. Anda dapat memetakan fungsi Lambda ke iterator standar atau konsumen penggemar yang disempurnakan. Untuk informasi selengkapnya, lihat [Polling dan batching stream](with-kinesis.md#kinesis-polling-and-batching).

## Buat pemetaan sumber acara Kinesis
<a name="services-kinesis-eventsourcemapping"></a>

Untuk menjalankan fungsi Lambda Anda dengan catatan dari aliran data Anda, buat pemetaan sumber [peristiwa](invocation-eventsourcemapping.md). Anda dapat membuat beberapa pemetaan sumber kejadian untuk memproses data yang sama dengan beberapa fungsi Lambda, atau untuk memproses item dari beberapa aliran data dengan satu fungsi. Saat memproses item dari beberapa aliran, setiap batch berisi catatan hanya dari satu pecahan atau aliran.

Anda dapat mengonfigurasi pemetaan sumber peristiwa untuk memproses catatan dari aliran yang berbeda. Akun AWS Untuk mempelajari selengkapnya, lihat [Membuat pemetaan sumber peristiwa lintas akun](#services-kinesis-eventsourcemapping-cross-account).

Sebelum Anda membuat pemetaan sumber peristiwa, Anda perlu memberikan izin fungsi Lambda Anda untuk membaca dari aliran data Kinesis. Lambda memerlukan izin berikut untuk mengelola sumber daya yang terkait dengan aliran data Kinesis Anda:
+ [kinesis: DescribeStream](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStream.html)
+ [kinesis: DescribeStreamSummary](https://docs.aws.amazon.com/lambda/latest/api/API_DescribeStreamSummary.html)
+ [kinesis: GetRecords](https://docs.aws.amazon.com/lambda/latest/api/API_GetRecords.html)
+ [kinesis: GetShardIterator](https://docs.aws.amazon.com/lambda/latest/api/API_GetShardIterator.html)
+ [kinesis: ListShards](https://docs.aws.amazon.com/lambda/latest/api/API_ListShards.html)
+ [kinesis: SubscribeToShard](https://docs.aws.amazon.com/lambda/latest/api/API_SubscribeToShard.html)

Kebijakan AWS terkelola [AWSLambdaKinesisExecutionRole](https://docs.aws.amazon.com/aws-managed-policy/latest/reference/AWSLambdaKinesisExecutionRole.html)mencakup izin ini. Tambahkan kebijakan terkelola ini ke fungsi Anda seperti yang dijelaskan dalam prosedur berikut.

**catatan**  
Anda tidak memerlukan `kinesis:ListStreams` izin untuk membuat dan mengelola pemetaan sumber acara untuk Kinesis. Namun, jika Anda membuat pemetaan sumber peristiwa di konsol dan Anda tidak memiliki izin ini, Anda tidak akan dapat memilih aliran Kinesis dari daftar tarik-turun dan konsol akan menampilkan kesalahan. Untuk membuat pemetaan sumber peristiwa, Anda harus memasukkan Nama Sumber Daya Amazon (ARN) secara manual dari aliran Anda.
Lambda membuat `kinesis:GetRecords` dan panggilan `kinesis:GetShardIterator` API saat mencoba kembali pemanggilan yang gagal.

------
#### [ Konsol Manajemen AWS ]

**Untuk menambahkan izin Kinesis ke fungsi Anda**

1. Buka [halaman Fungsi](https://console.aws.amazon.com/lambda/home#/functions) konsol Lambda dan pilih fungsi Anda.

1. Di tab **Konfigurasi**, pilih **Izin**.

1. Di panel **Peran eksekusi**, di bawah **Nama peran**, pilih tautan ke peran eksekusi fungsi Anda. Tautan ini membuka halaman untuk peran itu di konsol IAM.

1. **Di panel **Kebijakan izin**, pilih **Tambahkan izin**, lalu pilih Lampirkan kebijakan.**

1. Di bidang pencarian, masukkan**AWSLambdaKinesisExecutionRole**.

1. Pilih kotak centang di samping kebijakan dan pilih **Tambah izin**.

------
#### [ AWS CLI ]

**Untuk menambahkan izin Kinesis ke fungsi Anda**
+ Jalankan perintah CLI berikut untuk menambahkan `AWSLambdaKinesisExecutionRole` kebijakan ke peran eksekusi fungsi Anda:

  ```
  aws iam attach-role-policy \
  --role-name MyFunctionRole \
  --policy-arn arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole
  ```

------
#### [ AWS SAM ]

**Untuk menambahkan izin Kinesis ke fungsi Anda**
+ Dalam definisi fungsi Anda, tambahkan `Policies` properti seperti yang ditunjukkan pada contoh berikut:

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
  ```

------

Setelah mengonfigurasi izin yang diperlukan, buat pemetaan sumber acara.

------
#### [ Konsol Manajemen AWS ]

**Untuk membuat pemetaan sumber peristiwa Kinesis**

1. Buka [halaman Fungsi](https://console.aws.amazon.com/lambda/home#/functions) konsol Lambda dan pilih fungsi Anda.

1. Di panel **Ikhtisar fungsi**, pilih **Tambah pemicu**.

1. Di bawah **Konfigurasi pemicu**, untuk sumbernya, pilih **Kinesis**.

1. Pilih aliran Kinesis yang ingin Anda buat pemetaan sumber acara dan, secara opsional, konsumen aliran Anda.

1. (Opsional) edit **ukuran Batch**, **Posisi awal**, dan **jendela Batch** untuk pemetaan sumber acara Anda.

1. Pilih **Tambahkan**.

Saat membuat pemetaan sumber peristiwa dari konsol, peran IAM Anda harus memiliki izin [kinesis: ListStreams dan [kinesis](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreamConsumers.html):](https://docs.aws.amazon.com/lambda/latest/api/API_ListStreams.html). ListStreamConsumers

------
#### [ AWS CLI ]

**Untuk membuat pemetaan sumber peristiwa Kinesis**
+ Jalankan perintah CLI berikut untuk membuat pemetaan sumber peristiwa Kinesis. Pilih ukuran batch Anda sendiri dan posisi awal sesuai dengan kasus penggunaan Anda.

  ```
  aws lambda create-event-source-mapping \
  --function-name MyFunction \
  --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
  --starting-position LATEST \
  --batch-size 100
  ```

Untuk menentukan jendela batching, tambahkan `--maximum-batching-window-in-seconds` opsi. Untuk informasi selengkapnya tentang penggunaan parameter ini dan parameter lainnya, lihat [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)di *AWS CLI Command Reference*.

------
#### [ AWS SAM ]

**Untuk membuat pemetaan sumber peristiwa Kinesis**
+ Dalam definisi fungsi Anda, tambahkan `KinesisEvent` properti seperti yang ditunjukkan pada contoh berikut:

  ```
  Resources:
    MyFunction:
      Type: AWS::Serverless::Function
      Properties:
        CodeUri: ./my-function/
        Handler: index.handler
        Runtime: nodejs24.x
        Policies:
          - AWSLambdaKinesisExecutionRole
        Events:
          KinesisEvent:
            Type: Kinesis
            Properties:
              Stream: !GetAtt MyKinesisStream.Arn
              StartingPosition: LATEST
              BatchSize: 100
  
    MyKinesisStream:
      Type: AWS::Kinesis::Stream
      Properties:
        ShardCount: 1
  ```

*Untuk mempelajari selengkapnya tentang membuat pemetaan sumber peristiwa untuk Kinesis Data Streams di AWS SAM, lihat [Kinesis](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/sam-property-function-kinesis.html) dalam Panduan Pengembang.AWS Serverless Application Model *

------

## Posisi awal polling dan streaming
<a name="services-kinesis-stream-start-pos"></a>

Ketahuilah bahwa polling streaming selama pembuatan dan pembaruan pemetaan sumber acara pada akhirnya konsisten.
+ Selama pembuatan pemetaan sumber acara, mungkin diperlukan beberapa menit untuk memulai acara polling dari aliran.
+ Selama pembaruan pemetaan sumber acara, mungkin diperlukan beberapa menit untuk menghentikan dan memulai kembali acara pemungutan suara dari aliran.

Perilaku ini berarti bahwa jika Anda menentukan `LATEST` sebagai posisi awal untuk aliran, pemetaan sumber peristiwa dapat melewatkan peristiwa selama pembuatan atau pembaruan. Untuk memastikan bahwa tidak ada peristiwa yang terlewatkan, tentukan posisi awal aliran sebagai `TRIM_HORIZON` atau`AT_TIMESTAMP`.

## Membuat pemetaan sumber peristiwa lintas akun
<a name="services-kinesis-eventsourcemapping-cross-account"></a>

Amazon Kinesis Data [Streams](https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies_identity-vs-resource.html) mendukung kebijakan berbasis sumber daya. Karena itu, Anda dapat memproses data yang dicerna ke dalam aliran dalam satu Akun AWS dengan fungsi Lambda di akun lain.

Untuk membuat pemetaan sumber peristiwa untuk fungsi Lambda Anda menggunakan aliran Kinesis yang Akun AWS berbeda, Anda harus mengonfigurasi aliran menggunakan kebijakan berbasis sumber daya untuk memberikan izin fungsi Lambda Anda untuk membaca item. Untuk mempelajari cara mengonfigurasi streaming agar memungkinkan akses lintas akun, lihat [Berbagi akses dengan AWS Lambda fungsi lintas akun](https://docs.aws.amazon.com/streams/latest/dev/resource-based-policy-examples.html#Resource-based-policy-examples-lambda) di panduan Pengembang *Amazon Kinesis Streams*.

Setelah mengonfigurasi aliran Anda dengan kebijakan berbasis sumber daya yang memberi fungsi Lambda Anda izin yang diperlukan, buat pemetaan sumber peristiwa menggunakan salah satu metode yang dijelaskan di bagian sebelumnya.

Jika Anda memilih untuk membuat pemetaan sumber acara menggunakan konsol Lambda, tempelkan ARN aliran Anda langsung ke bidang input. Jika Anda ingin menentukan konsumen untuk streaming Anda, menempelkan ARN konsumen secara otomatis mengisi bidang aliran.

# Mengonfigurasi respons batch sebagian dengan Kinesis Data Streams dan Lambda
<a name="services-kinesis-batchfailurereporting"></a>

Ketika menggunakan dan memproses data streaming dari sumber peristiwa, secara default Lambda memeriksa urutan nomor tertinggi batch hanya ketika batch berhasil diselesaikan. Lambda memperlakukan semua hasil lain sebagai sepenuhnya gagal dan mencoba lagi pemrosesan batch hingga batas coba lagi. Untuk memungkinkan keberhasilan parsial saat memproses batch dari aliran, aktifkan `ReportBatchItemFailures`. Mengizinkan keberhasilan parsial dapat membantu mengurangi jumlah percobaan ulang pada catatan, meskipun tidak sepenuhnya mencegah kemungkinan percobaan ulang dalam catatan yang sukses.

Untuk mengaktifkan `ReportBatchItemFailures`, masukkan nilai enum **ReportBatchItemFailures** dalam daftar [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes). Daftar ini menunjukkan tipe respon yang diaktifkan untuk fungsi Anda. Anda dapat mengonfigurasi daftar ini saat [membuat](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html) atau [memperbarui](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) pemetaan sumber peristiwa.

**catatan**  
Bahkan ketika kode fungsi Anda mengembalikan respons kegagalan sebagian batch, respons ini tidak akan diproses oleh Lambda kecuali `ReportBatchItemFailures` fitur tersebut diaktifkan secara eksplisit untuk pemetaan sumber acara Anda.

## Sintaks laporan
<a name="streams-batchfailurereporting-syntax"></a>

Saat mengonfigurasi pelaporan pada kegagalan item batch, kelas `StreamsEventResponse` dikembalikan dengan daftar kegagalan item batch. Anda dapat menggunakan objek `StreamsEventResponse` untuk mengembalikan nomor urutan catatan gagal pertama dalam batch. Anda juga dapat membuat kelas kustom Anda sendiri menggunakan sintaks respons yang benar. Struktur JSON berikut menunjukkan sintaks respons yang diperlukan:

```
{ 
  "batchItemFailures": [ 
        {
            "itemIdentifier": "<SequenceNumber>"
        }
    ]
}
```

**catatan**  
Jika `batchItemFailures` array berisi beberapa item, Lambda menggunakan catatan dengan nomor urut terendah sebagai pos pemeriksaan. Lambda kemudian mencoba kembali semua catatan mulai dari pos pemeriksaan itu.

## Status berhasil dan gagal
<a name="streams-batchfailurereporting-conditions"></a>

Lambda memperlakukan batch sebagai sepenuhnya berhasil jika Anda mengembalikan salah satu dari berikut:
+ Daftar `batchItemFailure` kosong
+ Daftar `batchItemFailure` nol
+ `EventResponse` kosong
+ `EventResponse` nol

Lambda memperlakukan batch sebagai sepenuhnya gagal jika Anda mengembalikan salah satu dari berikut:
+ String `itemIdentifier` kosong
+ `itemIdentifier` nol
+ `itemIdentifier` dengan nama kunci yang buruk

Lambda mencoba kembali kegagalan berdasarkan strategi coba lagi Anda.

## Membagi batch
<a name="streams-batchfailurereporting-bisect"></a>

Jika invokasi Anda gagal dan `BisectBatchOnFunctionError` diaktifkan, batch dibagi terlepas dari pengaturan `ReportBatchItemFailures` Anda.

Ketika respons berhasil batch parsial diterima dan kedua `BisectBatchOnFunctionError` dan `ReportBatchItemFailures` diaktifkan, batch dibagi dua di nomor urutan yang dikembalikan dan Lambda mencoba hanya catatan yang tersisa.

Untuk menyederhanakan implementasi logika respons batch sebagian, pertimbangkan untuk menggunakan [utilitas Prosesor Batch](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) dari Powertools untuk AWS Lambda, yang secara otomatis menangani kompleksitas ini untuk Anda.

Berikut adalah beberapa contoh kode fungsi yang mengembalikan daftar pesan gagal IDs dalam batch:

------
#### [ .NET ]

**SDK untuk .NET**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch Kinesis dengan Lambda menggunakan.NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using System.Text.Json.Serialization;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegration;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return new StreamsEventResponse();
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                return new StreamsEventResponse
                {
                    BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure>
                    {
                        new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber }
                    }
                };
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
        return new StreamsEventResponse();
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}

public class StreamsEventResponse
{
    [JsonPropertyName("batchItemFailures")]
    public IList<BatchItemFailure> BatchItemFailures { get; set; }
    public class BatchItemFailure
    {
        [JsonPropertyName("itemIdentifier")]
        public string ItemIdentifier { get; set; }
    }
}
```

------
#### [ Go ]

**SDK untuk Go V2**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch Kinesis dengan Lambda menggunakan Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"fmt"
	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) (map[string]interface{}, error) {
	batchItemFailures := []map[string]interface{}{}

	for _, record := range kinesisEvent.Records {
		curRecordSequenceNumber := ""

		// Process your record
		if /* Your record processing condition here */ {
			curRecordSequenceNumber = record.Kinesis.SequenceNumber
		}

		// Add a condition to check if the record processing failed
		if curRecordSequenceNumber != "" {
			batchItemFailures = append(batchItemFailures, map[string]interface{}{"itemIdentifier": curRecordSequenceNumber})
		}
	}

	kinesisBatchResponse := map[string]interface{}{
		"batchItemFailures": batchItemFailures,
	}
	return kinesisBatchResponse, nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK untuk Java 2.x**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch Kinesis dengan Lambda menggunakan Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class ProcessKinesisRecords implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent input, Context context) {

        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        String curRecordSequenceNumber = "";

        for (KinesisEvent.KinesisEventRecord kinesisEventRecord : input.getRecords()) {
            try {
                //Process your record
                KinesisEvent.Record kinesisRecord = kinesisEventRecord.getKinesis();
                curRecordSequenceNumber = kinesisRecord.getSequenceNumber();

            } catch (Exception e) {
                /* Since we are working with streams, we can return the failed item immediately.
                   Lambda will immediately begin to retry processing from this failed item onwards. */
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(curRecordSequenceNumber));
                return new StreamsEventResponse(batchItemFailures);
            }
        }
       
       return new StreamsEventResponse(batchItemFailures);   
    }
}
```

------
#### [ JavaScript ]

**SDK untuk JavaScript (v3)**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch Kinesis dengan Lambda menggunakan Javascript.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Melaporkan kegagalan item batch Kinesis dengan penggunaan Lambda. TypeScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
  KinesisStreamBatchResponse,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<KinesisStreamBatchResponse> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
      return {
        batchItemFailures: [{ itemIdentifier: record.kinesis.sequenceNumber }],
      };
    }
  }
  logger.info(`Successfully processed ${event.Records.length} records.`);
  return { batchItemFailures: [] };
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK untuk PHP**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch Kinesis dengan Lambda menggunakan PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Handler as StdHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler implements StdHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handle(mixed $event, Context $context): array
    {
        $kinesisEvent = new KinesisEvent($event);
        $this->logger->info("Processing records");
        $records = $kinesisEvent->getRecords();

        $failedRecords = [];
        foreach ($records as $record) {
            try {
                $data = $record->getData();
                $this->logger->info(json_encode($data));
                // TODO: Do interesting work based on the new data
            } catch (Exception $e) {
                $this->logger->error($e->getMessage());
                // failed processing the record
                $failedRecords[] = $record->getSequenceNumber();
            }
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");

        // change format for the response
        $failures = array_map(
            fn(string $sequenceNumber) => ['itemIdentifier' => $sequenceNumber],
            $failedRecords
        );

        return [
            'batchItemFailures' => $failures
        ];
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK untuk Python (Boto3)**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch Kinesis dengan Lambda menggunakan Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
def handler(event, context):
    records = event.get("Records")
    curRecordSequenceNumber = ""
    
    for record in records:
        try:
            # Process your record
            curRecordSequenceNumber = record["kinesis"]["sequenceNumber"]
        except Exception as e:
            # Return failed record's sequence number
            return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]}

    return {"batchItemFailures":[]}
```

------
#### [ Ruby ]

**SDK untuk Ruby**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch Kinesis dengan Lambda menggunakan Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  batch_item_failures = []

  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue StandardError => err
      puts "An error occurred #{err}"
      # Since we are working with streams, we can return the failed item immediately.
      # Lambda will immediately begin to retry processing from this failed item onwards.
      return { batchItemFailures: [{ itemIdentifier: record['kinesis']['sequenceNumber'] }] }
    end
  end

  puts "Successfully processed #{event['Records'].length} records."
  { batchItemFailures: batch_item_failures }
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('utf-8')
  # Placeholder for actual async work
  sleep(1)
  data
end
```

------
#### [ Rust ]

**SDK for Rust**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda-with-batch-item-handling). 
Melaporkan kegagalan item batch Kinesis dengan Lambda menggunakan Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::{
    event::kinesis::KinesisEvent,
    kinesis::KinesisEventRecord,
    streams::{KinesisBatchItemFailure, KinesisEventResponse},
};
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<KinesisEventResponse, Error> {
    let mut response = KinesisEventResponse {
        batch_item_failures: vec![],
    };

    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(response);
    }

    for record in &event.payload.records {
        tracing::info!(
            "EventId: {}",
            record.event_id.as_deref().unwrap_or_default()
        );

        let record_processing_result = process_record(record);

        if record_processing_result.is_err() {
            response.batch_item_failures.push(KinesisBatchItemFailure {
                item_identifier: record.kinesis.sequence_number.clone(),
            });
            /* Since we are working with streams, we can return the failed item immediately.
            Lambda will immediately begin to retry processing from this failed item onwards. */
            return Ok(response);
        }
    }

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(response)
}

fn process_record(record: &KinesisEventRecord) -> Result<(), Error> {
    let record_data = std::str::from_utf8(record.kinesis.data.as_slice());

    if let Some(err) = record_data.err() {
        tracing::error!("Error: {}", err);
        return Err(Error::from(err));
    }

    let record_data = record_data.unwrap_or_default();

    // do something interesting with the data
    tracing::info!("Data: {}", record_data);

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

## Menggunakan Powertools untuk prosesor AWS Lambda batch
<a name="services-kinesis-batchfailurereporting-powertools"></a>

Utilitas prosesor batch dari Powertools untuk AWS Lambda secara otomatis menangani logika respons batch paral, mengurangi kompleksitas penerapan pelaporan kegagalan batch. Berikut adalah contoh menggunakan prosesor batch:

**Python**  
Untuk contoh lengkap dan petunjuk penyiapan, lihat [dokumentasi prosesor batch](https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/).
Memproses Kinesis Data Streams stream AWS Lambda record dengan batch processor.  

```
import json
from aws_lambda_powertools import Logger
from aws_lambda_powertools.utilities.batch import BatchProcessor, EventType, process_partial_response
from aws_lambda_powertools.utilities.data_classes import KinesisEvent
from aws_lambda_powertools.utilities.typing import LambdaContext

processor = BatchProcessor(event_type=EventType.KinesisDataStreams)
logger = Logger()

def record_handler(record):
    logger.info(record)
    # Your business logic here
    # Raise an exception to mark this record as failed
    
def lambda_handler(event, context: LambdaContext):
    return process_partial_response(
        event=event, 
        record_handler=record_handler, 
        processor=processor,
        context=context
    )
```

**TypeScript**  
Untuk contoh lengkap dan petunjuk penyiapan, lihat [dokumentasi prosesor batch](https://docs.aws.amazon.com/powertools/typescript/latest/features/batch/).
Memproses Kinesis Data Streams stream AWS Lambda record dengan batch processor.  

```
import { BatchProcessor, EventType, processPartialResponse } from '@aws-lambda-powertools/batch';
import { Logger } from '@aws-lambda-powertools/logger';
import type { KinesisEvent, Context } from 'aws-lambda';

const processor = new BatchProcessor(EventType.KinesisDataStreams);
const logger = new Logger();

const recordHandler = async (record: any): Promise<void> => {
    logger.info('Processing record', { record });
    // Your business logic here
    // Throw an error to mark this record as failed
};

export const handler = async (event: KinesisEvent, context: Context) => {
    return processPartialResponse(event, recordHandler, processor, {
        context,
    });
};
```

**Java**  
Untuk contoh lengkap dan petunjuk penyiapan, lihat [dokumentasi prosesor batch](https://docs.powertools.aws.dev/lambda/java/latest/utilities/batch/).
Memproses Kinesis Data Streams stream AWS Lambda record dengan batch processor.  

```
import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;

public class KinesisStreamBatchHandler implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;

    public KinesisStreamBatchHandler() {
        handler = new BatchMessageHandlerBuilder()
                .withKinesisBatchHandler()
                .buildWithRawMessageHandler(this::processMessage);
    }

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
        return handler.processBatch(kinesisEvent, context);
    }

    private void processMessage(KinesisEvent.KinesisEventRecord kinesisEventRecord, Context context) {
        // Process the stream record
    }
}
```

**.NET**  
Untuk contoh lengkap dan petunjuk penyiapan, lihat [dokumentasi prosesor batch](https://docs.aws.amazon.com/powertools/dotnet/utilities/batch-processing/).
Memproses Kinesis Data Streams stream AWS Lambda record dengan batch processor.  

```
using System;
using System.Threading;
using System.Threading.Tasks;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using Amazon.Lambda.Serialization.SystemTextJson;
using AWS.Lambda.Powertools.BatchProcessing;

[assembly: LambdaSerializer(typeof(DefaultLambdaJsonSerializer))]

namespace HelloWorld;

public class OrderEvent
{
    public string? OrderId { get; set; }
    public string? CustomerId { get; set; }
    public decimal Amount { get; set; }
    public DateTime OrderDate { get; set; }
}

internal class TypedKinesisRecordHandler : ITypedRecordHandler<OrderEvent> 
{
    public async Task<RecordHandlerResult> HandleAsync(OrderEvent orderEvent, CancellationToken cancellationToken)
    {
        if (string.IsNullOrEmpty(orderEvent.OrderId)) 
        {
            throw new ArgumentException("Order ID is required");
        }

        return await Task.FromResult(RecordHandlerResult.None); 
    }
}

public class Function
{
    [BatchProcessor(TypedRecordHandler = typeof(TypedKinesisRecordHandler))]
    public BatchItemFailuresResponse HandlerUsingTypedAttribute(KinesisEvent _)
    {
        return TypedKinesisStreamBatchProcessor.Result.BatchItemFailuresResponse; 
    }
}
```

# Menyimpan catatan batch yang dibuang untuk sumber peristiwa Kinesis Data Streams di Lambda
<a name="kinesis-on-failure-destination"></a>

Penanganan kesalahan untuk pemetaan sumber peristiwa Kinesis bergantung pada apakah kesalahan terjadi sebelum fungsi dipanggil atau selama pemanggilan fungsi:
+ **Sebelum pemanggilan:** Jika pemetaan sumber peristiwa Lambda tidak dapat menjalankan fungsi karena pelambatan atau masalah lain, ia akan mencoba lagi hingga catatan kedaluwarsa atau melebihi usia maksimum yang dikonfigurasi pada pemetaan sumber peristiwa (). [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)
+ **Selama pemanggilan:** Jika fungsi dipanggil tetapi menampilkan kesalahan, Lambda mencoba ulang hingga catatan kedaluwarsa, melebihi usia maksimum ([MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)), atau mencapai kuota coba ulang yang dikonfigurasi (). [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts) Untuk kesalahan fungsi, Anda juga dapat mengonfigurasi [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-response-BisectBatchOnFunctionError), yang membagi batch yang gagal menjadi dua batch yang lebih kecil, mengisolasi catatan buruk dan menghindari batas waktu. Memisahkan batch tidak menghabiskan kuota coba lagi.

Jika tindakan penanganan kesalahan gagal, Lambda membuang rekaman dan melanjutkan pemrosesan batch dari aliran. Dengan pengaturan default, ini berarti rekaman yang buruk dapat memblokir pemrosesan pada shard yang terpengaruh selama hingga satu minggu. Untuk menghindari hal ini, konfigurasikan pemetaan sumber kejadian fungsi Anda dengan jumlah percobaan ulang yang wajar dan usia maksimum rekaman yang sesuai dengan kasus penggunaan Anda.

## Mengonfigurasi tujuan untuk pemanggilan yang gagal
<a name="kinesis-on-failure-destination-console"></a>

Untuk menyimpan catatan pemanggilan pemetaan sumber peristiwa yang gagal, tambahkan tujuan ke pemetaan sumber peristiwa fungsi Anda. Setiap catatan yang dikirim ke tujuan adalah dokumen JSON yang berisi metadata tentang pemanggilan yang gagal. Untuk tujuan Amazon S3, Lambda juga mengirimkan seluruh catatan pemanggilan bersama dengan metadata. Anda dapat mengonfigurasi topik Amazon SNS, antrian Amazon SQS, bucket Amazon S3, atau Kafka sebagai tujuan.

Dengan tujuan Amazon S3, Anda dapat menggunakan fitur Pemberitahuan [Acara Amazon S3](https://docs.aws.amazon.com/) untuk menerima notifikasi saat objek diunggah ke bucket S3 tujuan Anda. Anda juga dapat mengonfigurasi Pemberitahuan Acara S3 untuk menjalankan fungsi Lambda lain untuk melakukan pemrosesan otomatis pada batch yang gagal.

Peran eksekusi Anda harus memiliki izin untuk tujuan:
+ **Untuk tujuan SQS: [sqs](https://docs.aws.amazon.com/AWSSimpleQueueService/latest/APIReference/API_SendMessage.html):** SendMessage
+ **[Untuk tujuan SNS: SNS: Publish](https://docs.aws.amazon.com/sns/latest/api/API_Publish.html)**
+ **Untuk tujuan S3: s3: PutObject** [dan [s3:](https://docs.aws.amazon.com/AmazonS3/latest/API/API_PutObject.html) ListBucket](https://docs.aws.amazon.com/AmazonS3/latest/API/ListObjectsV2.html)
+ **Untuk tujuan Kafka: [kafka-cluster](https://docs.aws.amazon.com/msk/latest/developerguide/kafka-actions.html):** WriteData

Anda dapat mengonfigurasi topik Kafka sebagai tujuan gagal untuk pemetaan sumber acara Kafka Anda. Ketika Lambda tidak dapat memproses catatan setelah percobaan ulang yang melelahkan atau ketika catatan melebihi usia maksimum, Lambda mengirimkan catatan yang gagal ke topik Kafka yang ditentukan untuk diproses nanti. Lihat [Menggunakan topik Kafka sebagai tujuan kegagalan](kafka-on-failure-destination.md).

Jika Anda telah mengaktifkan enkripsi dengan kunci KMS Anda sendiri untuk tujuan S3, peran eksekusi fungsi Anda juga harus memiliki izin untuk memanggil [kms](https://docs.aws.amazon.com/kms/latest/APIReference/API_GenerateDataKey.html):. GenerateDataKey Jika kunci KMS dan tujuan bucket S3 berada di akun yang berbeda dari fungsi Lambda dan peran eksekusi, konfigurasikan kunci KMS untuk mempercayai peran eksekusi yang diizinkan. kms: GenerateDataKey

Untuk mengonfigurasi tujuan saat gagal menggunakan konsol, ikuti langkah-langkah berikut:

1. Buka [halaman Fungsi](https://console.aws.amazon.com/lambda/home#/functions) di konsol Lambda.

1. Pilih fungsi.

1. Di bagian **Gambaran umum fungsi**, pilih **Tambahkan tujuan**.

1. Untuk **Sumber**, pilih **Pemanggilan pemetaan sumber acara**.

1. Untuk **pemetaan sumber peristiwa**, pilih sumber peristiwa yang dikonfigurasi untuk fungsi ini.

1. Untuk **Kondisi**, pilih **On failure**. Untuk pemanggilan pemetaan sumber peristiwa, ini adalah satu-satunya kondisi yang diterima.

1. Untuk **tipe Tujuan**, pilih tipe tujuan yang Lambda kirimkan catatan pemanggilan.

1. Untuk **Tujuan**, pilih sumber daya.

1. Pilih **Simpan**.

Anda juga dapat mengonfigurasi tujuan pada kegagalan menggunakan AWS Command Line Interface (AWS CLI). Misalnya, [create-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/create-event-source-mapping.html)perintah berikut menambahkan pemetaan sumber peristiwa dengan tujuan kegagalan SQS ke: `MyFunction`

```
aws lambda create-event-source-mapping \
--function-name "MyFunction" \
--event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
```

[update-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/update-event-source-mapping.html)Perintah berikut memperbarui pemetaan sumber peristiwa untuk mengirim catatan pemanggilan yang gagal ke tujuan SNS setelah dua kali mencoba lagi, atau jika catatan berusia lebih dari satu jam.

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--maximum-retry-attempts 2 \
--maximum-record-age-in-seconds 3600 \
--destination-config '{"OnFailure": {"Destination": "arn:aws:sns:us-east-1:123456789012:dest-topic"}}'
```

Pengaturan yang diperbarui diterapkan secara asinkron dan tidak muncul dalam output hingga proses selesai. Gunakan perintah [get-event-source-mapping](https://awscli.amazonaws.com/v2/documentation/api/latest/reference/lambda/get-event-source-mapping.html) untuk melihat status saat ini.

Untuk menghapus tujuan, berikan string kosong sebagai argumen ke `destination-config` parameter:

```
aws lambda update-event-source-mapping \
--uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \
--destination-config '{"OnFailure": {"Destination": ""}}'
```

### Praktik terbaik keamanan untuk tujuan Amazon S3
<a name="kinesis-s3-destination-security"></a>

Menghapus bucket S3 yang dikonfigurasi sebagai tujuan tanpa menghapus tujuan dari konfigurasi fungsi Anda dapat menimbulkan risiko keamanan. Jika pengguna lain mengetahui nama bucket tujuan Anda, mereka dapat membuat ulang bucket di bucket tersebut. Akun AWS Catatan pemanggilan yang gagal akan dikirim ke bucket mereka, yang berpotensi mengekspos data dari fungsi Anda.

**Awas**  
Untuk memastikan bahwa catatan pemanggilan dari fungsi Anda tidak dapat dikirim ke bucket S3 di bucket lain Akun AWS, tambahkan kondisi ke peran eksekusi fungsi Anda yang membatasi `s3:PutObject` izin ke bucket di akun Anda. 

Contoh berikut menunjukkan kebijakan IAM yang membatasi `s3:PutObject` izin fungsi Anda ke bucket di akun Anda. Kebijakan ini juga memberi Lambda `s3:ListBucket` izin yang dibutuhkan untuk menggunakan bucket S3 sebagai tujuan.

```
{
    "Version": "2012-10-17",		 	 	 
    "Statement": [
        {
            "Sid": "S3BucketResourceAccountWrite",
            "Effect": "Allow",
            "Action": [
                "s3:PutObject",
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::*/*",
                "arn:aws:s3:::*"
            ],
            "Condition": {
                "StringEquals": {
                    "s3:ResourceAccount": "111122223333"
                }
            }
        }
    ]
}
```

Untuk menambahkan kebijakan izin ke peran eksekusi fungsi Anda menggunakan Konsol Manajemen AWS atau AWS CLI, lihat instruksi dalam prosedur berikut:

------
#### [ Console ]

**Untuk menambahkan kebijakan izin ke peran eksekusi fungsi (konsol)**

1. Buka [halaman Fungsi](https://console.aws.amazon.com/lambda/home#/functions) di konsol Lambda.

1. Pilih fungsi Lambda yang peran eksekusinya ingin Anda modifikasi.

1. Di tab **Konfigurasi**, pilih **Izin**.

1. Di tab **Peran eksekusi**, pilih **nama Peran** fungsi Anda untuk membuka halaman konsol IAM peran.

1. Tambahkan kebijakan izin ke peran dengan melakukan hal berikut:

   1. Di panel **Kebijakan izin**, pilih **Tambahkan izin**, lalu pilih **Buat** kebijakan sebaris.

   1. Di **Editor kebijakan**, pilih **JSON**.

   1. Rekatkan kebijakan yang ingin Anda tambahkan ke editor (ganti JSON yang ada), lalu pilih **Berikutnya**.

   1. Di bawah **Detail kebijakan**, masukkan **nama Kebijakan**.

   1. Pilih **Buat kebijakan**.

------
#### [ AWS CLI ]

**Untuk menambahkan kebijakan izin ke peran eksekusi fungsi (CLI)**

1. Buat dokumen kebijakan JSON dengan izin yang diperlukan dan simpan di direktori lokal.

1. Gunakan perintah IAM `put-role-policy` CLI untuk menambahkan izin ke peran eksekusi fungsi Anda. Jalankan perintah berikut dari direktori tempat Anda menyimpan dokumen kebijakan JSON dan ganti nama peran, nama kebijakan, dan dokumen kebijakan dengan nilai Anda sendiri.

   ```
   aws iam put-role-policy \
   --role-name my_lambda_role \
   --policy-name LambdaS3DestinationPolicy \
   --policy-document file://my_policy.json
   ```

------

### Contoh catatan pemanggilan Amazon SNS dan Amazon SQS
<a name="kinesis-on-failure-destination-example-sns-sqs"></a>

Contoh berikut menunjukkan apa yang Lambda kirim ke antrian SQS atau topik SNS untuk pemanggilan sumber peristiwa Kinesis yang gagal. Karena Lambda hanya mengirimkan metadata untuk jenis tujuan ini, gunakan,, `streamArn` `shardId``startSequenceNumber`, dan `endSequenceNumber` bidang untuk mendapatkan catatan asli lengkap. Semua bidang yang ditampilkan di `KinesisBatchInfo` properti akan selalu ada.

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    }
}
```

Anda dapat menggunakan informasi ini guna mengambil rekaman yang terpengaruh dari aliran untuk pemecahan masalah. Rekaman aktual tidak disertakan, jadi Anda harus memproses rekaman ini dan mengambilnya dari aliran sebelum kedaluwarsa dan hilang.

### Contoh catatan pemanggilan Amazon S3
<a name="kinesis-on-failure-destination-example-sns-sqs-s3"></a>

Contoh berikut menunjukkan apa yang dikirim Lambda ke bucket Amazon S3 untuk pemanggilan sumber peristiwa Kinesis yang gagal. Selain semua bidang dari contoh sebelumnya untuk tujuan SQS dan SNS, `payload` bidang berisi catatan pemanggilan asli sebagai string JSON yang lolos.

```
{
    "requestContext": {
        "requestId": "c9b8fa9f-5a7f-xmpl-af9c-0c604cde93a5",
        "functionArn": "arn:aws:lambda:us-east-2:123456789012:function:myfunction",
        "condition": "RetryAttemptsExhausted",
        "approximateInvokeCount": 1
    },
    "responseContext": {
        "statusCode": 200,
        "executedVersion": "$LATEST",
        "functionError": "Unhandled"
    },
    "version": "1.0",
    "timestamp": "2019-11-14T00:38:06.021Z",
    "KinesisBatchInfo": {
        "shardId": "shardId-000000000001",
        "startSequenceNumber": "49601189658422359378836298521827638475320189012309704722",
        "endSequenceNumber": "49601189658422359378836298522902373528957594348623495186",
        "approximateArrivalOfFirstRecord": "2019-11-14T00:38:04.835Z",
        "approximateArrivalOfLastRecord": "2019-11-14T00:38:05.580Z",
        "batchSize": 500,
        "streamArn": "arn:aws:kinesis:us-east-2:123456789012:stream/mystream"
    },
    "payload": "<Whole Event>" // Only available in S3
}
```

Objek S3 yang berisi catatan pemanggilan menggunakan konvensi penamaan berikut:

```
aws/lambda/<ESM-UUID>/<shardID>/YYYY/MM/DD/YYYY-MM-DDTHH.MM.SS-<Random UUID>
```

# Menerapkan pemrosesan Kinesis Data Streams stateful di Lambda
<a name="services-kinesis-windows"></a>

Fungsi Lambda dapat menjalankan aplikasi pemrosesan aliran berkelanjutan. Aliran merupakan data tidak terbatas yang mengalir terus-menerus melalui aplikasi Anda. Untuk menganalisis informasi dari input yang terus diperbarui ini, Anda dapat mengikat catatan yang disertakan menggunakan jendela yang didefinisikan dalam hal waktu.

Jatuh jendela adalah jendela waktu yang berbeda yang membuka dan menutup secara berkala. Secara default, pemanggilan Lambda tidak memiliki status — Anda tidak dapat menggunakannya untuk memproses data di beberapa pemanggilan berkelanjutan tanpa database eksternal. Namun, dengan jendela yang jatuh, Anda dapat mempertahankan status Anda di seluruh pemanggilan. Status ini berisi hasil agregat pesan yang sebelumnya diproses untuk jendela saat ini. Status Anda maksimal bisa sebesar 1 MB per shard. Jika melebihi ukuran tersebut, Lambda mengakhiri jendela lebih awal.

Setiap rekaman dalam aliran milik jendela tertentu. Lambda akan memproses setiap rekaman setidaknya sekali, tetapi tidak menjamin bahwa setiap rekaman akan diproses hanya sekali. Dalam kasus yang jarang terjadi, seperti penanganan kesalahan, beberapa catatan mungkin diproses lebih dari sekali. Catatan selalu diproses secara berurutan pertama kali. Jika catatan diproses lebih dari satu kali, mereka mungkin diproses rusak.

## Agregasi dan pemrosesan
<a name="streams-tumbling-processing"></a>

Fungsi yang dikelola pengguna Anda dipanggil baik untuk agregasi maupun untuk memproses hasil akhir agregasi tersebut. Lambda mengumpulkan semua catatan yang diterima di jendela. Anda dapat menerima catatan ini dalam beberapa batch, masing-masing sebagai invokasi terpisah. Setiap invokasi menerima status. Jadi, saat menggunakan jendela tumbling, respons fungsi Lambda Anda harus berisi `state` properti. Jika respons tidak berisi `state` properti, Lambda menganggap ini sebagai pemanggilan yang gagal. Untuk memenuhi kondisi ini, fungsi Anda dapat mengembalikan `TimeWindowEventResponse` objek, yang memiliki bentuk JSON berikut:

**Example Nilai `TimeWindowEventResponse`**  

```
{
    "state": {
        "1": 282,
        "2": 715
    },
    "batchItemFailures": []
}
```

**catatan**  
Untuk fungsi Java, sebaiknya gunakan a `Map<String, String>` untuk mewakili status.

Di akhir jendela, tanda `isFinalInvokeForWindow` diatur ke `true` untuk menunjukkan bahwa ini adalah status akhir dan bahwa itu siap untuk diproses. Setelah pemrosesan, jendela selesai, dan invokasi akhir Anda selesai, lalu status dihapus.

Di akhir jendela Anda, Lambda menggunakan pemrosesan akhir untuk tindakan pada hasil agregasi. Pemrosesan akhir Anda dipanggil secara sinkron. Setelah pemanggilan berhasil, fungsi Anda memeriksa nomor urut dan pemrosesan aliran berlanjut. Jika invokasi tidak berhasil, fungsi Lambda Anda menunda pemrosesan lebih lanjut sampai invokasi sukses.

**Example KinesisTimeWindowEvent**  

```
{
    "Records": [
        {
            "kinesis": {
                "kinesisSchemaVersion": "1.0",
                "partitionKey": "1",
                "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                "approximateArrivalTimestamp": 1607497475.000
            },
            "eventSource": "aws:kinesis",
            "eventVersion": "1.0",
            "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
            "eventName": "aws:kinesis:record",
            "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-kinesis-role",
            "awsRegion": "us-east-1",
            "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream"
        }
    ],
    "window": {
        "start": "2020-12-09T07:04:00Z",
        "end": "2020-12-09T07:06:00Z"
    },
    "state": {
        "1": 282,
        "2": 715
    },
    "shardId": "shardId-000000000006",
    "eventSourceARN": "arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream",
    "isFinalInvokeForWindow": false,
    "isWindowTerminatedEarly": false
}
```

## Konfigurasi
<a name="streams-tumbling-config"></a>

Anda dapat mengonfigurasi jendela berguling saat membuat atau memperbarui pemetaan sumber peristiwa. Untuk mengkonfigurasi jendela tumbling, tentukan jendela dalam hitungan detik ([TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)). Contoh berikut AWS Command Line Interface (AWS CLI) perintah membuat pemetaan sumber acara streaming yang memiliki jendela jatuh 120 detik. Fungsi Lambda yang didefinisikan untuk agregasi dan pemrosesan diberi nama `tumbling-window-example-function`.

```
aws lambda create-event-source-mapping \
--event-source-arn arn:aws:kinesis:us-east-1:123456789012:stream/lambda-stream \
--function-name tumbling-window-example-function \
--starting-position TRIM_HORIZON \
--tumbling-window-in-seconds 120
```

Lambda menentukan jatuh batas jendela berguling berdasarkan waktu ketika catatan dimasukkan ke dalam aliran. Semua catatan memiliki stempel waktu perkiraan yang tersedia yang digunakan Lambda dalam penentuan batas.

Agregasi jendela berguling tidak mendukung shard ulang. Ketika pecahan berakhir, Lambda menganggap jendela saat ini ditutup, dan setiap pecahan anak akan memulai jendela mereka sendiri dalam keadaan baru. Ketika tidak ada catatan baru yang ditambahkan ke jendela saat ini, Lambda menunggu hingga 2 menit sebelum mengasumsikan bahwa jendela sudah selesai. Ini membantu memastikan bahwa fungsi membaca semua catatan di jendela saat ini, bahkan jika catatan ditambahkan sebentar-sebentar.

Jendela berguling sepenuhnya mendukung kebijakan coba lagi yang ada `maxRetryAttempts` dan `maxRecordAge`.

**Example Handler.py - Agregasi dan pemrosesan**  
Fungsi Python berikut menunjukkan cara untuk menggabungkan, lalu memproses status akhir Anda:  

```
def lambda_handler(event, context):
    print('Incoming event: ', event)
    print('Incoming state: ', event['state'])

#Check if this is the end of the window to either aggregate or process.
    if event['isFinalInvokeForWindow']:
        # logic to handle final state of the window
        print('Destination invoke')
    else:
        print('Aggregate invoke')

#Check for early terminations
    if event['isWindowTerminatedEarly']:
        print('Window terminated early')

    #Aggregation logic
    state = event['state']
    for record in event['Records']:
        state[record['kinesis']['partitionKey']] = state.get(record['kinesis']['partitionKey'], 0) + 1

    print('Returning state: ', state)
    return {'state': state}
```

# Parameter Lambda untuk pemetaan sumber peristiwa Amazon Kinesis Data Streams
<a name="services-kinesis-parameters"></a>

Semua pemetaan sumber peristiwa Lambda berbagi operasi yang sama [CreateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html)dan API. [UpdateEventSourceMapping](https://docs.aws.amazon.com/lambda/latest/api/API_UpdateEventSourceMapping.html) Namun, hanya beberapa parameter yang berlaku untuk Kinesis.


| Parameter | Diperlukan | Default | Catatan | 
| --- | --- | --- | --- | 
|  [BatchSize](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BatchSize)  |  T  |  100  |  Maksimum: 10.000.  | 
|  [BisectBatchOnFunctionError](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-BisectBatchOnFunctionError)  |  T  |  false  |  none | 
|  [DestinationConfig](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-DestinationConfig)  |  T  | N/A |  Antrian Amazon SQS atau tujuan topik Amazon SNS untuk catatan yang dibuang. Untuk informasi selengkapnya, lihat [Mengonfigurasi tujuan untuk pemanggilan yang gagal](kinesis-on-failure-destination.md#kinesis-on-failure-destination-console).  | 
|  [Diaktifkan](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-Enabled)  |  T  |  true  |  none | 
|  [EventSourceArn](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-EventSourceArn)  |  T  | N/A |  ARN dari aliran data atau konsumen aliran  | 
|  [FunctionName](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionName)  |  T  | N/A |  none | 
|  [FunctionResponseTypes](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-FunctionResponseTypes)  |  T  |  N/A |  Agar fungsi Anda melaporkan kegagalan tertentu dalam satu batch, sertakan nilainya `ReportBatchItemFailures``FunctionResponseTypes`. Untuk informasi selengkapnya, lihat [Mengonfigurasi respons batch sebagian dengan Kinesis Data Streams dan Lambda](services-kinesis-batchfailurereporting.md).  | 
|  [MaximumBatchingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumBatchingWindowInSeconds)  |  T  |  0  |  none | 
|  [MaximumRecordAgeInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRecordAgeInSeconds)  |  T  |  -1  |  -1 berarti tak terbatas: Lambda tidak membuang catatan (pengaturan retensi data [Kinesis Data Streams](https://docs.aws.amazon.com/streams/latest/dev/kinesis-extended-retention.html) masih berlaku) Minimal: -1 Maksimal: 604.800  | 
|  [MaximumRetryAttempts](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-MaximumRetryAttempts)  |  T  |  -1  |  -1 berarti tak terbatas: catatan gagal dicoba ulang sampai catatan kedaluwarsa Minimal: -1 Maksimum: 10.000.  | 
|  [ParallelizationFactor](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-ParallelizationFactor)  |  T  |  1  |  Maksimal: 10  | 
|  [StartingPosition](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPosition)  |  T  |  N/A |  AT\$1TIMESTAMP, TRIM\$1HORIZON, atau TERBARU  | 
|  [StartingPositionTimestamp](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-StartingPositionTimestamp)  |  T  |  N/A |  Hanya valid jika StartingPosition disetel ke AT\$1TIMESTAMP. Waktu untuk mulai membaca, dalam detik waktu Unix  | 
|  [TumblingWindowInSeconds](https://docs.aws.amazon.com/lambda/latest/api/API_CreateEventSourceMapping.html#lambda-CreateEventSourceMapping-request-TumblingWindowInSeconds)  |  T  |  N/A |  Minimal: 0 Maksimal: 900  | 

# Menggunakan pemfilteran acara dengan sumber acara Kinesis
<a name="with-kinesis-filtering"></a>

Anda dapat menggunakan pemfilteran peristiwa untuk mengontrol rekaman mana dari aliran atau antrian yang dikirim Lambda ke fungsi Anda. Untuk informasi umum tentang cara kerja penyaringan acara, lihat[Kontrol peristiwa mana yang dikirim Lambda ke fungsi Anda](invocation-eventfiltering.md).

Bagian ini berfokus pada penyaringan acara untuk sumber acara Kinesis.

**catatan**  
Pemetaan sumber peristiwa Kinesis hanya mendukung pemfilteran pada kunci. `data`

**Topics**
+ [Dasar-dasar penyaringan acara Kinesis](#filtering-kinesis)
+ [Menyaring catatan agregat Kinesis](#filtering-kinesis-efo)

## Dasar-dasar penyaringan acara Kinesis
<a name="filtering-kinesis"></a>

Misalkan produser memasukkan data yang diformat JSON ke dalam aliran data Kinesis Anda. Contoh catatan akan terlihat seperti berikut, dengan data JSON dikonversi ke string yang dikodekan Base64 di lapangan. `data`

```
{
    "kinesis": {
        "kinesisSchemaVersion": "1.0",
        "partitionKey": "1",
        "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
        "data": "eyJSZWNvcmROdW1iZXIiOiAiMDAwMSIsICJUaW1lU3RhbXAiOiAieXl5eS1tbS1kZFRoaDptbTpzcyIsICJSZXF1ZXN0Q29kZSI6ICJBQUFBIn0=",
        "approximateArrivalTimestamp": 1545084650.987
        },
    "eventSource": "aws:kinesis",
    "eventVersion": "1.0",
    "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
    "eventName": "aws:kinesis:record",
    "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role",
    "awsRegion": "us-east-2",
    "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream"
}
```

Selama data yang dimasukkan produsen ke dalam aliran adalah JSON yang valid, Anda dapat menggunakan pemfilteran peristiwa untuk memfilter catatan menggunakan kunci. `data` Misalkan produser memasukkan catatan ke dalam aliran Kinesis Anda dalam format JSON berikut.

```
{
    "record": 12345,
    "order": {
        "type": "buy",
        "stock": "ANYCO",
        "quantity": 1000
        }
}
```

Untuk memfilter hanya catatan di mana jenis pesanan adalah “beli,” `FilterCriteria` objeknya adalah sebagai berikut.

```
{
    "Filters": [
        {
            "Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"
        }
    ]
}
```

Untuk kejelasan tambahan, berikut adalah nilai filter yang `Pattern` diperluas di JSON biasa. 

```
{
    "data": {
        "order": {
            "type": [ "buy" ]
            }
      }
}
```

Anda dapat menambahkan filter menggunakan konsol, AWS CLI atau AWS SAM templat.

------
#### [ Console ]

Untuk menambahkan filter ini menggunakan konsol, ikuti instruksi [Melampirkan kriteria filter ke pemetaan sumber peristiwa (konsol)](invocation-eventfiltering.md#filtering-console) dan masukkan string berikut untuk **kriteria Filter**.

```
{ "data" : { "order" : { "type" : [ "buy" ] } } }
```

------
#### [ AWS CLI ]

Untuk membuat pemetaan sumber peristiwa baru dengan kriteria filter ini menggunakan AWS Command Line Interface (AWS CLI), jalankan perintah berikut.

```
aws lambda create-event-source-mapping \
    --function-name my-function \
    --event-source-arn arn:aws:kinesis:us-east-2:123456789012:stream/my-stream \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

Untuk menambahkan kriteria filter ini ke pemetaan sumber peristiwa yang ada, jalankan perintah berikut.

```
aws lambda update-event-source-mapping \
    --uuid "a1b2c3d4-5678-90ab-cdef-11111EXAMPLE" \
    --filter-criteria '{"Filters": [{"Pattern": "{ \"data\" : { \"order\" : { \"type\" : [ \"buy\" ] } } }"}]}'
```

------
#### [ AWS SAM ]

Untuk menambahkan filter ini menggunakan AWS SAM, tambahkan cuplikan berikut ke template YAMAL untuk sumber acara Anda.

```
FilterCriteria:
  Filters:
    - Pattern: '{ "data" : { "order" : { "type" : [ "buy" ] } } }'
```

------

Untuk memfilter peristiwa dengan benar dari sumber Kinesis, bidang data dan kriteria filter Anda untuk bidang data harus dalam format JSON yang valid. Jika salah satu bidang tidak dalam format JSON yang valid, Lambda akan menghapus pesan atau melempar pengecualian. Tabel berikut merangkum perilaku spesifik: 


| Format data masuk | Format pola filter untuk properti data | Tindakan yang dihasilkan | 
| --- | --- | --- | 
|  JSON yang valid  |  JSON yang valid  |  Filter Lambda berdasarkan kriteria filter Anda.  | 
|  JSON yang valid  |  Tidak ada pola filter untuk properti data  |  Filter Lambda (hanya pada properti metadata lainnya) berdasarkan kriteria filter Anda.  | 
|  JSON yang valid  |  Non-JSON  |  Lambda melempar pengecualian pada saat pembuatan atau pembaruan pemetaan sumber acara. Pola filter untuk properti data harus dalam format JSON yang valid.  | 
|  Non-JSON  |  JSON yang valid  |  Lambda menjatuhkan rekor.  | 
|  Non-JSON  |  Tidak ada pola filter untuk properti data  |  Filter Lambda (hanya pada properti metadata lainnya) berdasarkan kriteria filter Anda.  | 
|  Non-JSON  |  Non-JSON  |  Lambda melempar pengecualian pada saat pembuatan atau pembaruan pemetaan sumber acara. Pola filter untuk properti data harus dalam format JSON yang valid.  | 

## Menyaring catatan agregat Kinesis
<a name="filtering-kinesis-efo"></a>

Dengan Kinesis, Anda dapat menggabungkan beberapa catatan ke dalam satu catatan Kinesis Data Streams untuk meningkatkan throughput data Anda. [Lambda hanya dapat menerapkan kriteria filter ke rekaman agregat saat Anda menggunakan Kinesis yang ditingkatkan fan-out.](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html) Memfilter rekaman agregat dengan Kinesis standar tidak didukung. Saat menggunakan fan-out yang disempurnakan, Anda mengonfigurasi konsumen throughput khusus Kinesis untuk bertindak sebagai pemicu fungsi Lambda Anda. Lambda kemudian memfilter catatan agregat dan hanya meneruskan catatan yang memenuhi kriteria filter Anda.

Untuk mempelajari lebih lanjut tentang agregasi catatan Kinesis, lihat bagian [Agregasi](https://docs.aws.amazon.com/streams/latest/dev/kinesis-kpl-concepts.html#kinesis-kpl-concepts-aggretation) pada halaman Konsep Kunci Perpustakaan Produsen Kinesis (KPL). Untuk mempelajari lebih lanjut tentang menggunakan Lambda dengan Kinesis yang ditingkatkan fan-out, lihat [Meningkatkan performa pemrosesan streaming real-time dengan Amazon Kinesis Data Streams yang disempurnakan oleh fan-out dan Lambda di blog komputasi](https://aws.amazon.com/blogs/compute/increasing-real-time-stream-processing-performance-with-amazon-kinesis-data-streams-enhanced-fan-out-and-aws-lambda/). AWS AWS 

# Tutorial: Menggunakan Lambda dengan Kinesis Data Streams
<a name="with-kinesis-example"></a>

Dalam tutorial ini, Anda membuat fungsi Lambda untuk mengkonsumsi peristiwa dari aliran data Amazon Kinesis. 

1. Aplikasi kustom menulis rekaman ke aliran.

1. AWS Lambda polling aliran dan, ketika mendeteksi catatan baru dalam aliran, memanggil fungsi Lambda Anda.

1. AWS Lambda menjalankan fungsi Lambda dengan mengasumsikan peran eksekusi yang Anda tentukan pada saat Anda membuat fungsi Lambda.

## Prasyarat
<a name="with-kinesis-prepare"></a>

### Instal AWS Command Line Interface
<a name="install_aws_cli"></a>

Jika Anda belum menginstal AWS Command Line Interface, ikuti langkah-langkah di [Menginstal atau memperbarui versi terbaru AWS CLI untuk menginstalnya](https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html).

Tutorial ini membutuhkan terminal baris perintah atau shell untuk menjalankan perintah. Di Linux dan macOS, gunakan shell dan manajer paket pilihan Anda.

**catatan**  
Di Windows, beberapa perintah Bash CLI yang biasa Anda gunakan dengan Lambda (`zip`seperti) tidak didukung oleh terminal bawaan sistem operasi. Untuk mendapatkan versi terintegrasi Windows dari Ubuntu dan Bash, [instal Windows Subsystem untuk](https://docs.microsoft.com/en-us/windows/wsl/install-win10) Linux. 

## Buat peran eksekusi
<a name="with-kinesis-example-create-iam-role"></a>

Buat [peran eksekusi](lambda-intro-execution-role.md) yang memberikan izin fungsi Anda untuk mengakses AWS sumber daya.

**Untuk membuat peran eksekusi**

1. Buka [halaman peran](https://console.aws.amazon.com/iam/home#/roles) di konsol IAM.

1. Pilih **Buat peran**.

1. Buat peran dengan properti berikut.
   + **Entitas tepercaya** – **AWS Lambda**.
   + **Izin** – **AWSLambdaKinesisExecutionRole**.
   + **Nama peran** – **lambda-kinesis-role**.

**AWSLambdaKinesisExecutionRole**Kebijakan memiliki izin yang diperlukan fungsi untuk membaca item dari Kinesis dan menulis log ke Log CloudWatch .

## Buat fungsi
<a name="with-kinesis-example-create-function"></a>

Buat fungsi Lambda yang memproses pesan Kinesis Anda. Kode fungsi mencatat ID peristiwa dan data peristiwa dari catatan Kinesis ke CloudWatch Log.

Tutorial ini menggunakan runtime Node.js 24, tetapi kami juga menyediakan kode contoh dalam bahasa runtime lainnya. Anda dapat memilih tab di kotak berikut untuk melihat kode runtime yang Anda minati. JavaScript Kode yang akan Anda gunakan dalam langkah ini adalah pada contoh pertama yang ditunjukkan di **JavaScript**tab.

------
#### [ .NET ]

**SDK untuk .NET**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Mengkonsumsi acara Kinesis dengan Lambda menggunakan.NET.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
﻿using System.Text;
using Amazon.Lambda.Core;
using Amazon.Lambda.KinesisEvents;
using AWS.Lambda.Powertools.Logging;

// Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class.
[assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))]

namespace KinesisIntegrationSampleCode;

public class Function
{
    // Powertools Logger requires an environment variables against your function
    // POWERTOOLS_SERVICE_NAME
    [Logging(LogEvent = true)]
    public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context)
    {
        if (evnt.Records.Count == 0)
        {
            Logger.LogInformation("Empty Kinesis Event received");
            return;
        }

        foreach (var record in evnt.Records)
        {
            try
            {
                Logger.LogInformation($"Processed Event with EventId: {record.EventId}");
                string data = await GetRecordDataAsync(record.Kinesis, context);
                Logger.LogInformation($"Data: {data}");
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex)
            {
                Logger.LogError($"An error occurred {ex.Message}");
                throw;
            }
        }
        Logger.LogInformation($"Successfully processed {evnt.Records.Count} records.");
    }

    private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context)
    {
        byte[] bytes = record.Data.ToArray();
        string data = Encoding.UTF8.GetString(bytes);
        await Task.CompletedTask; //Placeholder for actual async work
        return data;
    }
}
```

------
#### [ Go ]

**SDK untuk Go V2**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Mengkonsumsi acara Kinesis dengan Lambda menggunakan Go.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package main

import (
	"context"
	"log"

	"github.com/aws/aws-lambda-go/events"
	"github.com/aws/aws-lambda-go/lambda"
)

func handler(ctx context.Context, kinesisEvent events.KinesisEvent) error {
	if len(kinesisEvent.Records) == 0 {
		log.Printf("empty Kinesis event received")
		return nil
	}

	for _, record := range kinesisEvent.Records {
		log.Printf("processed Kinesis event with EventId: %v", record.EventID)
		recordDataBytes := record.Kinesis.Data
		recordDataText := string(recordDataBytes)
		log.Printf("record data: %v", recordDataText)
		// TODO: Do interesting work based on the new data
	}
	log.Printf("successfully processed %v records", len(kinesisEvent.Records))
	return nil
}

func main() {
	lambda.Start(handler)
}
```

------
#### [ Java ]

**SDK untuk Java 2.x**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Mengkonsumsi acara Kinesis dengan Lambda menggunakan Java.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
package example;

import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.LambdaLogger;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;

public class Handler implements RequestHandler<KinesisEvent, Void> {
    @Override
    public Void handleRequest(final KinesisEvent event, final Context context) {
        LambdaLogger logger = context.getLogger();
        if (event.getRecords().isEmpty()) {
            logger.log("Empty Kinesis Event received");
            return null;
        }
        for (KinesisEvent.KinesisEventRecord record : event.getRecords()) {
            try {
                logger.log("Processed Event with EventId: "+record.getEventID());
                String data = new String(record.getKinesis().getData().array());
                logger.log("Data:"+ data);
                // TODO: Do interesting work based on the new data
            }
            catch (Exception ex) {
                logger.log("An error occurred:"+ex.getMessage());
                throw ex;
            }
        }
        logger.log("Successfully processed:"+event.getRecords().size()+" records");
        return null;
    }

}
```

------
#### [ JavaScript ]

**SDK untuk JavaScript (v3)**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/blob/main/integration-kinesis-to-lambda). 
Mengkonsumsi acara Kinesis dengan menggunakan Lambda. JavaScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
exports.handler = async (event, context) => {
  for (const record of event.Records) {
    try {
      console.log(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      console.log(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      console.error(`An error occurred ${err}`);
      throw err;
    }
  }
  console.log(`Successfully processed ${event.Records.length} records.`);
};

async function getRecordDataAsync(payload) {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```
Mengkonsumsi acara Kinesis dengan menggunakan Lambda. TypeScript  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
import {
  KinesisStreamEvent,
  Context,
  KinesisStreamHandler,
  KinesisStreamRecordPayload,
} from "aws-lambda";
import { Buffer } from "buffer";
import { Logger } from "@aws-lambda-powertools/logger";

const logger = new Logger({
  logLevel: "INFO",
  serviceName: "kinesis-stream-handler-sample",
});

export const functionHandler: KinesisStreamHandler = async (
  event: KinesisStreamEvent,
  context: Context
): Promise<void> => {
  for (const record of event.Records) {
    try {
      logger.info(`Processed Kinesis Event - EventID: ${record.eventID}`);
      const recordData = await getRecordDataAsync(record.kinesis);
      logger.info(`Record Data: ${recordData}`);
      // TODO: Do interesting work based on the new data
    } catch (err) {
      logger.error(`An error occurred ${err}`);
      throw err;
    }
    logger.info(`Successfully processed ${event.Records.length} records.`);
  }
};

async function getRecordDataAsync(
  payload: KinesisStreamRecordPayload
): Promise<string> {
  var data = Buffer.from(payload.data, "base64").toString("utf-8");
  await Promise.resolve(1); //Placeholder for actual async work
  return data;
}
```

------
#### [ PHP ]

**SDK untuk PHP**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Mengkonsumsi acara Kinesis dengan Lambda menggunakan PHP.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
<?php

# using bref/bref and bref/logger for simplicity

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;
    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * @throws JsonException
     * @throws \Bref\Event\InvalidLambdaEvent
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info("Processing records");
        $records = $event->getRecords();
        foreach ($records as $record) {
            $data = $record->getData();
            $this->logger->info(json_encode($data));
            // TODO: Do interesting work based on the new data

            // Any exception thrown will be logged and the invocation will be marked as failed
        }
        $totalRecords = count($records);
        $this->logger->info("Successfully processed $totalRecords records");
    }
}

$logger = new StderrLogger();
return new Handler($logger);
```

------
#### [ Python ]

**SDK untuk Python (Boto3)**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Mengkonsumsi acara Kinesis dengan Lambda menggunakan Python.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
import base64
def lambda_handler(event, context):

    for record in event['Records']:
        try:
            print(f"Processed Kinesis Event - EventID: {record['eventID']}")
            record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8')
            print(f"Record Data: {record_data}")
            # TODO: Do interesting work based on the new data
        except Exception as e:
            print(f"An error occurred {e}")
            raise e
    print(f"Successfully processed {len(event['Records'])} records.")
```

------
#### [ Ruby ]

**SDK untuk Ruby**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Mengkonsumsi acara Kinesis dengan Lambda menggunakan Ruby.  

```
# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0
require 'aws-sdk'

def lambda_handler(event:, context:)
  event['Records'].each do |record|
    begin
      puts "Processed Kinesis Event - EventID: #{record['eventID']}"
      record_data = get_record_data_async(record['kinesis'])
      puts "Record Data: #{record_data}"
      # TODO: Do interesting work based on the new data
    rescue => err
      $stderr.puts "An error occurred #{err}"
      raise err
    end
  end
  puts "Successfully processed #{event['Records'].length} records."
end

def get_record_data_async(payload)
  data = Base64.decode64(payload['data']).force_encoding('UTF-8')
  # Placeholder for actual async work
  # You can use Ruby's asynchronous programming tools like async/await or fibers here.
  return data
end
```

------
#### [ Rust ]

**SDK for Rust**  
 Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di repositori [contoh Nirserver](https://github.com/aws-samples/serverless-snippets/tree/main/integration-kinesis-to-lambda). 
Mengkonsumsi acara Kinesis dengan Lambda menggunakan Rust.  

```
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use aws_lambda_events::event::kinesis::KinesisEvent;
use lambda_runtime::{run, service_fn, Error, LambdaEvent};

async fn function_handler(event: LambdaEvent<KinesisEvent>) -> Result<(), Error> {
    if event.payload.records.is_empty() {
        tracing::info!("No records found. Exiting.");
        return Ok(());
    }

    event.payload.records.iter().for_each(|record| {
        tracing::info!("EventId: {}",record.event_id.as_deref().unwrap_or_default());

        let record_data = std::str::from_utf8(&record.kinesis.data);

        match record_data {
            Ok(data) => {
                // log the record data
                tracing::info!("Data: {}", data);
            }
            Err(e) => {
                tracing::error!("Error: {}", e);
            }
        }
    });

    tracing::info!(
        "Successfully processed {} records",
        event.payload.records.len()
    );

    Ok(())
}

#[tokio::main]
async fn main() -> Result<(), Error> {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::INFO)
        // disable printing the name of the module in every log line.
        .with_target(false)
        // disabling time is handy because CloudWatch will add the ingestion time.
        .without_time()
        .init();

    run(service_fn(function_handler)).await
}
```

------

**Untuk membuat fungsi**

1. Buat direktori untuk proyek, dan kemudian beralih ke direktori itu.

   ```
   mkdir kinesis-tutorial
   cd kinesis-tutorial
   ```

1. Salin JavaScript kode sampel ke file baru bernama`index.js`.

1. Buat paket deployment.

   ```
   zip function.zip index.js
   ```

1. Buat fungsi Lambda dengan perintah `create-function`.

   ```
   aws lambda create-function --function-name ProcessKinesisRecords \
   --zip-file fileb://function.zip --handler index.handler --runtime nodejs24.x \
   --role arn:aws:iam::111122223333:role/lambda-kinesis-role
   ```

## Uji fungsi Lambda
<a name="walkthrough-kinesis-events-adminuser-create-test-function-upload-zip-test-manual-invoke"></a>

Panggil fungsi Lambda Anda secara manual menggunakan perintah `invoke` AWS Lambda CLI dan contoh peristiwa Kinesis.

**Untuk menguji fungsi Lambda**

1. Salin JSON berikut ke dalam file dan simpan sebagai `input.txt`. 

   ```
   {
       "Records": [
           {
               "kinesis": {
                   "kinesisSchemaVersion": "1.0",
                   "partitionKey": "1",
                   "sequenceNumber": "49590338271490256608559692538361571095921575989136588898",
                   "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==",
                   "approximateArrivalTimestamp": 1545084650.987
               },
               "eventSource": "aws:kinesis",
               "eventVersion": "1.0",
               "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898",
               "eventName": "aws:kinesis:record",
               "invokeIdentityArn": "arn:aws:iam::111122223333:role/lambda-kinesis-role",
               "awsRegion": "us-east-2",
               "eventSourceARN": "arn:aws:kinesis:us-east-2:111122223333:stream/lambda-stream"
           }
       ]
   }
   ```

1. Gunakan perintah `invoke` untuk mengirim kejadian ke fungsi.

   ```
   aws lambda invoke --function-name ProcessKinesisRecords \
   --cli-binary-format raw-in-base64-out \
   --payload file://input.txt outputfile.txt
   ```

   **cli-binary-format**Opsi ini diperlukan jika Anda menggunakan AWS CLI versi 2. Untuk menjadikan ini pengaturan default, jalankan`aws configure set cli-binary-format raw-in-base64-out`. Untuk informasi selengkapnya, lihat [opsi baris perintah global yang AWS CLI didukung](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-options.html#cli-configure-options-list) di *Panduan AWS Command Line Interface Pengguna untuk Versi 2*.

   Respons disimpan ke `out.txt`.

## Buat aliran Kinesis
<a name="with-kinesis-example-configure-event-source-create"></a>

Gunakan perintah `create-stream ` untuk membuat aliran.

```
aws kinesis create-stream --stream-name lambda-stream --shard-count 1
```

Jalankan perintah `describe-stream` berikut untuk mendapatkan ARN aliran.

```
aws kinesis describe-stream --stream-name lambda-stream
```

Anda akan melihat output berikut:

```
{
    "StreamDescription": {
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                "HashKeyRange": {
                    "StartingHashKey": "0",
                    "EndingHashKey": "340282366920746074317682119384634633455"
                },
                "SequenceNumberRange": {
                    "StartingSequenceNumber": "49591073947768692513481539594623130411957558361251844610"
                }
            }
        ],
        "StreamARN": "arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream",
        "StreamName": "lambda-stream",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "EnhancedMonitoring": [
            {
                "ShardLevelMetrics": []
            }
        ],
        "EncryptionType": "NONE",
        "KeyId": null,
        "StreamCreationTimestamp": 1544828156.0
    }
}
```

Anda menggunakan ARN aliran di langkah berikutnya untuk Anda mengasosiasikan aliran dengan fungsi Lambda Anda.

## Tambahkan sumber acara di AWS Lambda
<a name="with-kinesis-example-configure-event-source-add-event-source"></a>

Jalankan perintah AWS CLI `add-event-source` berikut.

```
aws lambda create-event-source-mapping --function-name ProcessKinesisRecords \
--event-source  arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream \
--batch-size 100 --starting-position LATEST
```

Catat ID pemetaan untuk penggunaan selanjutnya. Anda bisa mendapatkan daftar pemetaan sumber kejadian dengan menjalankan perintah `list-event-source-mappings`.

```
aws lambda list-event-source-mappings --function-name ProcessKinesisRecords \
--event-source arn:aws:kinesis:us-east-1:111122223333:stream/lambda-stream
```

Dalam respons, Anda dapat memverifikasi nilai status adalah `enabled`. Pemetaan sumber peristiwa dapat dinonaktifkan untuk menjeda polling sementara tanpa kehilangan catatan.

## Uji penyiapan
<a name="with-kinesis-example-configure-event-source-test-end-to-end"></a>

Untuk menguji pemetaan sumber kejadian, tambahkan rekaman kejadian ke aliran Kinesis Anda. Nilai `--data` adalah string yang dienkodekan CLI ke base64 sebelum mengirimkannya ke Kinesis. Anda dapat menjalankan perintah yang sama lebih dari satu kali untuk menambahkan beberapa rekaman ke aliran.

```
aws kinesis put-record --stream-name lambda-stream --partition-key 1 \
--data "Hello, this is a test."
```

Lambda menggunakan peran eksekusi untuk membaca rekaman dari aliran. Kemudian, itu akan memanggil fungsi Lambda Anda, dengan mengirimkan batch rekaman. Fungsi ini menerjemahkan data dari setiap catatan dan mencatatnya, mengirimkan output ke CloudWatch Log. Lihat log di [konsol CloudWatch ](https://console.aws.amazon.com/cloudwatch).

## Bersihkan sumber daya Anda
<a name="cleanup"></a>

Sekarang Anda dapat menghapus sumber daya yang Anda buat untuk tutorial ini, kecuali Anda ingin mempertahankannya. Dengan menghapus AWS sumber daya yang tidak lagi Anda gunakan, Anda mencegah tagihan yang tidak perlu ke Anda Akun AWS.

**Untuk menghapus peran eksekusi**

1. Buka [halaman Peran](https://console.aws.amazon.com/iam/home#/roles) dari konsol IAM.

1. Pilih peran eksekusi yang Anda buat.

1. Pilih **Hapus**.

1. Masukkan nama peran di bidang input teks dan pilih **Hapus**.

**Untuk menghapus fungsi Lambda**

1. Buka [halaman Fungsi](https://console.aws.amazon.com/lambda/home#/functions) di konsol Lambda.

1. Pilih fungsi yang Anda buat.

1. Pilih **Tindakan**, **Hapus**.

1. Ketik **confirm** kolom input teks dan pilih **Hapus**.

**Untuk menghapus aliran Kinesis**

1. [Masuk ke Konsol Manajemen AWS dan buka konsol Kinesis di /kinesis. https://console.aws.amazon.com](https://console.aws.amazon.com/kinesis)

1. Pilih aliran yang Anda buat.

1. Pilih **Tindakan**, **Hapus**.

1. Masukkan **delete** di bidang input teks.

1. Pilih **Hapus**.