This section describes autotuning behavior for different Amazon EMR releases and details the related autoscaling configurations.
Note
Amazon EMR 7.2.0 and higher uses the open source configuration job.autoscaler.restart.time-tracking.enabled
to enable rescale time estimation. Rescale time estimation has the same
functionality as Amazon EMR autotuning, so you don't have to manually assign empirical values to the restart time.
You can still use Amazon EMR autotuning if you're using Amazon EMR 7.1.0 or lower.
Amazon EMR 7.2.0 and higher measures the actual restart time required to apply autoscaling decisions. In releases 7.1.0 and lower, you had to use the configuration job.autoscaler.restart.time to manually configure an estimated maximum restart time. With the configuration job.autoscaler.restart.time-tracking.enabled, you only need to enter a restart time for the first scaling. Afterwards, the operator records the actual restart time and uses it for subsequent scalings.
To enable this tracking, use the following configuration:

```yaml
job.autoscaler.restart.time-tracking.enabled: true
```
The following are the related configurations for rescale time estimation.
| Configuration | Required | Default | Description |
|---|---|---|---|
| job.autoscaler.restart.time-tracking.enabled | No | False | Indicates whether the Flink autoscaler should automatically tune configurations over time to optimize scaling decisions. Note that the autoscaler can only autotune the autoscaler parameter restart.time. |
| job.autoscaler.restart.time | No | 5m | The expected restart time that Amazon EMR on EKS uses until the operator can determine the actual restart time from previous scalings. |
| job.autoscaler.restart.time-tracking.limit | No | 15m | The upper bound on the observed restart time when job.autoscaler.restart.time-tracking.enabled is set to true. |
The following is an example deployment spec you can use to try out rescale time estimation:
```yaml
apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  name: autoscaling-example
spec:
  flinkVersion: v1_18
  flinkConfiguration:
    # Autoscaler parameters
    job.autoscaler.enabled: "true"
    job.autoscaler.scaling.enabled: "true"
    job.autoscaler.stabilization.interval: "5s"
    job.autoscaler.metrics.window: "1m"
    job.autoscaler.restart.time-tracking.enabled: "true"
    job.autoscaler.restart.time: "2m"
    job.autoscaler.restart.time-tracking.limit: "10m"
    jobmanager.scheduler: adaptive
    taskmanager.numberOfTaskSlots: "1"
    pipeline.max-parallelism: "12"
  executionRoleArn: <JOB ARN>
  emrReleaseLabel: emr-7.7.0-flink-latest
  jobManager:
    highAvailabilityEnabled: false
    storageDir: s3://<s3_bucket>/flink/autoscaling/ha/
    replicas: 1
    resource:
      memory: "1024m"
      cpu: 0.5
  taskManager:
    resource:
      memory: "1024m"
      cpu: 0.5
  job:
    jarURI: s3://<s3_bucket>/some-job-with-back-pressure
    parallelism: 1
    upgradeMode: stateless
```
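Assuming you save the spec above to a local file named autoscaling-example.yaml (a filename chosen for this example) and that your Flink operator watches the namespace you supply, you can submit and monitor the deployment with kubectl:

```shell
# Submit the FlinkDeployment spec. <namespace> is the namespace that
# your EMR on EKS Flink operator manages (placeholder for this example).
kubectl apply -f autoscaling-example.yaml -n <namespace>

# Watch the deployment until the job reaches the RUNNING state.
kubectl get flinkdeployment autoscaling-example -n <namespace> -w
```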
To simulate backpressure, replace the job section of the deployment spec with the following.
```yaml
job:
  jarURI: s3://<s3_bucket>/pyflink-script.py
  entryClass: "org.apache.flink.client.python.PythonDriver"
  args: ["-py", "/opt/flink/usrlib/pyflink-script.py"]
  parallelism: 1
  upgradeMode: stateless
```
Upload the following Python script to your S3 bucket.
```python
import logging
import sys
import time

from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import StreamTableEnvironment

TABLE_NAME = "orders"
QUERY = f"""
CREATE TABLE {TABLE_NAME} (
    id INT,
    order_time AS CURRENT_TIMESTAMP,
    WATERMARK FOR order_time AS order_time - INTERVAL '5' SECONDS
)
WITH (
    'connector' = 'datagen',
    'rows-per-second' = '10',
    'fields.id.kind' = 'random',
    'fields.id.min' = '1',
    'fields.id.max' = '100'
);
"""

def create_backpressure(i):
    # Sleep inside the map operator to create artificial backpressure.
    time.sleep(2)
    return i

def autoscaling_demo():
    env = StreamExecutionEnvironment.get_execution_environment()
    t_env = StreamTableEnvironment.create(env)
    t_env.execute_sql(QUERY)
    res_table = t_env.from_path(TABLE_NAME)
    stream = t_env.to_data_stream(res_table) \
        .shuffle().map(lambda x: create_backpressure(x)) \
        .print()
    env.execute("Autoscaling demo")

if __name__ == "__main__":
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    autoscaling_demo()
```
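One way to upload the script, assuming the AWS CLI is configured and the script is saved locally as pyflink-script.py (the bucket placeholder matches the one in the deployment spec):

```shell
# Copy the PyFlink script to the S3 location referenced by jarURI.
aws s3 cp pyflink-script.py s3://<s3_bucket>/pyflink-script.py
```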
To verify that rescale time estimation is working, make sure that DEBUG-level logging is enabled for the Flink operator. The following example demonstrates how to update the Helm chart file values.yaml. Then reinstall the updated Helm chart and run your Flink job again.
```yaml
log4j-operator.properties: |+
  # Flink Operator Logging Overrides
  rootLogger.level = DEBUG
```
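A sketch of the reinstall step, assuming the operator was installed as a Helm release named flink-kubernetes-operator; your release name, chart location, and namespace may differ:

```shell
# Reinstall the operator chart with the updated values file.
# <chart-location> and <operator-namespace> are placeholders for
# the chart source and namespace you used at install time.
helm upgrade --install flink-kubernetes-operator <chart-location> \
  -f values.yaml -n <operator-namespace>
```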
Get the name of your leader pod.
```shell
ip=$(kubectl get configmap -n $NAMESPACE <job-name>-cluster-config-map -o json | jq -r ".data[\"org.apache.flink.k8s.leader.restserver\"]" | awk -F: '{print $2}' | awk -F '/' '{print $3}')
kubectl get pods -n $NAMESPACE -o json | jq -r ".items[] | select(.status.podIP == \"$ip\") | .metadata.name"
```
Run the following command to get the actual restart time used in metrics evaluations.
```shell
kubectl logs <FLINK-OPERATOR-POD-NAME> -c flink-kubernetes-operator -n <OPERATOR-NAMESPACE> -f | grep "Restart time used in scaling summary computation"
```
You should see logs similar to the following. Note that only the first scaling uses job.autoscaler.restart.time. Subsequent scalings use the observed restart time.
```
2024-05-16 17:17:32,590 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT2M
2024-05-16 17:19:03,787 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:19:18,976 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:20:50,283 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
2024-05-16 17:22:21,691 o.a.f.a.ScalingExecutor [DEBUG][default/autoscaler-example] Restart time used in scaling summary computation: PT14S
```