Pilih preferensi cookie Anda

Kami menggunakan cookie penting serta alat serupa yang diperlukan untuk menyediakan situs dan layanan. Kami menggunakan cookie performa untuk mengumpulkan statistik anonim sehingga kami dapat memahami cara pelanggan menggunakan situs dan melakukan perbaikan. Cookie penting tidak dapat dinonaktifkan, tetapi Anda dapat mengklik “Kustom” atau “Tolak” untuk menolak cookie performa.

Jika Anda setuju, AWS dan pihak ketiga yang disetujui juga akan menggunakan cookie untuk menyediakan fitur situs yang berguna, mengingat preferensi Anda, dan menampilkan konten yang relevan, termasuk iklan yang relevan. Untuk menerima atau menolak semua cookie yang tidak penting, klik “Terima” atau “Tolak”. Untuk membuat pilihan yang lebih detail, klik “Kustomisasi”.

Pertahankan praktik terbaik untuk Layanan Terkelola untuk aplikasi Apache Flink

Mode fokus
Pertahankan praktik terbaik untuk Layanan Terkelola untuk aplikasi Apache Flink - Layanan Terkelola untuk Apache Flink

Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Amazon Managed Service untuk Apache Flink sebelumnya dikenal sebagai Amazon Kinesis Data Analytics untuk Apache Flink.

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Bagian ini berisi informasi dan rekomendasi untuk mengembangkan Layanan Terkelola yang stabil dan berkinerja untuk aplikasi Apache Flink.

Minimalkan ukuran uber JAR

Java/Scala application must be packaged in an uber (super/fat) JAR dan sertakan semua dependensi tambahan yang diperlukan yang belum disediakan oleh runtime. Namun, ukuran uber JAR mempengaruhi waktu mulai dan memulai ulang aplikasi dan dapat JAR menyebabkan melebihi batas 512 MB.

Untuk mengoptimalkan waktu penerapan, uber Anda tidak JAR boleh menyertakan yang berikut:

  • Setiap dependensi yang disediakan oleh runtime seperti yang diilustrasikan dalam contoh berikut. Mereka harus memiliki provided ruang lingkup dalam POM file atau compileOnly dalam konfigurasi Gradle Anda.

  • Setiap dependensi yang digunakan untuk pengujian saja, misalnya JUnit atau Mockito. Mereka harus memiliki test ruang lingkup dalam POM file atau testImplementation dalam konfigurasi Gradle Anda.

  • Dependensi apa pun yang sebenarnya tidak digunakan oleh aplikasi Anda.

  • Setiap data statis atau metadata yang diperlukan oleh aplikasi Anda. Data statis harus dimuat oleh aplikasi saat runtime, misalnya dari datastore atau dari Amazon S3.

  • Lihat file POM contoh ini untuk detail tentang pengaturan konfigurasi sebelumnya.

Dependensi yang disediakan

Managed Service for Apache Flink runtime menyediakan sejumlah dependensi. Dependensi ini tidak boleh dimasukkan dalam fat JAR dan harus memiliki provided ruang lingkup dalam POM file atau secara eksplisit dikecualikan dalam konfigurasi. maven-shade-plugin Salah satu dependensi ini yang termasuk dalam fat JAR diabaikan saat runtime, tetapi meningkatkan ukuran JAR penambahan overhead selama penerapan.

Dependensi disediakan oleh runtime, dalam runtime versi 1.18, 1.19, dan 1.20:

  • org.apache.flink:flink-core

  • org.apache.flink:flink-java

  • org.apache.flink:flink-streaming-java

  • org.apache.flink:flink-scala_2.12

  • org.apache.flink:flink-table-runtime

  • org.apache.flink:flink-table-planner-loader

  • org.apache.flink:flink-json

  • org.apache.flink:flink-connector-base

  • org.apache.flink:flink-connector-files

  • org.apache.flink:flink-clients

  • org.apache.flink:flink-runtime-web

  • org.apache.flink:flink-metrics-code

  • org.apache.flink:flink-table-api-java

  • org.apache.flink:flink-table-api-bridge-base

  • org.apache.flink:flink-table-api-java-bridge

  • org.apache.logging.log4j:log4j-slf4j-impl

  • org.apache.logging.log4j:log4j-api

  • org.apache.logging.log4j:log4j-core

  • org.apache.logging.log4j:log4j-1.2-api

