Apache Flink のインプレースバージョンアップグレード - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Flink のインプレースバージョンアップグレード

Apache Flink のインプレースバージョンアップグレードでは、Apache Flink バージョン全体で単一の ARN に対するアプリケーションのトレーサビリティを維持します。これには、スナップショット、ログ、メトリクス、タグ、Flink 設定、リソース制限の引き上げ、VPCsなどが含まれます。Apache Flink のインプレースバージョンアップグレードを実行して、既存のアプリケーションを Amazon Managed Service for Apache Flink の新しい Flink バージョンにアップグレードできます。このタスクを実行するには、、 AWS CLI、 SDK AWS CloudFormation、 AWS または を使用できます AWS Management Console。

注記

Amazon Managed Service for Apache Flink Studio では、Apache Flink のインプレースバージョンアップグレードを使用できません。

Apache Flink のインプレースバージョンアップグレードを使用したアプリケーションのアップグレード

開始する前に、「インプレースバージョンアップグレード」の動画を視聴することをお勧めします。

Apache Flink のインプレースバージョンアップグレードを実行するには、、 AWS CLI AWS CloudFormation、 AWS SDK、または を使用できます AWS Management Console。この機能は、 READYまたは RUNNING状態の Managed Service for Apache Flink で使用する既存のアプリケーションで使用できます。 UpdateApplication API を使用して、Flink ランタイムを変更する機能を追加します。

アップグレード前: Apache Flink アプリケーションの更新

Flink アプリケーションを記述するときは、アプリケーションの JAR に依存関係をバンドルし、その JAR を Amazon S3 バケットにアップロードします。そこから、Amazon Managed Service for Apache Flink は、選択した新しい Flink ランタイムでジョブを実行します。アップグレードする Flink ランタイムとの互換性を実現するために、アプリケーションを更新する必要がある場合があります。Flink バージョン間に不整合があり、バージョンアップグレードが失敗する可能性があります。最も一般的には、これは送信元 (進入) または送信先 (シンク、出力) と Scala の依存関係のコネクタを使用します。Managed Service for Apache Flink の Flink 1.15 以降のバージョンは Scala に依存しないため、JAR には使用する Scala のバージョンが含まれている必要があります。

アプリケーションを更新するには

  1. 状態のアプリケーションのアップグレードに関する Flink コミュニティからのアドバイスをお読みください。「アプリケーションと Flink バージョンのアップグレード」を参照してください。

  2. 既知の問題と制限事項のリストをお読みください。注意事項と既知の問題 を参照してください。

  3. 依存関係を更新し、アプリケーションをローカルでテストします。通常、これらの依存関係は次のとおりです。

    1. Flink ランタイムと API。

    2. 新しい Flink ランタイムに推奨されるコネクタ。これらは、更新する特定のランタイムのリリースバージョンで確認できます。

    3. Scala – Apache Flink は、Flink 1.15 以降、Scala に依存しません。アプリケーション JAR で使用する Scala の依存関係を含める必要があります。

  4. zipfile に新しいアプリケーション JAR を構築し、Amazon S3 にアップロードします。以前の JAR/zip ファイルとは異なる名前を使用することをお勧めします。ロールバックする必要がある場合は、この情報を使用します。

  5. ステートフルアプリケーションを実行している場合は、現在のアプリケーションのスナップショットを作成することを強くお勧めします。これにより、アップグレード中またはアップグレード後に問題が発生した場合に、ステートリーにロールバックできます。

アプリケーションを新しい Apache Flink バージョンにアップグレードする

UpdateApplication アクションを使用して Flink アプリケーションをアップグレードできます。

