Développement de producteurs utilisant l'API Amazon Kinesis Data Streams avec AWS SDK for Java - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Développement de producteurs utilisant l'API Amazon Kinesis Data Streams avec AWS SDK for Java

Vous pouvez développer des producteurs à l'aide de l'API Amazon Kinesis Data Streams avec le SDK AWS pour Java. Si vous ne connaissez pas Kinesis Data Streams, commencez par vous familiariser avec les concepts et la terminologie présentés dans Qu'est-ce qu'Amazon Kinesis Data Streams ? et Premiers pas avec Amazon Kinesis Data Streams.

Ces exemples traitent de l'API Kinesis Data Streams et utilisent le SDK AWS pour Java afin d'ajouter (mettre) des données dans un flux. Cependant, pour la plupart des cas d'utilisation, vous préférerez sans doute la bibliothèque Kinesis Data Streams. Pour de plus amples informations, veuillez consulter Développement de producteurs à l'aide de la bibliothèque producteur Amazon Kinesis (KPL).

L'exemple de code Java présenté dans ce chapitre montre comment effectuer les opérations de base d'API Kinesis Data Streams et est divisé logiquement par type d'opération. Ces exemples ne représentent pas du code prêt à la production, car ils ne recherchent pas toutes les exceptions possibles ou ne tiennent pas compte de toutes les considérations possibles en matière de sécurité ou de performances. En outre, vous pouvez appeler l'API Kinesis Data Streams à l'aide d'autres langages de programmation. Pour plus d'informations sur tous les kits de développement logiciel AWS disponibles, consultez la page Commencer à développer avec Amazon Web Services.

Chaque tâche a des conditions préalables ; par exemple, vous ne pouvez pas ajouter de données à un flux tant que vous n'avez pas créé de flux, ce qui demande de créer un client. Pour de plus amples informations, veuillez consulter Création et gestion de flux.

Ajout de données à un flux

Une fois qu'un flux est créé, vous pouvez y ajouter des données sous forme d'enregistrements. Un enregistrement est une structure de données qui contient les données à traiter sous la forme d'un blob de données. Une fois que vous avez stocké les données dans l'enregistrement, Kinesis Data Streams n'inspecte pas, n'interprète pas ou ne modifie absolument pas les données. Chaque enregistrement a également un numéro de séquence et une clé de partition qui lui sont associés.

L'API Kinesis Data Streams comporte deux opérations différentes qui ajoutent des données à un flux, PutRecords et PutRecord. L'opération PutRecords envoie plusieurs enregistrements à votre flux par demande HTTP et l'opération PutRecord envoie des enregistrements à votre flux un à la fois (une demande HTTP distincte est nécessaire pour chaque enregistrement). Vous préférerez sans doute utiliser PutRecords pour la plupart des applications, car cette opération permet d'atteindre un débit supérieur par application producteur. Pour plus d'informations sur chacune de ces opérations, consultez les sous-sections distinctes ci-dessous.

Étant donné que votre application source ajoute des données au flux à l'aide de l'API Kinesis Data Streams, n'oubliez jamais qu'une ou plusieurs applications consommateur traitent très probablement simultanément des données provenant du flux. Pour plus d'informations sur la façon dont les applications consommateur obtiennent les données à l'aide de l'API Kinesis Data Streams, consultez la page Extraction des données d'un flux.

Ajout de plusieurs enregistrements avec PutRecords

L'opération PutRecords envoie plusieurs enregistrements à Kinesis Data Streams dans une seule demande. En utilisant PutRecords, les applications producteur peuvent atteindre un débit supérieur lors de l'envoi de données à leur flux de données Kinesis. Chaque demande PutRecords peut prendre en charge jusqu'à 500 enregistrements. Chaque enregistrement de la demande peut atteindre 1 Mo, jusqu'à une limite de 5 Mo pour l'ensemble de la demande, y compris les clés de partition. Comme avec la seule opération PutRecord décrite ci-dessous, PutRecords utilise des numéros de séquence et des clés de partition. Toutefois, le paramètre PutRecord SequenceNumberForOrdering n'est pas inclus dans un appel PutRecords. L'opération PutRecords tente de traiter tous les enregistrements dans l'ordre naturel de la demande.