Selain itucom.amazonaws:aws-kinesisanalytics-runtime:1.2.0, perpustakaan yang digunakan untuk mengambil properti runtime aplikasi di Managed Service for Apache Flink juga disediakan.

Semua dependensi yang disediakan oleh runtime harus menggunakan rekomendasi berikut untuk tidak memasukkannya ke dalam uber: JAR

  • Di Maven (pom.xml) dan SBT (build.sbt), gunakan provided ruang lingkup.

  • Di Gradle (build.gradle), gunakan compileOnly konfigurasi.

Ketergantungan apa pun yang disediakan secara tidak sengaja disertakan dalam uber JAR akan diabaikan saat runtime karena pemuatan kelas induk-pertama Apache Flink. Untuk informasi lebih lanjut, lihat parent-first-patternsdi dokumentasi Apache Flink.

Konektor

Sebagian besar konektor, kecuali FileSystem konektor, yang tidak termasuk dalam runtime harus disertakan dalam POM file dengan cakupan default (compile).

Rekomendasi lainnya

Sebagai aturan, uber Apache Flink Anda yang JAR disediakan untuk Managed Service for Apache Flink harus berisi kode minimum yang diperlukan untuk menjalankan aplikasi. Menyertakan dependensi yang menyertakan kelas sumber, kumpulan data pengujian, atau status bootstrap tidak boleh disertakan dalam toples ini. Jika sumber daya statis perlu ditarik saat runtime, pisahkan masalah ini menjadi sumber daya seperti Amazon S3. Contohnya termasuk bootstraps status atau model inferensi.

Luangkan waktu untuk mempertimbangkan pohon ketergantungan mendalam Anda dan hapus dependensi non-runtime.

Meskipun Managed Service untuk Apache Flink mendukung ukuran jar 512MB, ini harus dilihat sebagai pengecualian aturan. Apache Flink saat ini mendukung ukuran jar ~ 104MB melalui konfigurasi defaultnya, dan itu harus menjadi ukuran target maksimum dari toples yang dibutuhkan.

Toleransi kesalahan: titik pemeriksaan dan titik simpan

Gunakan pos pemeriksaan dan savepoint untuk menerapkan toleransi kesalahan dalam Layanan Terkelola untuk aplikasi Apache Flink Anda. Ingat hal berikut saat mengembangkan dan memelihara aplikasi Anda:

  • Kami menyarankan agar Anda tetap mengaktifkan checkpointing untuk aplikasi Anda. Checkpointing memberikan toleransi kesalahan untuk aplikasi Anda selama pemeliharaan terjadwal, dan dalam kasus kegagalan tak terduga karena masalah layanan, kegagalan dependensi aplikasi, dan masalah lainnya. Untuk informasi tentang pemeliharaan terjadwal, lihat Mengelola tugas pemeliharaan untuk Managed Service untuk Apache Flink.

  • Set ApplicationSnapshotConfiguration:: SnapshotsEnabled ke false selama pengembangan aplikasi atau pemecahan masalah. Snapshot dibuat selama setiap aplikasi berhenti, yang dapat menyebabkan masalah jika aplikasi dalam keadaan tidak sehat atau tidak berkinerja. Atur SnapshotsEnabled ke true setelah aplikasi dalam produksi dan stabil.

    catatan

    Sebaiknya aplikasi Anda membuat snapshot beberapa kali sehari untuk memulai ulang dengan benar menggunakan data status yang benar. Frekuensi yang benar untuk snapshot Anda bergantung pada logika bisnis aplikasi Anda. Sering mengambil snapshot memungkinkan Anda memulihkan data yang lebih baru, tetapi meningkatkan biaya dan membutuhkan lebih banyak sumber daya sistem.

    Untuk informasi tentang pemantauan waktu henti aplikasi, lihat .

Untuk informasi selengkapnya tentang penerapan toleransi kegagalan, lihat Menerapkan toleransi kesalahan.

Versi konektor yang tidak didukung