UpdateApplication API は複数の方法で呼び出すことができます。

  • で既存の設定ワークフローを使用します AWS Management Console。

    • のアプリページに移動します AWS Management Console。

    • [設定] を選択します。

    • 新しいランタイムと、復元設定とも呼ばれる、開始するスナップショットを選択します。最新のスナップショットからアプリケーションを起動するには、最新の設定を復元設定として使用します。Amazon S3 で新しくアップグレードされたアプリケーション JAR/zip をポイントします。

  • AWS CLI update-application アクションを使用します。

  • AWS CloudFormation (CFN) を使用します。

    • RuntimeEnvironment フィールドを更新します。以前は、 がアプリケーション AWS CloudFormation を削除し、新しいアプリケーションを作成していたため、スナップショットやその他のアプリケーション履歴が失われていました。これで RuntimeEnvironment 、 が所定の位置に AWS CloudFormation 更新され、アプリケーションは削除されません。

  • AWS SDK を使用します。

    • 選択したプログラミング言語については、 SDK のドキュメントを参照してください。「UpdateApplication」を参照してください。

アップグレードは、アプリケーションが RUNNING状態の間、またはアプリケーションが READY状態で停止している間に実行できます。Amazon Managed Service for Apache Flink は、元のランタイムバージョンとターゲットランタイムバージョンの互換性を検証します。この互換性チェックは、 RUNNING状態UpdateApplicationの間に を実行すると が実行され、 READY状態の間にアップグレードStartApplicationすると次の で実行されます。

次の例は、 を使用して、 という名前RUNNINGのアプリケーションを米国東部 (バージニア北部) の UpgradeTest Flink 1.18 にアップグレード AWS CLI し、最新のスナップショットからアップグレードされたアプリケーションを起動することを示しています。

aws --region us-east-1 kinesisanalyticsv2 update-application \ --application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \ --application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\ '{"CodeContentUpdate": {"S3ContentLocationUpdate": '\ '{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \ --run-configuration-update '{"ApplicationRestoreConfiguration": '\ '{"ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT"}}' \ --current-application-version-id ${current_application_version}
  • サービススナップショットを有効にし、最新のスナップショットからアプリケーションを継続する場合、Amazon Managed Service for Apache Flink は、現在のRUNNINGアプリケーションのランタイムが選択したターゲットランタイムと互換性があることを確認します。

  • ターゲットランタイムを継続するスナップショットを指定した場合、Amazon Managed Service for Apache Flink は、ターゲットランタイムが指定されたスナップショットと互換性があることを確認します。互換性チェックが失敗した場合、更新リクエストは拒否され、アプリケーションは RUNNING状態のままになります。

  • スナップショットなしでアプリケーションを起動することを選択した場合、Amazon Managed Service for Apache Flink は互換性チェックを実行しません。

  • アップグレードしたアプリケーションが失敗したり、推移的なUPDATING状態で停止したりした場合は、 ロールバックセクションの指示に従って正常な状態に戻ります。

ステートアプリケーションを実行するためのプロセスフロー

次の図は、実行中にアプリケーションをアップグレードするために推奨されるワークフローを示しています。アプリケーションがステートフルであり、スナップショットを有効にしていることを前提としています。このワークフローでは、更新時に、更新前に Amazon Managed Service for Apache Flink によって自動的に作成された最新のスナップショットからアプリケーションを復元します。

次の例は、 を使用して、米国東部 (バージニア北部) UpgradeTest の という名前の READY 状態のアプリケーションを Flink 1.18 にアップグレードする例を示しています。 AWS CLIアプリケーションが実行されていないため、アプリケーションを起動するためのスナップショットが指定されていません。スナップショットは、アプリケーション開始リクエストを発行するときに指定できます。

aws --region us-east-1 kinesisanalyticsv2 update-application \ --application-name UpgradeTest --runtime-environment-update "FLINK-1_18" \ --application-configuration-update '{"ApplicationCodeConfigurationUpdate": '\ '{"CodeContentUpdate": {"S3ContentLocationUpdate": '\ '{"FileKeyUpdate": "flink_1_18_app.jar"}}}}' \ --current-application-version-id ${current_application_version}
  • READY 状態のアプリケーションのランタイムを任意の Flink バージョンに更新できます。Amazon Managed Service for Apache Flink は、アプリケーションを起動するまでチェックを実行しません。

  • Amazon Managed Service for Apache Flink は、アプリケーションを起動するために選択したスナップショットに対してのみ互換性チェックを実行します。これらは、「Flink 互換性表」に従った基本的な互換性チェックです。スナップショットが作成された Flink バージョンとターゲットとする Flink バージョンのみをチェックします。選択したスナップショットの Flink ランタイムがアプリの新しいランタイムと互換性がない場合、開始リクエストが拒否される可能性があります。

