Memproses seluruh batch data dengan fungsi Lambda - AWS Step Functions

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Memproses seluruh batch data dengan fungsi Lambda

Dalam tutorial ini, Anda menggunakan ItemBatcher bidang status Peta Terdistribusi untuk memproses seluruh kumpulan item di dalam fungsi Lambda. Setiap batch berisi maksimal tiga item. Status Peta Terdistribusi memulai empat eksekusi alur kerja anak, di mana setiap eksekusi memproses tiga item, sementara satu eksekusi memproses satu item. Setiap eksekusi alur kerja anak memanggil fungsi Lambda yang mengulang item individual yang ada dalam batch.

Anda akan membuat mesin status yang melakukan perkalian pada array bilangan bulat. Katakanlah bahwa array integer yang Anda berikan sebagai input adalah [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] dan faktor perkalian adalah. 7 Kemudian, array yang dihasilkan terbentuk setelah mengalikan bilangan bulat ini dengan faktor 7, akan menjadi. [7, 14, 21, 28, 35, 42, 49, 56, 63, 70]

Langkah 1: Buat mesin negara

Pada langkah ini, Anda membuat prototipe alur kerja mesin status yang meneruskan seluruh kumpulan data ke fungsi Lambda yang akan Anda buat di Langkah 2.

  • Gunakan definisi berikut untuk membuat mesin status menggunakan konsol Step Functions. Untuk informasi tentang membuat mesin status, lihat Langkah 1: Buat prototipe alur kerja di Memulai dengan menggunakan Distributed Map state tutorial.

    Dalam mesin status ini, Anda mendefinisikan status Peta Terdistribusi yang menerima array 10 bilangan bulat sebagai input dan meneruskan array ini ke fungsi Lambda dalam batch. 3 Fungsi Lambda mengulangi item individual yang ada dalam batch dan mengembalikan array keluaran bernama. multiplied Array keluaran berisi hasil perkalian yang dilakukan pada item yang dilewatkan dalam array input.

    { "StartAt": "Pass", "States": { "Pass": { "Type": "Pass", "Next": "Map", "Result": { "MyMultiplicationFactor": 7, "MyItems": [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] } }, "Map": { "Type": "Map", "ItemProcessor": { "ProcessorConfig": { "Mode": "DISTRIBUTED", "ExecutionType": "STANDARD" }, "StartAt": "Lambda Invoke", "States": { "Lambda Invoke": { "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "OutputPath": "$.Payload", "Parameters": { "Payload.$": "$", "FunctionName": "arn:aws:lambda:us-east-1:123456789012:function:functionName" }, "Retry": [ { "ErrorEquals": [ "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException", "Lambda.TooManyRequestsException" ], "IntervalSeconds": 2, "MaxAttempts": 6, "BackoffRate": 2 } ], "End": true } } }, "End": true, "Label": "Map", "MaxConcurrency": 1000, "ItemBatcher": { "MaxItemsPerBatch": 3, "BatchInput": { "MyMultiplicationFactor.$": "$.MyMultiplicationFactor" } }, "ItemsPath": "$.MyItems" } } }

Langkah 2: Buat fungsi Lambda

Pada langkah ini, Anda membuat fungsi Lambda yang memproses semua item yang diteruskan dalam batch.

penting

Pastikan fungsi Lambda Anda Wilayah AWS sama dengan mesin status Anda.

Untuk membuat fungsi Lambda
  1. Gunakan konsol Lambda untuk membuat nama fungsi Lambda Python 3.9. ProcessEntireBatch Untuk informasi tentang membuat fungsi Lambda, lihat Langkah 4: Mengkonfigurasi fungsi Lambda dalam Memulai dengan menggunakan Distributed Map state tutorial.

  2. Salin kode berikut untuk fungsi Lambda dan tempelkan ke bagian sumber Kode fungsi Lambda Anda.

    import json def lambda_handler(event, context): multiplication_factor = event['BatchInput']['MyMultiplicationFactor'] items = event['Items'] results = [multiplication_factor * item for item in items] return { 'statusCode': 200, 'multiplied': results }
  3. Setelah Anda membuat fungsi Lambda Anda, salin ARN fungsi yang ditampilkan di sudut kanan atas halaman. Untuk menyalin ARN, klik tombol. icon to copy the Lambda function's Amazon Resource Name Berikut ini adalah contoh ARN, di mana function-nameadalah nama fungsi Lambda (dalam hal ini,): ProcessEntireBatch

    arn:aws:lambda:us-east-1:123456789012:function:function-name

    Anda harus menyediakan fungsi ARN di mesin status yang Anda buat di Langkah 1.

  4. Pilih Terapkan untuk menyebarkan perubahan.

Langkah 3: Jalankan mesin negara

Saat Anda menjalankan mesin status, status Peta Terdistribusi memulai empat eksekusi alur kerja anak, di mana setiap eksekusi memproses tiga item, sementara satu eksekusi memproses satu item.

Contoh berikut menunjukkan data yang diteruskan ke ProcessEntireBatchfungsi oleh salah satu eksekusi alur kerja anak.

{ "BatchInput": { "MyMultiplicationFactor": 7 }, "Items": [1, 2, 3] }

Dengan masukan ini, contoh berikut menunjukkan array keluaran bernama multiplied yang dikembalikan oleh fungsi Lambda.

{ "statusCode": 200, "multiplied": [7, 14, 21] }

Mesin state mengembalikan output berikut yang berisi empat array yang dinamai multiplied untuk empat eksekusi alur kerja anak. Array ini berisi hasil perkalian dari masing-masing item input.

[ { "statusCode": 200, "multiplied": [7, 14, 21] }, { "statusCode": 200, "multiplied": [28, 35, 42] }, { "statusCode": 200, "multiplied": [49, 56, 63] }, { "statusCode": 200, "multiplied": [70] } ]

Untuk menggabungkan semua item array yang dikembalikan ke dalam array keluaran tunggal, Anda dapat menggunakan ResultSelector bidang tersebut. Tentukan bidang ini di dalam status Peta Terdistribusi untuk menemukan semua multiplied array, ekstrak semua item di dalam array ini, dan kemudian menggabungkan mereka ke dalam array output tunggal.

Untuk menggunakan ResultSelector bidang, perbarui definisi mesin status Anda seperti yang ditunjukkan pada contoh berikut.

{ "StartAt": "Pass", "States": { ... ... "Map": { "Type": "Map", ... ... "ItemsPath": "$.MyItems", "ResultSelector": { "multiplied.$": "$..multiplied[*]" } } } }

Mesin status diperbarui mengembalikan array output konsolidasi seperti yang ditunjukkan pada contoh berikut.

{ "multiplied": [7, 14, 21, 28, 35, 42, 49, 56, 63, 70] }