Développement d'applications producteur à l'aide de l'API Amazon Kinesis Data Streams avec leAWS SDK for Java - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Développement d'applications producteur à l'aide de l'API Amazon Kinesis Data Streams avec leAWS SDK for Java

Vous pouvez développer des applications producteur à l'aide de l'API Amazon Kinesis Data Streams avec le kitAWSSDK for Java. Si vous utilisez Kinesis Data Streams, commencez par vous familiariser avec les concepts et la terminologie présentés dansPrésentation de Amazon Kinesis Data StreamsandPrésentation des Amazon Kinesis Data Streams.

Ces exemples traitent de laAPI Kinesis Data Streamset utilisez le kitAWSKit de développement logiciel pour Javapour ajouter (placer) des données à un flux. Cependant, dans la plupart des cas d'utilisation, vous préférerez sans doute la bibliothèque KPL Kinesis Data Streams. Pour plus d'informations, consultez Développement d'applications producteur à l'aide de la bibliothèque producteur Amazon Kinesis.

L'exemple de code Java présenté dans ce chapitre montre comment effectuer les opérations de base de l'API Kinesis Data Streams et est divisé logiquement par type d'opération. Ces exemples ne représentent pas du code prêt à la production, car ils ne recherchent pas toutes les exceptions possibles ou ne tiennent pas compte de toutes les considérations possibles en matière de sécurité ou de performances. En outre, vous pouvez appeler le kitAPI Kinesis Data Streamsd'autres langages de programmation. Pour de plus amples informations sur toutes les applications disponiblesAWSKits SDK, consultezDébut du développement logiciel avec Amazon Web Services.

Chaque tâche a des conditions préalables ; par exemple, vous ne pouvez pas ajouter de données à un flux tant que vous n'avez pas créé de flux, ce qui demande de créer un client. Pour plus d'informations, consultez Création et gestion de flux.

Ajout de données à un flux

Une fois qu'un flux est créé, vous pouvez y ajouter des données sous forme d'enregistrements. Un enregistrement est une structure de données qui contient les données à traiter sous la forme d'un blob de données. Une fois que vous avez stocké les données dans l'enregistrement, Kinesis Data Streams n'inspecte pas, n'interprète pas ou ne modifie aucunement les données. Chaque enregistrement a également un numéro de séquence et une clé de partition qui lui sont associés.

L'API Kinesis Data Streams comporte deux opérations différentes qui ajoutent des données à un flux,PutRecordsandPutRecord. L'opération PutRecords envoie plusieurs enregistrements à votre flux par demande HTTP et l'opération PutRecord envoie des enregistrements à votre flux un à la fois (une demande HTTP distincte est nécessaire pour chaque enregistrement). Vous préférerez sans doute utiliser PutRecords pour la plupart des applications, car cette opération permet d'atteindre un débit supérieur par application producteur. Pour plus d'informations sur chacune de ces opérations, consultez les sous-sections distinctes ci-dessous.

Etant donné que votre application source ajoute des données au flux à l'aide de l'API Kinesis Data Streams, n'oubliez jamais qu'une ou plusieurs applications consommateur traitent très probablement simultanément des données provenant du flux. Pour plus d'informations sur la façon dont les applications consommateur obtiennent les données à l'aide de l'API Kinesis Data Streams, consultezExtraction des données d'un flux.

Ajout de plusieurs enregistrements avec PutRecords

La .PutRecordsenvoie plusieurs enregistrements à Kinesis Data Streams dans une seule demande. En utilisantPutRecords, les applications producteur peuvent atteindre un débit supérieur lors de l'envoi de données à leur flux de données Kinesis. EACHPutRecordspeut prendre en charge jusqu'à 500 enregistrements. Chaque enregistrement de la demande peut atteindre 1 Mo, jusqu'à une limite de 5 Mo pour l'ensemble de la demande, y compris les clés de partition. Comme avec le singlePutRecordopération décrite ci-dessous,PutRecordsutilise des numéros de séquence et des clés de partition. Toutefois, le paramètre PutRecord SequenceNumberForOrdering n'est pas inclus dans un appel PutRecords. L'opération PutRecords tente de traiter tous les enregistrements dans l'ordre naturel de la demande.

