Kinesis 接続 - AWS Glue

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis 接続

Kinesis 接続を使用して、Data Catalog テーブルに保存されている情報を使用するか、データストリームに直接アクセスするための情報を提供することで、Amazon Kinesis データストリームを読み書きできます。Kinesis から Spark に情報を読み取ってから DataFrame、 AWS Glue に変換できます DynamicFrame。JSON 形式で Kinesis DynamicFrames に書き込むことができます。データストリームに直接アクセスする場合は、これらのオプションを使用して、データストリームへのアクセス方法に関する情報を提供します。

getCatalogSource または create_data_frame_from_catalog を使用して Kinesis ストリーミングソースからレコードを消費する場合、ジョブはデータカタログデータベースとテーブル名の情報を持っており、それを使用して Kinesis ストリーミングソースから読み込むためのいくつかの基本パラメータを取得することができます。getSourcegetSourceWithFormatcreateDataFrameFromOptions、または create_data_frame_from_options を使用している場合、ここで説明する接続オプションを使用して、これらの基本パラメータを指定する必要があります。

Kinesis の接続オプションは、GlueContext クラス内の指定されたメソッドに対して以下の引数で指定することができます。

  • Scala

    • connectionOptions: getSourcecreateDataFrameFromOptionsgetSink で使用

    • additionalOptions: getCatalogSourcegetCatalogSink で使用

    • options: getSourceWithFormatgetSinkWithFormat で使用

  • Python

    • connection_options: create_data_frame_from_optionswrite_dynamic_frame_from_options で使用

    • additional_options: create_data_frame_from_catalogwrite_dynamic_frame_from_catalog で使用

    • options: getSourcegetSink で使用

ストリーミングETLジョブに関する注意事項と制限については、「」を参照してくださいストリーミング ETL に関する注意と制限

Kinesis の設定

AWS Glue Spark ジョブで Kinesis データストリームに接続するには、いくつかの前提条件が必要です。

  • 読み取りの場合、 AWS Glue ジョブには Kinesis データストリームへの読み取りアクセスレベルのアクセスIAM許可が必要です。

  • 書き込む場合、 AWS Glue ジョブには Kinesis データストリームへの書き込みアクセスレベルのアクセスIAM許可が必要です。

場合によっては、追加の前提条件を設定する必要があります。

  • AWS Glue ジョブが追加のネットワーク接続 (通常は他のデータセットに接続) で構成されており、それらの接続の 1 つが Amazon VPC Network オプション を提供する場合、ジョブは Amazon 経由で通信するように指示されますVPC。 この場合、Amazon 経由で通信するように Kinesis データストリームを設定する必要がありますVPC。これを行うには、Amazon VPCと Kinesis データストリームの間にインターフェイスVPCエンドポイントを作成します。詳細については、「インターフェイスVPCエンドポイントでの Kinesis Data Streams の使用」を参照してください。

  • 別のアカウントで Amazon Kinesis Data Streams を指定する場合は、クロスアカウントアクセスを許可するようにロールとポリシーを設定する必要があります。詳細については、「Example: Read From a Kinesis Stream in a Different Account」を参照してください。

ストリーミングETLジョブの前提条件の詳細については、「」を参照してくださいAWS Glue でのストリーミング ETL ジョブ

例: Kinesis ストリームからの読み込み

例: Kinesis ストリームからの読み込み

forEachBatch と組み合わせて使用します。

Amazon Kinesis ストリーミングソースの例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

例: Kinesis ストリームへの書き込み

例: Kinesis ストリームからの読み込み

forEachBatch と組み合わせて使用します。

Amazon Kinesis ストリーミングソースの例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Kinesis 接続オプションのリファレンス

Amazon Kinesis Data Streams への接続を指定します。

