Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Autotuning dei parametri Autoscaler
Questa sezione descrive il comportamento di ottimizzazione automatica per varie versioni di Amazon EMR. Inoltre, approfondisce le diverse configurazioni di auto-scaling.
Nota
Amazon EMR 7.2.0 e versioni successive utilizzano la configurazione open source job.autoscaler.restart.time-tracking.enabled per consentire la stima del tempo di ridimensionamento. La stima del tempo di riscala ha le stesse funzionalità dell'autotuning di Amazon EMR, quindi non è necessario assegnare manualmente valori empirici al tempo di riavvio.
Puoi comunque utilizzare l'autotuning di Amazon EMR se utilizzi Amazon EMR 7.1.0 o versioni precedenti.
- 7.2.0 and higher
-
Amazon EMR 7.2.0 e versioni successive misurano il tempo di riavvio effettivo richiesto per applicare le decisioni di scalabilità automatica. Nelle versioni 7.1.0 e precedenti, era necessario utilizzare la configurazione per configurare manualmente il tempo massimo di
job.autoscaler.restart.timeriavvio stimato. Utilizzando la configurazionejob.autoscaler.restart.time-tracking.enabled, è sufficiente inserire un orario di riavvio per il primo ridimensionamento. Successivamente, l'operatore registra l'orario di riavvio effettivo e lo utilizzerà per i ridimensionamenti successivi.Per abilitare questo tracciamento, utilizzate il seguente comando:
job.autoscaler.restart.time-tracking.enabled: trueDi seguito sono riportate le configurazioni correlate per la stima del tempo di ridimensionamento.
Configurazione Obbligatorio Predefinita Description job.autoscaler.restart.time-tracking.enabled No False Indica se Flink Autoscaler deve ottimizzare automaticamente le configurazioni nel tempo per ottimizzare le decisioni di ridimensionamento. Nota che Autoscaler può solo regolare automaticamente il parametro Autoscaler. restart.timejob.autoscaler.restart.time No 5 min Il tempo di riavvio previsto utilizzato da Amazon EMR su EKS fino a quando l'operatore non sarà in grado di determinare il tempo di riavvio effettivo in base alle scalature precedenti. job.autoscaler.restart.time-tracking.limit No 15 min Il tempo di riavvio massimo osservato quando è impostato su. job.autoscaler.restart.time-tracking.enabledtrueDi seguito è riportato un esempio di specifica di distribuzione che è possibile utilizzare per provare la stima del tempo di ridimensionamento:
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: autoscaling-example spec: flinkVersion: v1_18 flinkConfiguration: # Autoscaler parameters job.autoscaler.enabled: "true" job.autoscaler.scaling.enabled: "true" job.autoscaler.stabilization.interval: "5s" job.autoscaler.metrics.window: "1m" job.autoscaler.restart.time-tracking.enabled: "true" job.autoscaler.restart.time: "2m" job.autoscaler.restart.time-tracking.limit: "10m" jobmanager.scheduler: adaptive taskmanager.numberOfTaskSlots: "1" pipeline.max-parallelism: "12" executionRoleArn:<JOB ARN>emrReleaseLabel: emr-7.12.0-flink-latest jobManager: highAvailabilityEnabled: false storageDir: s3://<s3_bucket>/flink/autoscaling/ha/ replicas: 1 resource: memory: "1024m" cpu: 0.5 taskManager: resource: memory: "1024m" cpu: 0.5 job: jarURI: s3://<s3_bucket>/some-job-with-back-pressure parallelism: 1 upgradeMode: statelessPer simulare la contropressione, utilizza le seguenti specifiche di distribuzione.
job: jarURI: s3://<s3_bucket>/pyflink-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-script.py"] parallelism: 1 upgradeMode: statelessCarica il seguente script Python nel tuo bucket S3.
import logging import sys import time import random from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment TABLE_NAME="orders" QUERY=f""" CREATE TABLE {TABLE_NAME} ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' ); """ def create_backpressure(i): time.sleep(2) return i def autoscaling_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(QUERY) res_table = t_env.from_path(TABLE_NAME) stream = t_env.to_data_stream(res_table) \ .shuffle().map(lambda x: create_backpressure(x))\ .print() env.execute("Autoscaling demo") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") autoscaling_demo()Per verificare che la stima del tempo di ridimensionamento funzioni, assicurati che la registrazione dei
DEBUGlivelli dell'operatore Flink sia abilitata. L'esempio seguente mostra come aggiornare il file del grafico Helm.values.yamlQuindi reinstalla la tabella di comando aggiornata ed esegui nuovamente il job Flink.log4j-operator.properties: |+ # Flink Operator Logging Overrides rootLogger.level = DEBUGOttieni il nome del tuo leader pod.
ip=$(kubectl get configmap -n $NAMESPACE<job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"Esegui il comando seguente per ottenere il tempo di riavvio effettivo utilizzato nelle valutazioni delle metriche.
kubectl logs<FLINK-OPERATOR-POD-NAME>-c flink-kubernetes-operator -n<OPERATOR-NAMESPACE>-f | grep "Restart time used in scaling summary computation"Dovrebbero essere visualizzati log simili ai seguenti. Nota che viene utilizzato solo il primo ridimensionamento.
job.autoscaler.restart.timeI ridimensionamenti successivi utilizzano il tempo di riavvio osservato.2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M 2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S 2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S - 7.0.0 and 7.1.0
-
Il Flink Autoscaler open source integrato utilizza numerose metriche per prendere le migliori decisioni di scalabilità. Tuttavia, i valori predefiniti che utilizza per i suoi calcoli sono pensati per essere applicabili alla maggior parte dei carichi di lavoro e potrebbero non essere ottimali per un determinato lavoro. La funzionalità di autotuning aggiunta alla versione Amazon EMR on EKS di Flink Operator esamina le tendenze storiche osservate su specifiche metriche acquisite e quindi cerca di calcolare il valore più ottimale su misura per il determinato lavoro.
Configurazione Obbligatorio Predefinita Description kubernetes.operator.job.autoscaler.autotune.enable No False Indica se Flink Autoscaler deve ottimizzare automaticamente le configurazioni nel tempo per ottimizzare le decisioni di ridimensionamento degli autoscaler. Attualmente, Autoscaler può solo regolare automaticamente il parametro Autoscaler. restart.timekubernetes.operator.job.autoscaler.autotune.metrics.history.max.count No 3 Indica il numero di parametri storici di Amazon EMR su EKS che Autoscaler conserva nella mappa di configurazione dei parametri di Amazon EMR on EKS. kubernetes.operator.job.autoscaler.autotune.metrics.restart.count No 3 Indica il numero di riavvii che Autoscaler esegue prima di iniziare a calcolare il tempo di riavvio medio per un determinato lavoro. Per abilitare l'autotuning, è necessario aver completato quanto segue:
-
Imposta
kubernetes.operator.job.autoscaler.autotune.enable:sutrue -
Imposta
metrics.job.status.enable:suTOTAL_TIME -
È seguita la configurazione di Using Autoscaler for Flink applications per abilitare la scalabilità automatica.
Di seguito è riportato un esempio di specifica di distribuzione che è possibile utilizzare per provare l'autotuning.
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: autoscaling-example spec: flinkVersion: v1_18 flinkConfiguration: # Autotuning parameters kubernetes.operator.job.autoscaler.autotune.enable: "true" kubernetes.operator.job.autoscaler.autotune.metrics.history.max.count: "2" kubernetes.operator.job.autoscaler.autotune.metrics.restart.count: "1" metrics.job.status.enable: TOTAL_TIME # Autoscaler parameters kubernetes.operator.job.autoscaler.enabled: "true" kubernetes.operator.job.autoscaler.scaling.enabled: "true" kubernetes.operator.job.autoscaler.stabilization.interval: "5s" kubernetes.operator.job.autoscaler.metrics.window: "1m" jobmanager.scheduler: adaptive taskmanager.numberOfTaskSlots: "1" state.savepoints.dir: s3://<S3_bucket>/autoscaling/savepoint/ state.checkpoints.dir: s3://<S3_bucket>/flink/autoscaling/checkpoint/ pipeline.max-parallelism: "4" executionRoleArn: <JOB ARN> emrReleaseLabel: emr-6.14.0-flink-latest jobManager: highAvailabilityEnabled: true storageDir: s3://<S3_bucket>/flink/autoscaling/ha/ replicas: 1 resource: memory: "1024m" cpu: 0.5 taskManager: resource: memory: "1024m" cpu: 0.5 job: jarURI: s3://<S3_bucket>/some-job-with-back-pressure parallelism: 1 upgradeMode: last-statePer simulare la contropressione, utilizzate le seguenti specifiche di distribuzione.
job: jarURI: s3://<S3_bucket>/pyflink-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-script.py"] parallelism: 1 upgradeMode: last-stateCarica il seguente script Python nel tuo bucket S3.
import logging import sys import time import random from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment TABLE_NAME="orders" QUERY=f""" CREATE TABLE {TABLE_NAME} ( id INT, order_time AS CURRENT_TIMESTAMP, WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS ) WITH ( 'connector' = 'datagen', 'rows-per-second'='10', 'fields.id.kind'='random', 'fields.id.min'='1', 'fields.id.max'='100' ); """ def create_backpressure(i): time.sleep(2) return i def autoscaling_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(env) t_env.execute_sql(QUERY) res_table = t_env.from_path(TABLE_NAME) stream = t_env.to_data_stream(res_table) \ .shuffle().map(lambda x: create_backpressure(x))\ .print() env.execute("Autoscaling demo") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") autoscaling_demo()Per verificare che l'autotuner funzioni, usa i seguenti comandi. Nota che devi usare le informazioni del tuo leader pod per l'operatore Flink.
Per prima cosa procuratevi il nome del vostro leader pod.
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}') kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"Una volta che hai il nome del tuo leader pod, puoi eseguire il seguente comando.
kubectl logs -n $NAMESPACE -c flink-kubernetes-operator --follow <YOUR-FLINK-OPERATOR-POD-NAME> | grep -E 'EmrEks|autotun|calculating|restart|autoscaler'Dovreste vedere dei log simili ai seguenti.
[m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [36m[DEBUG][flink/autoscaling-example] Using the latest Emr Eks Metric for calculating restart.time for autotuning: EmrEksMetrics(restartMetric=RestartMetric(restartingTime=65, numRestarts=1)) [m[33m2023-09-13 20:10:35,941[m [36mc.a.c.f.k.o.a.EmrEksMetricsAutotuner[m [32m[INFO ][flink/autoscaling-example] Calculated average restart.time metric via autotuning to be: PT0.065S -