Dari Apache Flink versi 1.15 atau yang lebih baru, Managed Service for Apache Flink secara otomatis mencegah aplikasi memulai atau memperbarui jika mereka menggunakan versi konektor Kinesis yang tidak didukung yang dibundel ke dalam aplikasi. JARs Saat memutakhirkan ke Managed Service untuk Apache Flink versi 1.15 atau yang lebih baru, pastikan Anda menggunakan konektor Kinesis terbaru. Ini adalah versi apa pun yang sama dengan atau lebih baru dari versi 1.15.2. Semua versi lain tidak didukung oleh Managed Service untuk Apache Flink karena mereka dapat menyebabkan masalah konsistensi atau kegagalan dengan fitur Stop with Savepoint, mencegah operasi berhenti/pembaruan bersih. Untuk mempelajari lebih lanjut tentang kompatibilitas konektor di Amazon Managed Service untuk versi Apache Flink, lihat konektor Apache Flink.

Performa dan paralelisme

Aplikasi Anda dapat diskalakan untuk memenuhi tingkat throughput apa pun dengan menyetel paralelisme aplikasi Anda, dan menghindari perangkap performa. Ingat hal berikut saat mengembangkan dan memelihara aplikasi Anda:

  • Verifikasi bahwa semua sumber aplikasi dan sink Anda ditetapkan dengan cukup dan tidak dibatasi. Jika sumber dan wastafel adalah AWS layanan lain, pantau layanan tersebut menggunakan CloudWatch.

  • Untuk aplikasi dengan paralelisme yang sangat tinggi, periksa apakah tingkat paralelisme yang tinggi diterapkan pada semua operator dalam aplikasi. Secara default, Apache Flink menerapkan paralelisme aplikasi yang sama untuk semua operator dalam grafik aplikasi. Ini dapat menyebabkan masalah penyediaan pada sumber atau sink, atau pun hambatan dalam pemrosesan data operator. Anda dapat mengubah paralelisme setiap operator dalam kode dengan. setParallelism

  • Pahami arti pengaturan paralelisme untuk operatori dalam aplikasi Anda. Jika Anda mengubah paralelisme untuk operator, Anda mungkin tidak dapat memulihkan aplikasi dari snapshot yang dibuat ketika operator memiliki paralelisme yang tidak kompatibel dengan pengaturan saat ini. Untuk informasi selengkapnya tentang pengaturan paralelisme operator, lihat Mengatur paralelisme maksimum untuk operator secara eksplisit.

Untuk informasi selengkapnya tentang penerapan penskalaan, lihat Menerapkan penskalaan aplikasi.

Pengaturan paralelisme per operator

Secara default, semua operator memiliki paralelisme yang ditetapkan pada tingkat aplikasi. Anda dapat mengganti paralelisme dari satu operator menggunakan using. DataStream API .setParallelism(x) Anda dapat mengatur paralelisme operator ke paralelisme apa pun yang sama atau lebih rendah dari paralelisme aplikasi.

Jika memungkinkan, tentukan paralelisme operator sebagai fungsi dari paralelisme aplikasi. Dengan cara ini, paralelisme operator akan bervariasi dengan paralelisme aplikasi. Jika Anda menggunakan penskalaan otomatis, misalnya, semua operator akan memvariasikan paralelisme mereka dalam proporsi yang sama:

int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);

Dalam beberapa kasus, Anda mungkin ingin mengatur paralelisme operator ke konstanta. Misalnya, mengatur paralelisme sumber Aliran Kinesis ke jumlah pecahan. Dalam kasus ini, Anda harus mempertimbangkan untuk meneruskan paralelisme operator sebagai parameter konfigurasi aplikasi, untuk mengubahnya tanpa mengubah kode, jika Anda perlu, misalnya, untuk mengubah aliran sumber.

Pencatatan log

Anda dapat memantau kinerja dan kondisi kesalahan aplikasi Anda menggunakan CloudWatch Log. Ingat hal berikut saat mengonfigurasi pencatatan untuk aplikasi Anda:

  • Aktifkan CloudWatch pencatatan untuk aplikasi sehingga masalah runtime apa pun dapat di-debug.

  • Jangan buat entri log untuk setiap catatan yang diproses dalam aplikasi. Ini menyebabkan hambatan parah selama pemrosesan dan dapat menyebabkan tekanan balik dalam pemrosesan data.

  • Buat CloudWatch alarm untuk memberi tahu Anda ketika aplikasi Anda tidak berjalan dengan benar. Untuk informasi selengkapnya, lihat

Untuk informasi selengkapnya tentang penerapan pencatatan, lihat .

Pengkodean

