Using Autoscaler for Flink applications - Amazon EMR

Using Autoscaler for Flink applications

The operator autoscaler can help ease backpressure by collecting metrics from Flink jobs and automatically adjusting parallelism at the job vertex level. The following is an example of what your configuration might look like:

apiVersion: flink.apache.org/v1beta1
kind: FlinkDeployment
metadata:
  ...
spec:
  ...
  flinkVersion: v1_17
  flinkConfiguration:
    kubernetes.operator.job.autoscaler.enabled: "true"
    kubernetes.operator.job.autoscaler.stabilization.interval: 1m
    kubernetes.operator.job.autoscaler.metrics.window: 5m
    kubernetes.operator.job.autoscaler.target.utilization: "0.6"
    kubernetes.operator.job.autoscaler.target.utilization.boundary: "0.2"
    kubernetes.operator.job.autoscaler.restart.time: 2m
    kubernetes.operator.job.autoscaler.catch-up.duration: 5m
    pipeline.max-parallelism: "720"
  ...

The following are configuration options for the autoscaler.

  • kubernetes.operator.job.autoscaler.scaling.enabled – specifies whether to enable autoscaler action. Defaults to false to support a passive, metrics-only mode in which the autoscaler collects and evaluates scaling-related performance metrics but does not trigger any job upgrades. You can use this mode to gain confidence in the module without any impact on running applications.

  • kubernetes.operator.job.autoscaler.stabilization.interval – the stabilization period in which no new scaling will be executed. Default is 5 minutes.

  • kubernetes.operator.job.autoscaler.metrics.window – the scaling metrics aggregation window size. A larger window gives smoother, more stable scaling decisions, but the autoscaler might be slower to react to sudden load changes. Default is 10 minutes. We recommend that you experiment with values between 3 and 60 minutes.

  • kubernetes.operator.job.autoscaler.target.utilization – the target vertex utilization to provide stable job performance and some buffer for load fluctuations. The default is 0.7 targeting 70% utilization/load for the job vertexes.

  • kubernetes.operator.job.autoscaler.target.utilization.boundary – the target vertex utilization boundary that serves as extra buffer to avoid immediate scaling on load fluctuations. Default is 0.4, which means 40% deviation from the target utilization is allowed before triggering a scaling action.

  • kubernetes.operator.job.autoscaler.restart.time – the expected time to restart the application. Default is 3 minutes.

  • kubernetes.operator.job.autoscaler.catch-up.duration – the expected time to catch up, meaning fully processing any backlog after a scaling operation completes. Default is 5 minutes. If you lower the catch-up duration, the autoscaler has to reserve more extra capacity for the scaling actions.

  • pipeline.max-parallelism – the maximum parallelism the autoscaler can use. The autoscaler ignores this limit if it is higher than the max parallelism configured in the Flink config or directly on each operator. Default is 200. The autoscaler computes the parallelism as a divisor of the max parallelism, so choose a max parallelism that has many divisors instead of relying on the Flink-provided defaults. We recommend using multiples of 60 for this configuration, such as 120, 180, 240, 360, or 720.
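
To illustrate why a max parallelism with many divisors gives the autoscaler more values to choose from, the following Python sketch counts the divisors of a few candidate settings. The helper names divisors and next_divisor_at_least are hypothetical. Rounding up to the next divisor is only an assumed stand-in for how a divisor might be picked, not the operator's actual algorithm.

```python
def divisors(n: int) -> list[int]:
    """Return all positive divisors of n in ascending order."""
    small, large = [], []
    i = 1
    while i * i <= n:
        if n % i == 0:
            small.append(i)
            if i != n // i:
                large.append(n // i)
        i += 1
    return small + large[::-1]


def next_divisor_at_least(n: int, wanted: int) -> int:
    """Smallest divisor of n that is >= wanted, capped at n.

    Illustrative assumption only: this is not taken from the operator's code.
    """
    return next((d for d in divisors(n) if d >= wanted), n)


if __name__ == "__main__":
    for max_parallelism in (120, 200, 720):
        count = len(divisors(max_parallelism))
        rounded = next_divisor_at_least(max_parallelism, 27)
        print(f"max={max_parallelism}: {count} divisors, "
              f"a wanted parallelism of 27 rounds up to {rounded}")
```

Under this assumed rounding rule, a wanted parallelism of 27 lands on 40 with a max parallelism of 200, but on 30 with either 120 or 720, because multiples of 60 have more closely spaced divisors.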

For a more detailed configuration reference page, see Autoscaler configuration.