Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.
Autotuning parameter Autoscaler
Bagian ini menjelaskan perilaku penyetelan otomatis untuk berbagai versi EMR Amazon. Ini juga masuk ke detail mengenai konfigurasi auto-scaling yang berbeda.
catatan
Amazon EMR 7.2.0 dan yang lebih tinggi menggunakan konfigurasi open source job.autoscaler.restart.time-tracking.enabled untuk mengaktifkan estimasi waktu penskalaan ulang. Estimasi waktu skala ulang memiliki fungsionalitas yang sama dengan autotuning Amazon EMR, jadi Anda tidak perlu menetapkan nilai empiris secara manual ke waktu restart.
Anda masih dapat menggunakan autotuning Amazon EMR jika Anda menggunakan Amazon EMR 7.1.0 atau lebih rendah.
- 7.2.0 and higher
-
Amazon EMR 7.2.0 dan yang lebih tinggi mengukur waktu restart aktual yang diperlukan untuk menerapkan keputusan penskalaan otomatis. Dalam rilis 7.1.0 dan yang lebih rendah, Anda harus menggunakan konfigurasi
job.autoscaler.restart.timeuntuk mengonfigurasi perkiraan waktu restart maksimum secara manual. Dengan menggunakan konfigurasijob.autoscaler.restart.time-tracking.enabled, Anda hanya perlu memasukkan waktu restart untuk penskalaan pertama. Setelah itu, operator mencatat waktu restart aktual dan akan menggunakannya untuk penskalaan berikutnya.Untuk mengaktifkan pelacakan ini, gunakan perintah berikut:
job.autoscaler.restart.time-tracking.enabled: trueBerikut ini adalah konfigurasi terkait untuk estimasi waktu penskalaan ulang.
Konfigurasi Diperlukan Default Deskripsi job.autoscaler.restart.time-tracking.enabled Tidak False Menunjukkan apakah Flink Autoscaler harus secara otomatis menyetel konfigurasi dari waktu ke waktu untuk mengoptimalkan dessisi penskalaan. Perhatikan bahwa Autoscaler hanya dapat melakukan autotune parameter Autoscaler. restart.timejob.autoscaler.restart.time Tidak 5m Waktu restart yang diharapkan yang digunakan Amazon EMR di EKS hingga operator dapat menentukan waktu restart aktual dari penskalaan sebelumnya. job.autoscaler.restart.time-tracking.limit Tidak 15m Waktu restart maksimum yang diamati saat job.autoscaler.restart.time-tracking.enableddiatur ketrue.Berikut ini adalah contoh spesifikasi penerapan yang dapat Anda gunakan untuk mencoba estimasi waktu penskalaan ulang:
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: autoscaling-example spec: flinkVersion: v1_18 flinkConfiguration: # Autoscaler parameters job.autoscaler.enabled: "true" job.autoscaler.scaling.enabled: "true" job.autoscaler.stabilization.interval: "5s" job.autoscaler.metrics.window: "1m" job.autoscaler.restart.time-tracking.enabled: "true" job.autoscaler.restart.time: "2m" job.autoscaler.restart.time-tracking.limit: "10m" jobmanager.scheduler: adaptive taskmanager.numberOfTaskSlots: "1" pipeline.max-parallelism: "12" executionRoleArn:<JOB ARN>emrReleaseLabel: emr-7.12.0-flink-latest jobManager: highAvailabilityEnabled: false storageDir: s3://<s3_bucket>/flink/autoscaling/ha/ replicas: 1 resource: memory: "1024m" cpu: 0.5 taskManager: resource: memory: "1024m" cpu: 0.5 job: jarURI: s3://<s3_bucket>/some-job-with-back-pressure parallelism: 1 upgradeMode: statelessUntuk mensimulasikan tekanan balik, gunakan spesifikasi penerapan berikut.
job: jarURI: s3://<s3_bucket>/pyflink-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-script.py"] parallelism: 1 upgradeMode: statelessUnggah skrip Python berikut ke bucket S3 Anda.
import logging import sys import time import random from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment TABLE_NAME="orders" QUERY=f""" CREATE TABLE {TABLE_NAME} ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' ); """ def create_backpressure(i): time.sleep(2) return i def autoscaling_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(QUERY) res_table = t_env.from_path(TABLE_NAME) stream = t_env.to_data_stream(res_table) \ .shuffle().map(lambda x: create_backpressure(x))\ .print() env.execute("Autoscaling demo") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") autoscaling_demo()Untuk memverifikasi bahwa estimasi waktu penskalaan ulang berfungsi, pastikan pencatatan
DEBUGlevel operator Flink diaktifkan. Contoh di bawah ini menunjukkan cara memperbarui file bagan helm.values.yamlKemudian instal ulang bagan helm yang diperbarui dan jalankan pekerjaan Flink Anda lagi.log4j-operator.properties: |+ # Flink Operator Logging Overrides rootLogger.level = DEBUGDapatkan nama pod pemimpin Anda.
ip=$(kubectl get configmap -n $NAMESPACE<job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"Jalankan perintah berikut untuk mendapatkan waktu restart aktual yang digunakan dalam evaluasi metrik.
kubectl logs<FLINK-OPERATOR-POD-NAME>-c flink-kubernetes-operator -n<OPERATOR-NAMESPACE>-f | grep "Restart time used in scaling summary computation"Anda akan melihat log yang mirip dengan yang berikut ini. Perhatikan bahwa hanya penskalaan pertama yang digunakan
job.autoscaler.restart.time. Penskalaan selanjutnya menggunakan waktu restart yang diamati.2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M 2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S - 7.0.0 and 7.1.0
-
Flink Autoscaler bawaan open source menggunakan banyak metrik untuk membuat keputusan penskalaan terbaik. Namun, nilai default yang digunakan untuk perhitungannya dimaksudkan untuk dapat diterapkan pada sebagian besar beban kerja dan mungkin tidak optimal untuk pekerjaan tertentu. Fitur autotuning yang ditambahkan ke EMR Amazon pada versi EKS dari Operator Flink melihat tren historis yang diamati pada metrik tertentu yang ditangkap dan kemudian mencoba menghitung nilai paling optimal yang disesuaikan untuk pekerjaan yang diberikan.
Konfigurasi Diperlukan Default Deskripsi kubernetes.operator.job.autoscaler.autotune.enable Tidak False Menunjukkan apakah Flink Autoscaler harus secara otomatis menyetel konfigurasi dari waktu ke waktu untuk mengoptimalkan dessisi penskalaan penskalaan otomatis. Saat ini, Autoscaler hanya dapat melakukan autotune parameter Autoscaler. restart.timekubernetes.operator.job.autoscaler.autotune.metrics.history.max.count Tidak 3 Menunjukkan berapa banyak historis Amazon EMR pada metrik EKS yang disimpan Autoscaler di Amazon EMR di peta konfigurasi metrik EKS. kubernetes.operator.job.autoscaler.autotune.metrics.restart.count Tidak 3 Menunjukkan berapa banyak jumlah restart yang dilakukan Autoscaler sebelum mulai menghitung waktu restart rata-rata untuk pekerjaan tertentu. Untuk mengaktifkan autotuning, Anda harus menyelesaikan yang berikut ini:
-
Setel
kubernetes.operator.job.autoscaler.autotune.enable:ketrue -
Setel
metrics.job.status.enable:keTOTAL_TIME -
Mengikuti pengaturan Menggunakan Autoscaler untuk aplikasi Flink untuk mengaktifkan Autoscaling.
Berikut ini adalah contoh spesifikasi penerapan yang dapat Anda gunakan untuk mencoba autotuning.
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: autoscaling-example spec: flinkVersion: v1_18 flinkConfiguration: # Autotuning parameters kubernetes.operator.job.autoscaler.autotune.enable: "true" kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2" kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1" metrics.job.status.enable: TOTAL_TIME # Autoscaler parameters kubernetes.operator.job.autoscaler.enabled: "true" kubernetes.operator.job.autoscaler.scaling.enabled: "true" kubernetes.operator.job.autoscaler.stabilization.interval: "5s" kubernetes.operator.job.autoscaler.metrics.window: "1m" jobmanager.scheduler: adaptive taskmanager.numberOfTaskSlots: "1" state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/ state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/ pipeline.max-parallelism: "4" executionRoleArn: <JOB ARN> emrReleaseLabel: emr-6.14.0-flink-latest jobManager: highAvailabilityEnabled: true storageDir: s3://<S3_bucket>/flink/autoscaling/ha/ replicas: 1 resource: memory: "1024m" cpu: 0.5 taskManager: resource: memory: "1024m" cpu: 0.5 job: jarURI: s3://<S3_bucket>/some-job-with-back-pressure parallelism: 1 upgradeMode: last-stateUntuk mensimulasikan tekanan balik, gunakan spesifikasi penerapan berikut.
job: jarURI: s3://<S3_bucket>/pyflink-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-script.py"] parallelism: 1 upgradeMode: last-stateUnggah skrip Python berikut ke bucket S3 Anda.
import logging import sys import time import random from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment TABLE_NAME="orders" QUERY=f""" CREATE TABLE {TABLE_NAME} ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' ); """ def create_backpressure(i): time.sleep(2) return i def autoscaling_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(QUERY) res_table = t_env.from_path(TABLE_NAME) stream = t_env.to_data_stream(res_table) \ .shuffle().map(lambda x: create_backpressure(x))\ .print() env.execute("Autoscaling demo") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") autoscaling_demo()Untuk memverifikasi bahwa autotuner Anda berfungsi, gunakan perintah berikut. Perhatikan bahwa Anda harus menggunakan informasi pod pemimpin Anda sendiri untuk Operator Flink.
Pertama dapatkan nama pod pemimpin Anda.
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"Setelah Anda memiliki nama pod pemimpin Anda, Anda dapat menjalankan perintah berikut.
kubectl logs -n $NAMESPACE -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME> | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'Anda akan melihat log yang mirip dengan yang berikut ini.
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1)) [m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S -