Anda dapat membuat aplikasi Anda berfungsi dan stabil menggunakan praktik pemrograman yang direkomendasikan. Ingat hal berikut saat menulis kode aplikasi:

  • Jangan gunakan system.exit() dalam kode aplikasi Anda, baik dalam metode main aplikasi Anda atau dalam fungsi yang ditetapkan pengguna. Jika Anda ingin menonaktifkan aplikasi Anda dari dalam kode, lempar pengecualian yang berasal dari Exception atau RuntimeException, yang berisi pesan tentang apa yang salah dengan aplikasi.

    Catat hal berikut tentang bagaimana layanan menangani pengecualian ini:

    • Jika pengecualian dilemparkan dari metode main aplikasi Anda, layanan akan membungkusnya dalam ProgramInvocationException saat transisi aplikasi ke status RUNNING, dan manajer tugas akan gagal mengirimkan tugas.

    • Jika pengecualian dilemparkan dari fungsi yang ditetapkan pengguna, manajer tugas akan gagal tugas dan memulai ulang, serta detail pengecualian akan ditulis ke log pengecualian.

  • Pertimbangkan untuk menaungi JAR file aplikasi Anda dan dependensi yang disertakan. Bayangan direkomendasikan ketika ada potensi konflik dalam nama paket antara aplikasi Anda dan runtime Apache Flink. Jika terjadi konflik, log aplikasi Anda mungkin berisi pengecualian tipe java.util.concurrent.ExecutionException. Untuk informasi selengkapnya tentang shading JAR file aplikasi Anda, lihat Apache Maven Shade Plugin.

Mengelola kredensi

Anda tidak boleh memanggang kredensi jangka panjang apa pun ke dalam aplikasi produksi (atau lainnya). Kredensi jangka panjang kemungkinan diperiksa ke dalam sistem kontrol versi dan dapat dengan mudah hilang. Sebagai gantinya, Anda dapat mengaitkan peran ke Layanan Terkelola untuk aplikasi Apache Flink dan memberikan hak istimewa untuk peran tersebut. Aplikasi Flink yang sedang berjalan kemudian dapat mengambil kredensil sementara dengan hak istimewa masing-masing dari lingkungan. Jika otentikasi diperlukan untuk layanan yang tidak terintegrasi secara native denganIAM, misalnya, database yang memerlukan nama pengguna dan kata sandi untuk otentikasi, Anda harus mempertimbangkan untuk menyimpan rahasia di Secrets Manager AWS .

Banyak layanan AWS asli mendukung otentikasi:

Membaca dari sumber dengan sedikit pecahan/partisi

Saat membaca dari Apache Kafka atau Aliran Data Kinesis, mungkin ada ketidakcocokan antara paralelisme aliran (yaitu, jumlah partisi untuk Kafka dan jumlah pecahan untuk Kinesis) dan paralelisme aplikasi. Dengan desain yang naif, paralelisme aplikasi tidak dapat berskala melampaui paralelisme aliran: Setiap subtugas operator sumber hanya dapat membaca dari 1 atau lebih piringan/partisi. Itu berarti untuk aliran dengan hanya 2 pecahan dan aplikasi dengan paralelisme 8, bahwa hanya dua subtugas yang benar-benar memakan dari aliran dan 6 subtugas tetap menganggur. Ini secara substansional dapat membatasi throughput aplikasi, khususnya jika deserialisasi mahal dan dilakukan oleh sumber (yang merupakan default).

Untuk mengurangi efek ini, Anda dapat menskalakan aliran. Tapi itu mungkin tidak selalu diinginkan atau mungkin. Atau, Anda dapat merestrukturisasi sumber sehingga tidak melakukan serialisasi apa pun dan hanya meneruskan. byte[] Anda kemudian dapat menyeimbangkan kembali data untuk mendistribusikannya secara merata di semua tugas dan kemudian deserialisasi data di sana. Dengan cara ini, Anda dapat memanfaatkan semua subtugas untuk deserialisasi dan operasi yang berpotensi mahal ini tidak lagi terikat oleh jumlah shard/partisi aliran.

Interval refresh notebook Studio

Jika Anda mengubah interval refresh hasil paragraf, atur ke nilai yang setidaknya 1000 milidetik.

Performa optimum notebook Studio

Kami menguji dengan pernyataan berikut dan mendapat performa terbaik saat events-per-second yang dikalikan dengan number-of-keys berada di bawah 25.000.000. Ini adalah untuk events-per-second di bawah 150.000.

SELECT key, sum(value) FROM key-values GROUP BY key