Chaque enregistrement de données a un numéro de séquence unique. Le numéro de séquence est attribué par Kinesis Data Streams une fois que vous avez appelé client.putRecords pour ajouter les enregistrements de données au flux. Les numéros de séquence correspondant à une même clé de partition deviennent généralement de plus en plus longs au fil du temps ; plus l'intervalle de temps entre chaque demande PutRecords est élevé, plus les numéros de séquence sont longs.

Note

Les numéros de séquence ne peuvent pas servir d'index aux ensembles de données d'un même flux. Pour séparer logiquement les ensembles de données, utilisez des clés de partition ou créez un flux distinct pour chaque ensemble de données.

Une demande PutRecords peut inclure des enregistrements ayant différentes clés de partition. La portée de la demande est un flux ; chaque demande peut inclure une combinaison de clés de partition et d'enregistrements allant jusqu'aux limites définies pour la demande. Les demandes effectuées avec de nombreuses clés de partition différentes pour des flux comportant de nombreuses partitions différentes sont généralement plus rapides que les demandes comportant un petit nombre de clés de partition pour un petit nombre de partitions. Le nombre de clés de partition doit être beaucoup plus grand que le nombre de partitions pour réduire la latence et optimiser le débit.

Exemple d'opération PutRecords

Le code suivant crée 100 enregistrements de données avec des clés de partition séquentielles et les place dans un flux appelé DataStream.

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

La réponse PutRecords comprend un tableau de réponse Records. Chaque enregistrement compris dans ce tableau de réponse correspond directement à un enregistrement dans le tableau de demande. Ces entrées sont classées dans l'ordre naturel, soit de haut en bas de la demande et de la réponse. Le tableau de réponse Records comprend toujours le même nombre d'enregistrements que le tableau de demande.

Gestion des défaillances lors de l'utilisation de PutRecords

Par défaut, la défaillance d'enregistrements individuels dans une demande n'arrête pas le traitement des enregistrements suivants dans une demande PutRecords. Cela signifie qu'un tableau Records de réponse comprend à la fois des enregistrements traités avec succès et sans succès. Vous devez détecter les enregistrements traités sans succès et les inclure dans un appel ultérieur.

Les enregistrements qui ont réussi incluent les valeurs SequenceNumber et ShardID. Ceux qui ont échoué incluent les valeurs ErrorCode et ErrorMessage. Le paramètre ErrorCode reflète le type d'erreur et peut avoir une des valeurs suivantes : ProvisionedThroughputExceededException ou InternalFailure. ErrorMessage fournit des informations plus détaillées sur l'exception ProvisionedThroughputExceededException, y compris l'ID de compte, le nom du flux et les ID de partition de l'enregistrement qui a été limité. L'exemple ci-dessous contient trois enregistrements dans une demande PutRecords. Le second enregistrement échoue et est reflété dans la réponse.