準備完了状態のアプリケーションのプロセスフロー

次の図は、準備完了状態でアプリケーションをアップグレードするために推奨されるワークフローを示しています。アプリケーションがステートフルであり、スナップショットを有効にしていることを前提としています。このワークフローでは、更新時に、アプリケーションの停止時に Amazon Managed Service for Apache Flink によって自動的に作成された最新のスナップショットからアプリケーションを復元します。

ロールバック

アプリケーションに問題がある場合、または Flink バージョン間でアプリケーションコードに不整合がある場合は、 AWS CLI、、 AWS SDK AWS CloudFormation、または を使用してロールバックできます AWS Management Console。次の例は、さまざまな障害シナリオでのロールバックがどのように見えるかを示しています。

ランタイムのアップグレードは成功し、アプリケーションは RUNNING状態ですが、ジョブは失敗し、継続的に再起動しています

米国東部 (バージニア北部) で、 という名前のステートフルアプリケーションを Flink 1.15 TestApplicationから Flink 1.18 にアップグレードしようとしているとします。ただし、アップグレードされた Flink 1.18 アプリケーションは、アプリケーションが RUNNING状態であっても、起動に失敗しているか、常に再起動しています。これは一般的な障害シナリオです。さらなるダウンタイムを避けるため、アプリケーションをすぐに以前の実行中のバージョン (Flink 1.15) にロールバックし、後で問題を診断することをお勧めします。

アプリケーションを以前の実行バージョンにロールバックするには、rollback-application AWS CLI コマンドまたは RollbackApplication API アクションを使用します。この API アクションは、最新バージョンになった変更をロールバックします。次に、最後に成功したスナップショットを使用してアプリケーションを再起動します。

アップグレードを試みる前に、既存のアプリでスナップショットを作成することを強くお勧めします。これにより、データ損失やデータの再処理を回避できます。

この障害シナリオでは、 はアプリケーションをロールバック AWS CloudFormation しません。テンプレートを更新 CloudFormation して、以前のランタイムを指し、 がアプリケーションを更新 CloudFormation するよう強制するには、前のコードを指す必要があります。それ以外の場合、 CloudFormation はアプリケーションが RUNNING状態に移行したときに更新されたことを前提としています。

スタックしているアプリケーションをロールバックする UPDATING

アップグレードの試行後にアプリケーションが UPDATINGまたは AUTOSCALING状態でスタックした場合、Amazon Managed Service for Apache Flink は rollback-applications AWS CLI コマンド、またはアプリケーションをスタックUPDATINGまたは AUTOSCALING状態より前のバージョンにロールバックできる RollbackApplications API アクションを提供します。この API は、アプリケーションが UPDATINGまたはAUTOSCALING推移的な状態でスタックする原因となった変更をロールバックします。

一般的なベストプラクティスと推奨事項

  • 本番稼働用アップグレードを試みる前に、本番稼働用以外の環境で 状態なしで新しいジョブ/ランタイムをテストします。

  • まず、非本番稼働用アプリケーションでステートフルアップグレードをテストすることを検討してください。

  • 新しいジョブグラフに、アップグレードされたアプリケーションの起動に使用するスナップショットと互換性がある状態があることを確認してください。

    • 演算子の状態に保存されている型が同じままであることを確認してください。タイプが変更された場合、Apache Flink は演算子の状態を復元できません。

    • uid メソッドを使用して設定したオペレーター IDsが同じままであることを確認してください。Apache Flink は、一意の IDsことを強くお勧めします。詳細については、Apache Flink ドキュメントの「オペレーター IDs」を参照してください。

      オペレーターに IDs を割り当てない場合、Flink は自動的に ID を生成します。その場合、プログラム構造に依存し、変更すると互換性の問題が発生する可能性があります。Flink は、オペレータ IDsを使用してスナップショットの状態をオペレータに一致させます。オペレータ IDs を変更すると、アプリケーションが起動しないか、スナップショットに保存されている状態が削除され、新しいオペレータが状態なしで開始されます。

    • キー付き状態の保存に使用されるキーを変更しないでください。

    • ウィンドウや結合などのステートフル演算子の入力タイプを変更しないでください。これにより、オペレーターの内部状態のタイプが暗黙的に変更され、状態が非互換性になります。

