Amazon Kinesis Data Streams
開発者ガイド

Python での Kinesis クライアントライブラリコンシューマーの開発

Kinesis データストリームのデータを処理するアプリケーションを構築するには、Kinesis Client Library (KCL) を使用できます。Kinesis Client Library は、複数の言語で使用できます。このトピックでは、Python について説明します。

KCL は Java ライブラリです。Java 以外の言語のサポートは、MultiLangDaemon という多言語インターフェイスを使用して提供されます。このデーモンは Java ベースで、Java 以外の KCL 言語を使用するときに実行されます。 そのため、KCL for Python をインストールして、コンシューマーアプリケーションをすべて Python で書く場合でも、MultiLangDaemon を使用するために、Java をシステムにインストールする必要があります。さらに、MultiLangDaemon には、接続先の AWS リージョンなど、ユースケースに合わせてカスタマイズする必要のあるデフォルト設定例があります。GitHub の MultiLangDaemon の詳細については、「KCL MultiLangDaemon project」のページを参照してください。

GitHub から Python KCL をダウンロードするには、Kinesis Client Library (Python) にアクセスしてください。Python KCL コンシューマーアプリケーションのサンプルコードをダウンロードするには、GitHub で「KCL for Python sample project」ページにアクセスしてください。

Python で KCLコンシューマーアプリケーションを実装する場合は、次のタスクを完了する必要があります。

RecordProcessor クラスのメソッドを実装する

RecordProcess クラスでは、RecordProcessorBase クラスを拡張して次のメソッドを実装する必要があります。

initialize process_records shutdown_requested

このサンプルでは、開始点として使用できる実装を提供しています。