Chaque enregistrement de données a un numéro de séquence unique. Le numéro de séquence est attribué par Kinesis Data Streams une fois que vous avez appeléclient.putRecordspour ajouter les enregistrements de données au flux. Les numéros de séquence correspondant à une même clé de partition deviennent généralement de plus en plus longs au fil du temps ; plus l'intervalle de temps entre chaque demande PutRecords est élevé, plus les numéros de séquence sont longs.

Note

Les numéros de séquence ne peuvent pas servir d'index aux ensembles de données d'un même flux. Pour séparer logiquement les ensembles de données, utilisez des clés de partition ou créez un flux distinct pour chaque ensemble de données.

Une demande PutRecords peut inclure des enregistrements ayant différentes clés de partition. La portée de la demande est un flux ; chaque demande peut inclure une combinaison de clés de partition et d'enregistrements allant jusqu'aux limites définies pour la demande. Les demandes effectuées avec de nombreuses clés de partition différentes pour des flux comportant de nombreuses partitions différentes sont généralement plus rapides que les demandes comportant un petit nombre de clés de partition pour un petit nombre de partitions. Le nombre de clés de partition doit être beaucoup plus grand que le nombre de partitions pour réduire la latence et optimiser le débit.

Exemple d'opération PutRecords

Le code suivant crée 100 enregistrements de données avec des clés de partition séquentielles et les place dans un flux appelé DataStream.

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

La réponse PutRecords comprend un tableau de réponse Records. Chaque enregistrement compris dans ce tableau de réponse correspond directement à un enregistrement dans le tableau de demande. Ces entrées sont classées dans l'ordre naturel, soit de haut en bas de la demande et de la réponse. Le tableau de réponse Records comprend toujours le même nombre d'enregistrements que le tableau de demande.

Gestion des défaillances lors de l'utilisation de PutRecords

Par défaut, la défaillance d'enregistrements individuels dans une demande n'arrête pas le traitement des enregistrements suivants dans une demande PutRecords. Cela signifie qu'un tableau Records de réponse comprend à la fois des enregistrements traités avec succès et sans succès. Vous devez détecter les enregistrements traités sans succès et les inclure dans un appel ultérieur.

Les enregistrements qui ont réussi incluent les valeurs SequenceNumber et ShardID. Ceux qui ont échoué incluent les valeurs ErrorCode et ErrorMessage. Le paramètre ErrorCode reflète le type d'erreur et peut avoir une des valeurs suivantes : ProvisionedThroughputExceededException ou InternalFailure. ErrorMessage fournit des informations plus détaillées sur l'exception ProvisionedThroughputExceededException, y compris l'ID de compte, le nom du flux et les ID de partition de l'enregistrement qui a été limité. L'exemple ci-dessous contient trois enregistrements dans une demande PutRecords. Le second enregistrement échoue et est reflété dans la réponse.

Exemple Syntaxe de la demande PutRecords

{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }

Exemple Syntaxe de la réponse PutRecords

