Dégroupement côté consommateur - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Dégroupement côté consommateur

À partir de la version 1.4.0, la KCL prend en charge le dégroupement automatique des enregistrements utilisateur KPL. Le code de l'application consommateur écrit avec les versions antérieures de la KCL sera compilé sans aucune modification lorsque vous aurez mis à jour la KCL. Toutefois, si la fonctionnalité de regroupement KPL est utilisée côté producteur, le contrôle comporte une subtilité : tous les sous-enregistrements figurant dans un enregistrement ont le même numéro de séquence si bien que des données supplémentaires doivent être stockées avec le point de contrôle si vous avez besoin de différencier les sous-enregistrements. Ces données supplémentaires s'appellent le numéro de sous-séquence.

Migration à partir des versions antérieures de la bibliothèque KCL

Vous n'avez pas besoin de modifier vos appels existants pour effectuer le contrôle associé au regroupement. L'extraction de tous les enregistrements stockés avec succès dans Kinesis Data Streams demeure garantie. La bibliothèque KCL fournit désormais deux nouvelles opérations de contrôle pour prendre en charge les cas d'utilisation spéciaux décrits ci-dessous.

Si votre code existant a été écrit pour la KCL avant la prise en charge de la KPL et que votre opération de contrôle est appelée sans argument, cela équivaut à contrôler le numéro de séquence du dernier enregistrement utilisateur KPL figurant dans le lot. Si votre opération de contrôle est appelée avec une chaîne de numéro de séquence, cela équivaut à contrôler le numéro de séquence donné du lot ainsi que le numéro de sous-séquence implicite 0 (zéro).

L'appel de la nouvelle opération de contrôle KCL checkpoint() sans argument est équivalent sémantiquement au contrôle du numéro de séquence du dernier appel Record du lot, accompagné du numéro de sous-séquence implicite 0 (zéro).

L'appel de la nouvelle opération de contrôle KCL checkpoint(Record record) est équivalent sémantiquement au contrôle du numéro de séquence donné du Record, accompagné du numéro de sous-séquence implicite 0 (zéro). Si l'appel Record est en fait un UserRecord, le numéro de séquence et le numéro de sous-séquence UserRecord sont contrôlés.

L'appel de la nouvelle opération de contrôle KCL checkpoint(String sequenceNumber, long subSequenceNumber) contrôle explicitement le numéro de séquence donné ainsi que le numéro de sous-séquence donné.

Dans chacun de ces cas, une fois que le point de contrôle est stocké dans la table de point de contrôle Amazon DynamoDB, la KCL peut correctement reprendre l'extraction des enregistrements même lorsque l'application se bloque et redémarre. Si la séquence contient plusieurs enregistrements, l'extraction commence par l'enregistrement ayant le numéro de sous-séquence suivant dans l'enregistrement ayant le numéro de séquence contrôlé le plus récemment. Si le dernier contrôle incluait le tout dernier numéro de sous-séquence de l'enregistrement ayant le numéro de séquence précédent, l'extraction commence par l'enregistrement avec le numéro de séquence suivant.

La section suivante décrit en détail le contrôle de séquence et de sous-séquence pour les consommateurs qui doivent éviter de sauter ou de dupliquer des enregistrements. S'il importe peu que des enregistrements soient sautés (ou dupliqués) lors de l'arrêt ou du redémarrage du traitement des enregistrements du consommateur, vous pouvez exécuter votre code existant sans modification.

Extensions KCL pour dégroupement KPL

Comme déjà mentionné, le regroupement KPL peut comprendre un contrôle de sous-séquence. Pour faciliter l'utilisation du contrôle de sous-séquence, une classe UserRecord a été ajoutée à la KCL :

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

Cette classe est maintenant utilisée à la place de Record. Elle ne casse pas le code existant, car c'est une sous-classe de Record. La classe UserRecord représente à la fois les sous-enregistrements réels et les enregistrements non regroupés standard. Les enregistrements non regroupés peuvent être considérés comme des enregistrements regroupés contenant un seul sous-enregistrement.

En outre, deux nouvelles opérations ont été ajoutées à IRecordProcessorCheckpointer :

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

Pour commencer à utiliser le contrôle du numéro de sous-séquence, vous pouvez effectuer la conversion ci-dessous. Modifiez le code de formulaire suivant :

checkpointer.checkpoint(record.getSequenceNumber());

Nouveau code de formulaire :

checkpointer.checkpoint(record);

Nous vous recommandons d'utiliser le formulaire checkpoint(Record record) pour le contrôle de sous-séquence. Toutefois, si vous stockez déjà sequenceNumbers dans les chaînes à utiliser pour les points de contrôle, vous devez désormais stocker aussi subSequenceNumber, comme l'illustre l'exemple suivant :

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

Le passage de Record à UserRecord réussit toujours, car l'implémentation utilise toujours UserRecord simultanément. À moins qu'il ne soit nécessaire d'effectuer des opérations arithmétiques sur les numéros de séquence, cette approche n'est pas recommandée.

Lors du traitement des enregistrements utilisateur KPL, la KCL écrit le numéro de sous-séquence dans Amazon DynamoDB sous la forme d'un champ supplémentaire pour chaque ligne. Les versions antérieures de la KCL utilisaient AFTER_SEQUENCE_NUMBER pour extraire les enregistrements lors de la reprise des points de contrôle. La KCL actuelle avec prise en charge KPL utilise AT_SEQUENCE_NUMBER à la place. Lorsque l'enregistrement situé au numéro de séquence contrôlé est extrait, le numéro de sous-séquence contrôlé est vérifié, et les sous-enregistrements sont abandonnés le cas échéant (ce peut être tous si le dernier sous-enregistrement est celui qui est contrôlé). Encore, les enregistrements non regroupés peuvent être considérés comme des enregistrements regroupés contenant un seul sous-enregistrement, si bien que le même algorithme fonctionne aussi bien pour les enregistrements regroupés que pour les enregistrements non regroupés.

En utilisant GetRecords directement

Vous pouvez également choisir de ne pas utiliser la KCL, mais d'invoquer directement l'opération d'API GetRecords pour extraire les enregistrements Kinesis Data Streams. Pour décompresser ces enregistrements dans vos enregistrements utilisateur KPL initiaux, appelez l'une des opérations statiques suivantes figurant dans UserRecord.java :

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

La première opération utilise la valeur par défaut 0 (zéro) pour startingHashKey et la valeur par défaut 2^128 -1 pour endingHashKey.

Chacune de ces opérations dégroupe la liste des enregistrements Kinesis Data Streams dans une liste d'enregistrements utilisateur KPL. Tout enregistrement utilisateur KPL dont la clé de hachage ou la clé de partition explicite n'est pas comprise dans la plage de startingHashKey (inclus) et de endingHashKey (inclus) est supprimé de la liste d'enregistrements renvoyée.