#!/usr/bin/env python # Copyright 2014-2015 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Amazon Software License (the "License"). # You may not use this file except in compliance with the License. # A copy of the License is located at # # http://aws.amazon.com/asl/ # # or in the "license" file accompanying this file. This file is distributed # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either # express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import print_function import sys import time from amazon_kclpy import kcl from amazon_kclpy.v3 import processor class RecordProcessor(processor.RecordProcessorBase): """ A RecordProcessor processes data from a shard in a stream. Its methods will be called with this pattern: * initialize will be called once * process_records will be called zero or more times * shutdown will be called if this MultiLangDaemon instance loses the lease to this shard, or the shard ends due a scaling change. """ def __init__(self): self._SLEEP_SECONDS = 5 self._CHECKPOINT_RETRIES = 5 self._CHECKPOINT_FREQ_SECONDS = 60 self._largest_seq = (None, None) self._largest_sub_seq = None self._last_checkpoint_time = None def log(self, message): sys.stderr.write(message) def initialize(self, initialize_input): """ Called once by a KCLProcess before any calls to process_records :param amazon_kclpy.messages.InitializeInput initialize_input: Information about the lease that this record processor has been assigned. """ self._largest_seq = (None, None) self._last_checkpoint_time = time.time() def checkpoint(self, checkpointer, sequence_number=None, sub_sequence_number=None): """ Checkpoints with retries on retryable exceptions. :param amazon_kclpy.kcl.Checkpointer checkpointer: the checkpointer provided to either process_records or shutdown :param str or None sequence_number: the sequence number to checkpoint at. :param int or None sub_sequence_number: the sub sequence number to checkpoint at. """ for n in range(0, self._CHECKPOINT_RETRIES): try: checkpointer.checkpoint(sequence_number, sub_sequence_number) return except kcl.CheckpointError as e: if 'ShutdownException' == e.value: # # A ShutdownException indicates that this record processor should be shutdown. This is due to # some failover event, e.g. another MultiLangDaemon has taken the lease for this shard. # print('Encountered shutdown exception, skipping checkpoint') return elif 'ThrottlingException' == e.value: # # A ThrottlingException indicates that one of our dependencies is is over burdened, e.g. too many # dynamo writes. We will sleep temporarily to let it recover. # if self._CHECKPOINT_RETRIES - 1 == n: sys.stderr.write('Failed to checkpoint after {n} attempts, giving up.\n'.format(n=n)) return else: print('Was throttled while checkpointing, will attempt again in {s} seconds' .format(s=self._SLEEP_SECONDS)) elif 'InvalidStateException' == e.value: sys.stderr.write('MultiLangDaemon reported an invalid state while checkpointing.\n') else: # Some other error sys.stderr.write('Encountered an error while checkpointing, error was {e}.\n'.format(e=e)) time.sleep(self._SLEEP_SECONDS) def process_record(self, data, partition_key, sequence_number, sub_sequence_number): """ Called for each record that is passed to process_records. :param str data: The blob of data that was contained in the record. :param str partition_key: The key associated with this recod. :param int sequence_number: The sequence number associated with this record. :param int sub_sequence_number: the sub sequence number associated with this record. """ #################################### # Insert your processing logic here #################################### self.log("Record (Partition Key: {pk}, Sequence Number: {seq}, Subsequence Number: {sseq}, Data Size: {ds}" .format(pk=partition_key, seq=sequence_number, sseq=sub_sequence_number, ds=len(data))) def should_update_sequence(self, sequence_number, sub_sequence_number): """ Determines whether a new larger sequence number is available :param int sequence_number: the sequence number from the current record :param int sub_sequence_number: the sub sequence number from the current record :return boolean: true if the largest sequence should be updated, false otherwise """ return self._largest_seq == (None, None) or sequence_number > self._largest_seq[0] or \ (sequence_number == self._largest_seq[0] and sub_sequence_number > self._largest_seq[1]) def process_records(self, process_records_input): """ Called by a KCLProcess with a list of records to be processed and a checkpointer which accepts sequence numbers from the records to indicate where in the stream to checkpoint. :param amazon_kclpy.messages.ProcessRecordsInput process_records_input: the records, and metadata about the records. """ try: for record in process_records_input.records: data = record.binary_data seq = int(record.sequence_number) sub_seq = record.sub_sequence_number key = record.partition_key self.process_record(data, key, seq, sub_seq) if self.should_update_sequence(seq, sub_seq): self._largest_seq = (seq, sub_seq) # # Checkpoints every self._CHECKPOINT_FREQ_SECONDS seconds # if time.time() - self._last_checkpoint_time > self._CHECKPOINT_FREQ_SECONDS: self.checkpoint(process_records_input.checkpointer, str(self._largest_seq[0]), self._largest_seq[1]) self._last_checkpoint_time = time.time() except Exception as e: self.log("Encountered an exception while processing records. Exception was {e}\n".format(e=e)) def lease_lost(self, lease_lost_input): self.log("Lease has been lost") def shard_ended(self, shard_ended_input): self.log("Shard has ended checkpointing") shard_ended_input.checkpointer.checkpoint() def shutdown_requested(self, shutdown_requested_input): self.log("Shutdown has been requested, checkpointing.") shutdown_requested_input.checkpointer.checkpoint() if __name__ == "__main__": kcl_process = kcl.KCLProcess(RecordProcessor()) kcl_process.run()

設定プロパティを変更する

このサンプルでは、次のスクリプトに示すように、設定プロパティのデフォルト値を提供します。これらのプロパティを独自の値にオーバーライドできます。