{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

Les enregistrements qui ont été traités sans succès peuvent être inclus dans des demandes PutRecords ultérieures. Tout d'abord, vérifiez le paramètre FailedRecordCount de putRecordsResult afin de savoir si la demande comporte des enregistrements d'échecs. Dans ce cas, chaque putRecordsEntry comportant un ErrorCode qui n'est pas null doit être ajouté à une demande ultérieure. Pour un exemple de ce type de gestionnaire, reportez-vous au code suivant.

Exemple Gestionnaire de défaillance PutRecords

PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

Ajout d'un seul enregistrement avec PutRecord

Chaque appel de PutRecord est exécuté sur un seul enregistrement. Préférez l'opération PutRecords décrite dans Ajout de plusieurs enregistrements avec PutRecords, sauf si votre application a particulièrement besoin de toujours envoyer des enregistrements uniques par demande ou qu'une autre raison empêche l'utilisation de PutRecords.

Chaque enregistrement de données a un numéro de séquence unique. Le numéro de séquence est attribué par Kinesis Data Streams une fois que vous avez appeléclient.putRecordpour ajouter l'enregistrement de données au flux. Les numéros de séquence correspondant à une même clé de partition deviennent généralement de plus en plus longs au fil du temps ; plus l'intervalle de temps entre chaque demande PutRecord est élevé, plus les numéros de séquence sont longs.

Lorsque des opérations put se succèdent rapidement, il n'est pas sûr que les numéros de séquence renvoyés augmentent, car ces opérations semblent surtout simultanées pour Kinesis Data Streams. Pour garantir une stricte augmentation des numéros de séquence pour la même clé de partition, utilisez le paramètre SequenceNumberForOrdering, comme il est illustré dans l'exemple de code Exemple d'opération PutRecord.

Que vous utilisiez ou nonSequenceNumberForOrdering, enregistre que Kinesis Data Streams reçoit via unGetRecordssont strictement classés par numéro de séquence.

Note

Les numéros de séquence ne peuvent pas servir d'index aux ensembles de données d'un même flux. Pour séparer logiquement les ensembles de données, utilisez des clés de partition ou créez un flux distinct pour chaque ensemble de données.

La clé de partition sert à regrouper les données dans le flux. Un enregistrement de données est attribué à une partition du flux suivant sa clé de partition. Plus précisément, Kinesis Data Streams utilise la clé de partition comme entrée d'une fonction de hachage qui mappe la clé de partition (et les données associées) à une partition spécifique.

Ce mécanisme de hachage a pour effet que tous les enregistrements de données ayant la même clé de partition sont mappés à la même partition du flux. Toutefois, si le nombre de clés de partition dépasse le nombre de partitions, certaines partitions contiennent nécessairement des enregistrements ayant des clés de partition différentes. Du point de vue de la conception, afin de garantir que toutes vos partitions sont bien utilisées, le nombre de partitions (spécifié par la méthode setShardCount de CreateStreamRequest) doit être nettement inférieur à celui des partitions uniques, et la quantité de données qui passe dans une clé de partition unique doit être nettement inférieure à la capacité de la partition.

Exemple d'opération PutRecord

Le code suivant crée dix enregistrements de données répartis entre deux clés de partition et les place dans un flux appelé myStreamName.

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

L'exemple de code précédent utilise le paramètre setSequenceNumberForOrdering pour garantir un ordre croissant dans chaque clé de partition. Pour utiliser ce paramètre efficacement, définissez le SequenceNumberForOrdering de l'enregistrement en cours (enregistrement n) sur le numéro de séquence de l'enregistrement précédent (enregistrement n-1). Pour obtenir le numéro de séquence d'un enregistrement qui a été ajouté au flux, appelez getSequenceNumber sur le résultat de putRecord.

Le paramètre SequenceNumberForOrdering garantit des numéros de séquence strictement croissants pour la même clé de partition. SequenceNumberForOrdering ne propose pas l'ordre des enregistrements sur plusieurs clés de partition.

Interagir avec les données à l'aide duAWSGlue de schémas

Vous pouvez intégrer vos flux de données Kinesis avec leAWSGlue registre de schéma. La .AWSLe registre de schéma Glue vous permet de découvrir, de contrôler et d'évoluer de manière centralisée les schémas, tout en veillant à ce que les données produites soient validées en continu par un schéma enregistré. Un schéma définit la structure et le format d'un enregistrement de données. Un schéma est une spécification versionnée pour la publication, la consommation ou le stockage des données fiables. La .AWSGlue Schema Registry vous permet d'améliorer la qualité des données de bout en bout et la gouvernance des données au sein de vos applications de streaming. Pour de plus amples informations, veuillez consulterAWSGlue de schémas. L'une des façons de mettre en place cette intégration est par le biais dePutRecordsandPutRecordLes API Kinesis Data Streams sont disponibles dans leAWSKit SDK Java.

Pour obtenir des instructions détaillées sur la façon de configurer l'intégration des Kinesis Data Streams avec le Registre de schéma à l'aide des API PutRecords et PutRecord Kinesis Data Streams, consultez la section « Interagir avec les données à l'aide des API Kinesis Data Streams » dansCas d'utilisation des applications Intégration d'Amazon Kinesis Data Streams avec leAWSGlue de schémas.