Exemple Syntaxe de la demande PutRecords
{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }
Exemple Syntaxe de la réponse PutRecords
{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

Les enregistrements qui ont été traités sans succès peuvent être inclus dans des demandes PutRecords ultérieures. Tout d'abord, vérifiez le paramètre FailedRecordCount de putRecordsResult afin de savoir si la demande comporte des enregistrements d'échecs. Dans ce cas, chaque putRecordsEntry comportant un ErrorCode qui n'est pas null doit être ajouté à une demande ultérieure. Pour un exemple de ce type de gestionnaire, reportez-vous au code suivant.

Exemple Gestionnaire de défaillance PutRecords
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

Ajout d'un seul enregistrement avec PutRecord

Chaque appel de PutRecord est exécuté sur un seul enregistrement. Préférez l'opération PutRecords décrite dans Ajout de plusieurs enregistrements avec PutRecords, sauf si votre application a particulièrement besoin de toujours envoyer des enregistrements uniques par demande ou qu'une autre raison empêche l'utilisation de PutRecords.

Chaque enregistrement de données a un numéro de séquence unique. Le numéro de séquence est attribué par Kinesis Data Streams une fois que vous avez appelé client.putRecord pour ajouter l'enregistrement de données au flux. Les numéros de séquence correspondant à une même clé de partition deviennent généralement de plus en plus longs au fil du temps ; plus l'intervalle de temps entre chaque demande PutRecord est élevé, plus les numéros de séquence sont longs.

Lorsque des opérations se succèdent rapidement, il n'est pas sûr que les numéros de séquence renvoyés augmentent, car ces opérations semblent surtout simultanées pour Kinesis Data Streams. Pour garantir une stricte augmentation des numéros de séquence pour la même clé de partition, utilisez le paramètre SequenceNumberForOrdering, comme il est illustré dans l'exemple de code Exemple d'opération PutRecord.

Que vous utilisiez SequenceNumberForOrdering ou non, les enregistrements que Kinesis Data Streams reçoit via un appel GetRecords sont strictement classés par numéro de séquence.

Note

Les numéros de séquence ne peuvent pas servir d'index aux ensembles de données d'un même flux. Pour séparer logiquement les ensembles de données, utilisez des clés de partition ou créez un flux distinct pour chaque ensemble de données.

La clé de partition sert à regrouper les données dans le flux. Un enregistrement de données est attribué à une partition du flux suivant sa clé de partition. Plus précisément, Kinesis Data Streams utilise la clé de partition comme entrée d'une fonction de hachage qui mappe la clé de partition (et les données associées) à une partition spécifique.

Ce mécanisme de hachage a pour effet que tous les enregistrements de données ayant la même clé de partition sont mappés à la même partition du flux. Toutefois, si le nombre de clés de partition dépasse le nombre de partitions, certaines partitions contiennent nécessairement des enregistrements ayant des clés de partition différentes. Du point de vue de la conception, afin de garantir que toutes vos partitions sont bien utilisées, le nombre de partitions (spécifié par la méthode setShardCount de CreateStreamRequest) doit être nettement inférieur à celui des partitions uniques, et la quantité de données qui passe dans une clé de partition unique doit être nettement inférieure à la capacité de la partition.

Exemple d'opération PutRecord

Le code suivant crée dix enregistrements de données répartis entre deux clés de partition et les place dans un flux appelé myStreamName.

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

L'exemple de code précédent utilise le paramètre setSequenceNumberForOrdering pour garantir un ordre croissant dans chaque clé de partition. Pour utiliser ce paramètre efficacement, définissez le SequenceNumberForOrdering de l'enregistrement en cours (enregistrement n) sur le numéro de séquence de l'enregistrement précédent (enregistrement n-1). Pour obtenir le numéro de séquence d'un enregistrement qui a été ajouté au flux, appelez getSequenceNumber sur le résultat de putRecord.

Le paramètre SequenceNumberForOrdering garantit des numéros de séquence strictement croissants pour la même clé de partition. SequenceNumberForOrdering ne propose pas l'ordre des enregistrements sur plusieurs clés de partition.

Interaction avec les données à l'aide du registre AWS Glue Schema

Vous pouvez intégrer vos flux de données Kinesis au registre de schémas AWS Glue. Le registre Glue Schema AWS vous permet de découvrir, de contrôler et de faire évoluer de manière centralisée les schémas, tout en garantissant que les données produites sont validées en permanence par un schéma enregistré. Un schéma définit la structure et le format d'un enregistrement de données. Un schéma est une spécification versionnée pour la publication, la consommation ou le stockage des données fiables. Le registre de schémas AWS vous permet d'améliorer la qualité des données de bout en bout et la gouvernance des données au sein de vos applications de streaming. Pour plus d'informations, consultez le registre AWS Glue Schema (français non garanti). L'un des moyens de configurer cette intégration consiste à utiliser les API PutRecords et PutRecord Kinesis Data Streams disponibles dans le SDK AWS Java.

Pour obtenir des instructions détaillées sur la configuration de l'intégration de Kinesis Data Streams à Schema Registry à l'aide des API PutRecords et PutRecord Kinesis Data Streams, consultez la section « Interaction avec les données à l'aide des API Kinesis Data Streams » dans Cas d'utilisation : Intégrer Amazon Kinesis Data Streams avec le registre AWS Glue Schema.