# The script that abides by the multi-language protocol. This script will # be executed by the MultiLangDaemon, which will communicate with this script # over STDIN and STDOUT according to the multi-language protocol. executableName = sample_kclpy_app.py # The name of an Amazon Kinesis stream to process. streamName = words # Used by the KCL as the name of this application. Will be used as the name # of an Amazon DynamoDB table which will store the lease and checkpoint # information for workers with this application name applicationName = PythonKCLSample # Users can change the credentials provider the KCL will use to retrieve credentials. # The DefaultAWSCredentialsProviderChain checks several other providers, which is # described here: # http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Appended to the user agent of the KCL. Does not impact the functionality of the # KCL in any other way. processingLanguage = python/2.7 # Valid options at TRIM_HORIZON or LATEST. # See http://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html#API_GetShardIterator_RequestSyntax initialPositionInStream = TRIM_HORIZON # The following properties are also available for configuring the KCL Worker that is created # by the MultiLangDaemon. # The KCL defaults to us-east-1 #regionName = us-east-1 # Fail over time in milliseconds. A worker which does not renew it's lease within this time interval # will be regarded as having problems and it's shards will be assigned to other workers. # For applications that have a large number of shards, this msy be set to a higher number to reduce # the number of DynamoDB IOPS required for tracking leases #failoverTimeMillis = 10000 # A worker id that uniquely identifies this worker among all workers using the same applicationName # If this isn't provided a MultiLangDaemon instance will assign a unique workerId to itself. #workerId = # Shard sync interval in milliseconds - e.g. wait for this long between shard sync tasks. #shardSyncIntervalMillis = 60000 # Max records to fetch from Kinesis in a single GetRecords call. #maxRecords = 10000 # Idle time between record reads in milliseconds. #idleTimeBetweenReadsInMillis = 1000 # Enables applications flush/checkpoint (if they have some data "in progress", but don't get new data for while) #callProcessRecordsEvenForEmptyRecordList = false # Interval in milliseconds between polling to check for parent shard completion. # Polling frequently will take up more DynamoDB IOPS (when there are leases for shards waiting on # completion of parent shards). #parentShardPollIntervalMillis = 10000 # Cleanup leases upon shards completion (don't wait until they expire in Kinesis). # Keeping leases takes some tracking/resources (e.g. they need to be renewed, assigned), so by default we try # to delete the ones we don't need any longer. #cleanupLeasesUponShardCompletion = true # Backoff time in milliseconds for Amazon Kinesis Client Library tasks (in the event of failures). #taskBackoffTimeMillis = 500 # Buffer metrics for at most this long before publishing to CloudWatch. #metricsBufferTimeMillis = 10000 # Buffer at most this many metrics before publishing to CloudWatch. #metricsMaxQueueSize = 10000 # KCL will validate client provided sequence numbers with a call to Amazon Kinesis before checkpointing for calls # to RecordProcessorCheckpointer#checkpoint(String) by default. #validateSequenceNumberBeforeCheckpointing = true # The maximum number of active threads for the MultiLangDaemon to permit. # If a value is provided then a FixedThreadPool is used with the maximum # active threads set to the provided value. If a non-positive integer or no # value is provided a CachedThreadPool is used. #maxActiveThreads = 0

アプリケーション名

KCL には、ユーザーのアプリケーション間、および同じリージョン内の Amazon DynamoDB テーブル間で一意のアプリケーション名が必要です。次のようにアプリケーション名の設定値を使用します。

  • このアプリケーション名と関連付けられたワーカーはすべて、同じストリーム上で連携して処理しているとみなされます。これらのワーカーは複数のインスタンス間に分散している場合があります。同じアプリケーションコードの追加のインスタンスを実行するときに、アプリケーション名が異なる場合、KCL は 2 番目のインスタンスを、同じストリームで動作するまったく別のアプリケーションと見なします。

  • KCL はアプリケーション名を使用して DynamoDB テーブルを作成し、このテーブルを使用してアプリケーションの状態情報 (チェックポイントやワーカーとシャードのマッピングなど) を保存します。各アプリケーションには、それぞれ DynamoDB テーブルがあります。詳細については、「Amazon Kinesis Data Streams Application の状態の追跡」を参照してください。

認証情報

デフォルトの認証情報プロバイダチェーンのいずれかの認証情報プロバイダで AWS の認証情報を使用できるようにする必要があります。AWSCredentialsProvider プロパティを使用して認証情報プロバイダーを設定できます。Amazon EC2 インスタンスでコンシューマーアプリケーションを実行する場合は、インスタンスに IAM を設定することをお勧めします。この IAM ロールに関連付けられたアクセス許可を反映する AWS の認証情報は、インスタンスメタデータを通じて、インスタンス上のアプリケーションで使用できるようになります。これは、EC2 インスタンスで実行されるコンシューマーアプリケーションの認証情報を管理するための最も安全な方法です。