注意事項と既知の問題

Flink 1.19 以降からの設定変更は許可されていません

  • Flink 1.18 以前から Flink 1.19 以降でランタイムを更新する場合、Flink ジョブコードを使用した Flink ジョブ設定の変更は許可されなくなります。その結果、アプリケーションはジョブの送信に失敗します。エラーログは、実行時にどの許可されていない設定が変更されたかを示します。詳細については、「FlinkRuntimeException:「許可されていない設定変更 (複数可) が検出されました」」を参照してください。

状態の互換性に関する既知の制限事項

  • Table API を使用している場合、Apache Flink は Flink バージョン間の状態の互換性を保証しません。詳細については、Apache Flink ドキュメントの「ステートフルアップグレードと進化」を参照してください。

  • Flink 1.6 の状態は Flink 1.18 と互換性がありません。状態が 1.6 から 1.18 以降にアップグレードしようとすると、API はリクエストを拒否します。1.8、1.11、1.13、1.15 にアップグレードしてスナップショットを作成し、1.18 以降にアップグレードできます。詳細については、Apache Flink ドキュメントの「アプリケーションと Flink バージョンのアップグレード」を参照してください。

Flink Kinesis Connector の既知の問題

  • Flink 1.11 以前を使用していて、E nhanced-fan-out (EFO) サポート用のamazon-kinesis-connector-flinkコネクタを使用している場合は、Flink 1.13 以降にステートフルアップグレードするための追加の手順を実行する必要があります。これは、コネクタのパッケージ名が変更されたためです。詳細については、「」を参照してくださいamazon-kinesis-connector-flink

    Flink 1.11 以前のamazon-kinesis-connector-flinkコネクタはパッケージ を使用しsoftware.amazon.kinesis、Flink 1.13 以降の Kinesis コネクタは を使用しますorg.apache.flink.streaming.connectors.kinesis。移行をサポートするには、このツールを使用します: amazon-kinesis-connector-flink-state-migrator

  • で Flink 1.13 以前を使用してFlinkKinesisProducerいて、Flink 1.15 以降にアップグレードする場合、ステートフルアップグレードでは、新しい ではなく Flink 1.15 以降FlinkKinesisProducerで引き続き を使用する必要がありますKinesisStreamsSink。ただし、シンクにカスタムuid設定が既にある場合は、 が 状態を保持しないKinesisStreamsSinkため、 FlinkKinesisProducer に切り替えることができます。Flink は、カスタムuidが設定されているため、同じ演算子として扱います。

Scala で記述された Flink アプリケーション

  • Flink 1.15 以降、Apache Flink はランタイムに Scala を含めません。Flink 1.15 以降にアップグレードするときは、使用する Scala のバージョンと、その他の Scala の依存関係をコード JAR/zip に含める必要があります。詳細については、「Amazon Managed Service for Apache Flink for Apache Flink 1.15.2 release 」を参照してください。

  • アプリケーションで Scala を使用していて、Flink 1.11 以前 (Scala 2.11) から Flink 1.13 (Scala 2.12) にアップグレードする場合は、コードで Scala 2.12 を使用していることを確認してください。そうしないと、Flink 1.13 アプリケーションが Flink 1.13 ランタイムで Scala 2.11 クラスを見つけられない可能性があります。

Flink アプリケーションをダウングレードする際の考慮事項

  • Flink アプリケーションのダウングレードは可能ですが、以前の Flink バージョンでアプリケーションが実行されていた場合に限られます。ステートフルアップグレードの場合、Managed Service for Apache Flink では、ダウングレードに一致する 以前のバージョンで作成されたスナップショットを使用する必要があります。

  • ランタイムを Flink 1.13 以降から Flink 1.11 以前に更新していて、アプリケーションが HashMap 状態バックエンドを使用している場合、アプリケーションは継続的に失敗します。