Bagaimana strategi watermark dan pecahan idle memengaruhi jendela waktu

Saat membaca peristiwa dari Apache Kafka dan Kinesis Data Streams, sumber dapat mengatur waktu acara berdasarkan atribut aliran. Dalam kasus Kinesis, waktu acara sama dengan perkiraan waktu kedatangan peristiwa. Tetapi pengaturan waktu acara di sumber untuk acara tidak cukup bagi aplikasi Flink untuk menggunakan waktu acara. Sumber juga harus menghasilkan tanda air yang menyebarkan informasi tentang waktu acara dari sumber ke semua operator lain. Dokumentasi Flink memiliki gambaran yang baik tentang cara kerja proses itu.

Secara default, stempel waktu peristiwa yang dibaca dari Kinesis diatur ke perkiraan waktu kedatangan yang ditentukan oleh Kinesis. Prasyarat tambahan untuk waktu acara untuk bekerja dalam aplikasi adalah strategi watermark.

WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));

Strategi watermark kemudian diterapkan ke a DataStream with the assignTimestampsAndWatermarks method. Ada beberapa strategi build-in yang berguna:

  • forMonotonousTimestamps()hanya akan menggunakan waktu acara (perkiraan waktu kedatangan) dan secara berkala memancarkan nilai maksimum sebagai tanda air (untuk setiap subtugas tertentu)

  • forBoundedOutOfOrderness(Duration.ofSeconds(...))mirip dengan strategi sebelumnya, tetapi akan menggunakan waktu acara - durasi untuk pembuatan tanda air.

Ini berhasil, tetapi ada beberapa peringatan yang harus diperhatikan. Tanda air dihasilkan pada tingkat subtugas dan mengalir melalui grafik operator.

Dari dokumentasi Flink:

Setiap subtugas paralel dari fungsi sumber biasanya menghasilkan tanda airnya secara independen. Tanda air ini menentukan waktu acara pada sumber paralel tertentu.

Saat tanda air mengalir melalui program streaming, mereka memajukan waktu acara di operator tempat mereka tiba. Setiap kali operator memajukan waktu acaranya, ia menghasilkan tanda air baru di hilir untuk operator penggantinya.

Beberapa operator menggunakan beberapa aliran input; serikat pekerja, misalnya, atau operator yang mengikuti fungsi keyBy (...) atau partisi (...). Waktu kejadian operator saat ini adalah minimum waktu acara aliran inputnya. Karena aliran inputnya memperbarui waktu acara mereka, begitu juga operator.

Itu berarti, jika subtugas sumber mengkonsumsi dari pecahan siaga, operator hilir tidak menerima tanda air baru dari subtugas itu dan karenanya memproses stall untuk semua operator hilir yang menggunakan jendela waktu. Untuk menghindari hal ini, pelanggan dapat menambahkan withIdleness opsi ke strategi tanda air. Dengan opsi itu, operator mengecualikan tanda air dari subtugas upsteam idle saat menghitung waktu acara operator. Subtugas idle karenanya tidak lagi memblokir kemajuan waktu acara di operator hilir.

Namun, opsi kemalasan dengan strategi tanda air bawaan tidak akan memajukan waktu acara jika tidak ada subtugas yang membaca acara apa pun, yaitu, tidak ada acara dalam aliran. Ini menjadi sangat terlihat untuk kasus uji di mana serangkaian peristiwa terbatas dibaca dari aliran. Karena waktu acara tidak berlanjut setelah acara terakhir dibaca, jendela terakhir (berisi acara terakhir) tidak akan pernah ditutup.

Ringkasan

  • withIdlenesspengaturan tidak akan menghasilkan tanda air baru jika pecahan menganggur, itu hanya akan mengecualikan tanda air terakhir yang dikirim oleh subtugas idle dari perhitungan tanda air min di operator hilir

  • dengan strategi tanda air bawaan, jendela terbuka terakhir tidak akan pernah ditutup (kecuali acara baru yang memajukan tanda air akan dikirim, tetapi itu menciptakan jendela baru yang kemudian tetap terbuka)

  • bahkan ketika waktu diatur oleh aliran Kinesis, peristiwa kedatangan terlambat masih dapat terjadi jika satu pecahan dikonsumsi lebih cepat daripada yang lain (misalnya, selama inisialisasi aplikasi atau saat menggunakan TRIM_HORIZON di mana semua pecahan yang ada dikonsumsi secara paralel mengabaikan hubungan orang tua/anak mereka)

  • withIdlenesspengaturan strategi tanda air tampaknya menghentikan pengaturan khusus sumber Kinesis untuk pecahan siaga (ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS

Contoh

Aplikasi berikut membaca dari aliran dan membuat jendela sesi berdasarkan waktu acara.

Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });

