Dégroupement côté consommateur - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Dégroupement côté consommateur

À partir de la version 1.4.0, la KCL prend en charge le regroupement automatique des enregistrements utilisateur KPL. Le code de l'application consommateur écrit avec les versions antérieures de la bibliothèque KCL sera compilé sans aucune modification lorsque vous aurez mis à jour la bibliothèque KCL. Toutefois, si la fonctionnalité de regroupement KPL est utilisée côté producteur, le contrôle comporte une subtilité : tous les sous-enregistrements figurant dans un enregistrement ont le même numéro de séquence si bien que des données supplémentaires doivent être stockées avec le point de contrôle si vous avez besoin de différencier les sous-enregistrements. Ces données supplémentaires s'appellent le numéro de sous-séquence.

Migration à partir des versions antérieures de la bibliothèque KCL

Vous n'avez pas besoin de modifier vos appels existants pour effectuer le contrôle associé au regroupement. L'extraction de tous les enregistrements stockés avec succès dans Kinesis Data Streams demeure garantie. La KCL fournit désormais deux nouvelles opérations de contrôle pour prendre en charge les cas d'utilisation spéciaux décrits ci-dessous.

Si votre code existant a été écrit pour la KCL avant la prise en charge de KPL et que votre opération de contrôle est appelée sans argument, cela équivaut à contrôler le numéro de séquence du dernier enregistrement utilisateur KPL figurant dans le lot. Si votre opération de contrôle est appelée avec une chaîne de numéro de séquence, cela équivaut à contrôler le numéro de séquence donné du lot ainsi que le numéro de sous-séquence implicite 0 (zéro).

Appel de la nouvelle opération de point de contrôle KCLcheckpoint()sans aucun argument est sémantiquement équivalent à la vérification du numéro de séquence du dernierRecordDans le lot, accompagné du numéro de sous-séquence implicite 0 (zéro).

Appel de la nouvelle opération de point de contrôle KCLcheckpoint(Record record)est sémantiquement équivalent à la vérification de laRecordLe numéro de séquence implicite 0 (zéro). Si l'appel Record est en fait un UserRecord, le numéro de séquence et le numéro de sous-séquence UserRecord sont contrôlés.

Appel de la nouvelle opération de point de contrôle KCLcheckpoint(String sequenceNumber, long subSequenceNumber)contrôle explicitement le numéro de séquence donné ainsi que le numéro de sous-séquence donné.

Dans chacun de ces cas, une fois que le point de contrôle est stocké dans la table de point de contrôle Amazon DynamoDB, la KCL peut correctement reprendre l'extraction des enregistrements même lorsque l'application se bloque et redémarre. Si la séquence contient plusieurs enregistrements, l'extraction commence par l'enregistrement ayant le numéro de sous-séquence suivant dans l'enregistrement ayant le numéro de séquence contrôlé le plus récemment. Si le dernier contrôle incluait le tout dernier numéro de sous-séquence de l'enregistrement ayant le numéro de séquence précédent, l'extraction commence par l'enregistrement avec le numéro de séquence suivant.

La section suivante décrit en détail le contrôle de séquence et de sous-séquence pour les consommateurs qui doivent éviter de sauter ou de dupliquer des enregistrements. S'il importe peu que des enregistrements soient sautés (ou dupliqués) lors de l'arrêt ou du redémarrage du traitement des enregistrements du consommateur, vous pouvez exécuter votre code existant sans modification.

Extensions KCL pour dégroupement KPL

Comme déjà mentionné, le regroupement KPL peut comprendre un contrôle de sous-séquence. Pour faciliter l'utilisation du contrôle de sous-séquence, uneUserRecorda été ajouté à KCL :

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

Cette classe est maintenant utilisée à la place de Record. Elle ne casse pas le code existant, car c'est une sous-classe de Record. La classe UserRecord représente à la fois les sous-enregistrements réels et les enregistrements non regroupés standard. Les enregistrements non regroupés peuvent être considérés comme des enregistrements regroupés contenant un seul sous-enregistrement.

En outre, deux nouvelles opérations ont été ajoutées à IRecordProcessorCheckpointer :

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

Pour commencer à utiliser le contrôle du numéro de sous-séquence, vous pouvez effectuer la conversion ci-dessous. Modifiez le code de formulaire suivant :

checkpointer.checkpoint(record.getSequenceNumber());

Nouveau code de formulaire :

checkpointer.checkpoint(record);

Nous vous recommandons d'utiliser le formulaire checkpoint(Record record) pour le contrôle de sous-séquence. Toutefois, si vous stockez déjà sequenceNumbers dans les chaînes à utiliser pour les points de contrôle, vous devez désormais stocker aussi subSequenceNumber, comme l'illustre l'exemple suivant :

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

Le passage de Record à UserRecord réussit toujours, car l'implémentation utilise toujours UserRecord simultanément. À moins qu'il ne soit nécessaire d'effectuer des opérations arithmétiques sur les numéros de séquence, cette approche n'est pas recommandée.

Lors du traitement des enregistrements utilisateur KPL, la KCL écrit le numéro de sous-séquence dans Amazon DynamoDB sous la forme d'un champ supplémentaire pour chaque ligne. Les versions antérieures de KCL utilisées parAFTER_SEQUENCE_NUMBERpour extraire les enregistrements lors de la reprise des points de contrôle. Le KCL actuel avec prise en charge KPL utiliseAT_SEQUENCE_NUMBERÀ la place. Lorsque l'enregistrement situé au numéro de séquence contrôlé est extrait, le numéro de sous-séquence contrôlé est vérifié, et les sous-enregistrements sont abandonnés le cas échéant (ce peut être tous si le dernier sous-enregistrement est celui qui est contrôlé). Encore, les enregistrements non regroupés peuvent être considérés comme des enregistrements regroupés contenant un seul sous-enregistrement, si bien que le même algorithme fonctionne aussi bien pour les enregistrements regroupés que pour les enregistrements non regroupés.

Utilisation directe de GetRecords

Vous pouvez également choisir de ne pas utiliser le KCL mais d'appeler à la place l'opération d'APIGetRecordsdirectement pour récupérer les enregistrements Kinesis Data Streams. Pour décompresser ces enregistrements dans vos enregistrements utilisateur KPL initiaux, appelez l'une des opérations statiques suivantes figurant dansUserRecord.java :

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

La première opération utilise la valeur par défaut 0 (zéro) pour startingHashKey et la valeur par défaut 2^128 -1 pour endingHashKey.

Chacune de ces opérations dégroupe la liste des enregistrements Kinesis Data Streams dans une liste d'enregistrements utilisateur KPL. Tout enregistrement utilisateur KPL dont la clé de hachage ou la clé de partition explicite n'est pas comprise dans la plage destartingHashKey(inclus) et laendingHashKey(inclusivement) sont ignorés de la liste des enregistrements renvoyés.