Kinesis ストリーミングデータソースには、次の接続オプションを使用します。

  • "streamARN" (必須) 読み取り/書き込みに使用されます。Kinesis データストリームARNの 。

  • "classification" (読み取りに必須) 読み取りで使用。レコード内のデータで使用されるファイル形式。データカタログを通じて提供されていない限り、必須です。

  • "streamName" - (オプション) 読み取りで使用。読み取り対象/読み取り元の Kinesis データストリームの名前。endpointUrl で使用。

  • "endpointUrl" - (オプション) 読み取りで使用。デフォルト:https://kinesis.us-east-1.amazonaws.com「」。Kinesis ストリームの AWS エンドポイント。特別なリージョンに接続する場合を除き、これを変更する必要はありません。

  • "partitionKey" - (オプション) 書き込みに使用。レコードを作成する際に使用される Kinesis パーティションキー。

  • "delimiter" (オプション) 読み取りに使用。classification が の場合に使用される値の区切り文字CSV。デフォルトは「,」です。

  • "startingPosition": (オプション) 読み込みに使用。Kinesis データストリーム内の、データの読み取り開始位置。指定できる値は"latest"、、"trim_horizon""earliest"、、またはパターンUTCの形式のタイムスタンプ文字列です yyyy-mm-ddTHH:MM:SSZ ( は +/- のUTCタイムゾーンオフセットZを表します。 例えば、2023-04-04T08:00:00-04:00」) です。デフォルト値は "latest" です。注: のUTCフォーマットのタイムスタンプ文字列"startingPosition"は、 AWS Glue バージョン 4.0 以降でのみサポートされています。

  • "failOnDataLoss": (オプション) アクティブなシャードがないか、有効期限が切れている場合、ジョブは失敗します。デフォルト値は "false" です。

  • "awsSTSRoleARN": (オプション) 読み取り/書き込みに使用。(ARN) を使用して引き受けるロールの Amazon リソースネーム AWS Security Token Service ()AWS STS。このロールには、Kinesis データストリームのレコードの説明操作または読み取り操作の権限が必要です。このパラメーターは、別のアカウントのデータストリームにアクセスするときに使用する必要があります。"awsSTSSessionName" と組み合わせて使用します。

  • "awsSTSSessionName": (オプション) 読み取り/書き込みに使用。 AWS STSを使って、ロールを担うセッションの識別子。このパラメーターは、別のアカウントのデータストリームにアクセスするときに使用する必要があります。"awsSTSRoleARN" と組み合わせて使用します。

  • "awsSTSEndpoint": (オプション) 引き受けたロールを使用して Kinesis に接続するときに使用する AWS STS エンドポイント。これにより、 でリージョン AWS STS エンドポイントを使用できますがVPC、デフォルトのグローバルエンドポイントでは使用できません。

  • "maxFetchTimeInMs": (オプション) 読み込みに使用。ジョブエグゼキューターが Kinesis データストリームから現在のバッチのレコードを読み取るために費やした最大時間は、ミリ秒 (ms) 単位で指定されます。この時間内に複数のGetRecordsAPI呼び出しが行われる場合があります。デフォルト値は 1000 です。

  • "maxFetchRecordsPerShard": (オプション) 読み込みに使用。1 マイクロバッチ当たりに Kinesis データストリームでシャードごとにフェッチするレコードの最大数。メモ: ストリーミングジョブが既に Kinesis (同じ get-records 呼び出しで) から余分なレコードを読み取っている場合、クライアントはこの制限を超えることができます。maxFetchRecordsPerShard が厳密である必要がある場合、maxRecordPerRead の倍数にする必要があります。デフォルト値は 100000 です。

  • "maxRecordPerRead": (オプション) 読み込みに使用。getRecords オペレーションごとに、Kinesis データストリームからフェッチするレコードの最大数。デフォルト値は 10000 です。

  • "addIdleTimeBetweenReads": (オプション) 読み込みに使用。2 つの連続する getRecords オペレーション間の遅延時間を追加します。デフォルト値は "False" です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。

  • "idleTimeBetweenReadsInMs": (オプション) 読み込みに使用。2 つの連続する getRecords オペレーション間での、最短の遅延時間 (ミリ秒単位で指定)。デフォルト値は 1000 です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。

  • "describeShardInterval": (オプション) 読み込みに使用。スクリプトが再シャーディングを検討するための 2 回のListShardsAPI呼び出し間の最小時間間隔。詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「リシャーディングのための戦略」を参照してください。デフォルト値は 1s です。

  • "numRetries": (オプション) 読み込みに使用。Kinesis Data Streams APIリクエストの最大再試行回数。デフォルト値は 3 です。

  • "retryIntervalMs": (オプション) 読み込みに使用。Kinesis Data Streams API呼び出しを再試行するまでのクールオフ期間 (ミリ秒単位で指定)。デフォルト値は 1000 です。

  • "maxRetryIntervalMs": (オプション) 読み込みに使用。Kinesis Data Streams API呼び出しの 2 回の再試行間の最大クールオフ時間 (ミリ秒単位で指定)。デフォルト値は 10000 です。

  • "avoidEmptyBatches": (オプション) 読み込みに使用。バッチ処理を開始する前に、Kinesis データストリームで未読のデータをチェックすることで、空のマイクロバッチジョブを作成しないようにします。デフォルト値は "False" です。

  • "schema": (false inferSchema に設定すると必須) 読み取りに使用されます。ペイロードの処理に使用するスキーマ。分類が avro である場合、提供されるスキーマには Avro スキーマ形式を使用する必要があります。分類が でない場合avro、指定されたスキーマはDDLスキーマ形式である必要があります。

    以下に、スキーマの例を示します。

    Example in DDL schema format
    `column1` INT, `column2` STRING , `column3` FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema": (オプション) 読み込みに使用。デフォルト値は、「false」です。「true」に設定すると、実行時に、スキーマが foreachbatch 内のペイロードから検出されます。

  • "avroSchema": (非推奨) 読み取りに使用。Avro 形式を使用する場合に、Avro データのスキーマを指定するために使用されるパラメータです。このパラメータは非推奨となりました。schema パラメータを使用します。

  • "addRecordTimestamp": (オプション) 読み込みに使用。このオプションが「true」に設定されている場合、データ出力には、対応するレコードがストリームによって受信された時刻を表示する「__src_timestamp」という名前が付けられた追加の列が含まれます。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。

  • "emitConsumerLagMetrics": (オプション) 読み込みに使用。オプションが「true」に設定されている場合、バッチごとに、ストリームが受信した最も古いレコードから に到着するまでの期間のメトリクスが出力されますAWS Glue CloudWatch。メトリクスの名前は「glue.driver.streaming」ですmaxConsumerLagInMs。デフォルト値は、「false」です。このオプションは AWS Glue バージョン 4.0 以降でサポートされています。

  • "fanoutConsumerARN": (オプション) 読み込みに使用。で指定されたストリームARNの Kinesis ストリームコンシューマーの streamARN。Kinesis 接続の拡張ファンアウトモードを有効にするために使用されます。拡張ファンアウトが使用された Kinesis ストリームの使用に関する詳細については、「Kinesis ストリーミングジョブでの拡張ファンアウトの使用」を参照してください。

  • "recordMaxBufferedTime" - (オプション) 書き込みに使用。デフォルト: 1000 (ミリ秒)。レコードが書き込まれるのを待っている間にバッファリングされる最大時間。

  • "aggregationEnabled" - (オプション) 書き込みに使用。デフォルト: true。Kinesis に送信する前にレコードを集約するかどうかを指定します。

  • "aggregationMaxSize" - (オプション) 書き込みに使用。デフォルト: 51200 (バイト) レコードがこの制限よりも大きい場合、そのレコードはアグリゲータをバイパスします。注: Kinesis では、レコードサイズに 50 KB の制限が適用されます。これを 50 KB を超えて設定すると、サイズ超過のレコードは Kinesis によって拒否されます。

  • "aggregationMaxCount" - (オプション) 書き込みに使用。デフォルト: 4294967295。集計されたレコードにパックされる項目の最大数。

  • "producerRateLimit" - (オプション) 書き込みに使用。デフォルト: 150 (%)。1 つのプロデューサー (ジョブなど) からの送信されるシャード単位のスループットをバックエンド制限のパーセンテージとして制限できます。

  • "collectionMaxCount" - (オプション) 書き込みに使用。デフォルト: 500。 PutRecords リクエストにパックする項目の最大数。

  • "collectionMaxSize" - (オプション) 書き込みに使用。デフォルト: 5242880 (バイト)。 PutRecords リクエストで送信するデータの最大量。