Dalam contoh berikut, 8 peristiwa ditulis ke aliran pecahan 16 (2 yang pertama dan peristiwa terakhir kebetulan mendarat di pecahan yang sama).

$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022

Masukan ini akan menghasilkan jendela sesi 5: event 1,2,3; event 4,5; event 6; event 7; event 8. Namun, program ini hanya menghasilkan 4 jendela pertama.

11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7

Outputnya hanya menampilkan 4 jendela (tidak ada jendela terakhir yang berisi acara 8). Ini karena waktu acara dan strategi tanda air. Jendela terakhir tidak dapat ditutup karena dengan strategi watermark per built waktu tidak pernah maju melampaui waktu peristiwa terakhir yang telah dibaca dari aliran. Tetapi agar jendela ditutup, waktu perlu maju lebih dari 10 detik setelah acara terakhir. Dalam hal ini tanda air terakhir adalah 2022-03-23T 10:21:27.170 Z tetapi agar jendela sesi ditutup, diperlukan tanda air 10 detik dan 1 ms kemudian.

Jika withIdleness opsi dihapus dari strategi tanda air, tidak ada jendela sesi yang akan ditutup, karena “tanda air global” dari operator jendela tidak dapat maju.

Perhatikan bahwa ketika aplikasi Flink dimulai (atau jika ada kemiringan data), beberapa pecahan dapat dikonsumsi lebih cepat daripada yang lain. Hal ini dapat menyebabkan beberapa tanda air dipancarkan terlalu dini dari subtugas (subtugas dapat memancarkan tanda air berdasarkan konten satu pecahan tanpa dikonsumsi dari pecahan lain yang dilangganannya). Cara untuk mengurangi adalah strategi watermarking yang berbeda yang menambahkan buffer keamanan (forBoundedOutOfOrderness(Duration.ofSeconds(30)) atau secara eksplisit memungkinkan acara kedatangan terlambat. (allowedLateness(Time.minutes(5))

Tetapkan a UUID untuk semua operator

Ketika Layanan Terkelola untuk Apache Flink memulai pekerjaan Flink untuk aplikasi dengan snapshot, pekerjaan Flink dapat gagal dimulai karena masalah tertentu. Salah satunya adalah ketidakcocokan ID operator. Flink mengharapkan operator eksplisit dan konsisten IDs untuk operator grafik pekerjaan Flink. Jika tidak disetel secara eksplisit, Flink otomatis membuat ID untuk operator. Ini karena Flink menggunakan operator ini IDs untuk mengidentifikasi operator secara unik dalam grafik pekerjaan dan menggunakannya untuk menyimpan status setiap operator di savepoint.

Masalah ketidakcocokan ID operator terjadi ketika Flink tidak menemukan pemetaan 1:1 antara operator grafik pekerjaan dan operator IDs yang IDs ditentukan dalam savepoint. Ini terjadi ketika operator konsisten eksplisit tidak IDs disetel dan Flink otomatis menghasilkan operator IDs yang mungkin tidak konsisten dengan setiap pembuatan grafik pekerjaan. Kemungkinan aplikasi mengalami masalah ini tinggi selama pemeliharaan berjalan. Untuk menghindari hal ini, kami menyarankan pelanggan mengatur UUID semua operator dalam kode flink. Untuk informasi selengkapnya, lihat topik Menetapkan UUID untuk semua operator di bawah Kesiapan produksi.

Tambahkan ServiceResourceTransformer ke plugin Maven shade

Flink menggunakan Java Service Provider Interfaces (SPI) untuk memuat komponen seperti konektor dan format. Beberapa dependensi Flink menggunakan SPI dapat menyebabkan bentrokan di uber-jar dan perilaku aplikasi yang tidak terduga. Disarankan untuk menambahkan plugin naungan Maven, yang didefinisikan dalam pom.xml ServiceResourceTransformer

<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>
PrivasiSyarat situsPreferensi cookie
© 2025, Amazon Web Services, Inc. atau afiliasinya. Semua hak dilindungi undang-undang.