Mettre en œuvre la désagrégation des consommateurs - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Mettre en œuvre la désagrégation des consommateurs

À partir de la version 1.4.0, la désagrégation automatique des enregistrements KPL utilisateur est prise KCL en charge. Le code d'application grand public écrit avec les versions précédentes du KCL sera compilé sans aucune modification après la mise à jour duKCL. Toutefois, si KPL l'agrégation est utilisée du côté du producteur, il existe une subtilité liée au point de contrôle : tous les sous-enregistrements d'un enregistrement agrégé ont le même numéro de séquence. Des données supplémentaires doivent donc être stockées avec le point de contrôle si vous devez faire la distinction entre les sous-enregistrements. Ces données supplémentaires s'appellent le numéro de sous-séquence.

Migrer à partir des versions précédentes du KCL

Vous n'avez pas besoin de modifier vos appels existants pour effectuer le contrôle associé au regroupement. L'extraction de tous les enregistrements stockés avec succès dans Kinesis Data Streams demeure garantie. KCLIl propose désormais deux nouvelles opérations de point de contrôle pour prendre en charge des cas d'utilisation particuliers, décrits ci-dessous.

Si votre code existant a été écrit pour le KPL support KCL antérieur et que votre opération de point de contrôle est appelée sans arguments, cela revient à vérifier le numéro de séquence du dernier enregistrement KPL utilisateur du lot. Si votre opération de contrôle est appelée avec une chaîne de numéro de séquence, cela équivaut à contrôler le numéro de séquence donné du lot ainsi que le numéro de sous-séquence implicite 0 (zéro).

L'appel de la nouvelle opération de KCL point de contrôle checkpoint() sans aucun argument équivaut sémantiquement à vérifier le numéro de séquence du dernier Record appel du lot, ainsi que le numéro de sous-séquence implicite 0 (zéro).

L'appel de la nouvelle opération de KCL point de contrôle checkpoint(Record record) équivaut sémantiquement à vérifier le numéro de séquence Record du point donné avec le numéro de sous-séquence implicite 0 (zéro). Si l'appel Record est en fait un UserRecord, le numéro de séquence et le numéro de sous-séquence UserRecord sont contrôlés.

L'appel de la nouvelle opération de KCL point de contrôle vérifie checkpoint(String sequenceNumber, long subSequenceNumber) explicitement le numéro de séquence donné ainsi que le numéro de sous-séquence donné.

Dans tous les cas, une fois le point de contrôle enregistré dans la table des points de contrôle Amazon DynamoDB, la récupération des enregistrements peut reprendre correctementKCL, même lorsque l'application se bloque et redémarre. Si la séquence contient plusieurs enregistrements, l'extraction commence par l'enregistrement ayant le numéro de sous-séquence suivant dans l'enregistrement ayant le numéro de séquence contrôlé le plus récemment. Si le dernier contrôle incluait le tout dernier numéro de sous-séquence de l'enregistrement ayant le numéro de séquence précédent, l'extraction commence par l'enregistrement avec le numéro de séquence suivant.

La section suivante décrit en détail le contrôle de séquence et de sous-séquence pour les consommateurs qui doivent éviter de sauter ou de dupliquer des enregistrements. S'il importe peu que des enregistrements soient sautés (ou dupliqués) lors de l'arrêt ou du redémarrage du traitement des enregistrements du consommateur, vous pouvez exécuter votre code existant sans modification.

Utiliser des KCL extensions pour la KPL désagrégation

KPLla désagrégation peut impliquer un point de contrôle des sous-séquences. Pour faciliter l'utilisation du point de contrôle de sous-séquence, une UserRecord classe a été ajoutée au : KCL

public class UserRecord extends Record { public long getSubSequenceNumber() { /* ... */ } @Override public int hashCode() { /* contract-satisfying implementation */ } @Override public boolean equals(Object obj) { /* contract-satisfying implementation */ } }

Cette classe est maintenant utilisée à la place de Record. Elle ne casse pas le code existant, car c'est une sous-classe de Record. La classe UserRecord représente à la fois les sous-enregistrements réels et les enregistrements non regroupés standard. Les enregistrements non regroupés peuvent être considérés comme des enregistrements regroupés contenant un seul sous-enregistrement.

En outre, deux nouvelles opérations ont été ajoutées à IRecordProcessorCheckpointer :

public void checkpoint(Record record); public void checkpoint(String sequenceNumber, long subSequenceNumber);

Pour commencer à utiliser le contrôle du numéro de sous-séquence, vous pouvez effectuer la conversion ci-dessous. Modifiez le code de formulaire suivant :

checkpointer.checkpoint(record.getSequenceNumber());

Nouveau code de formulaire :

checkpointer.checkpoint(record);

Nous vous recommandons d'utiliser le formulaire checkpoint(Record record) pour le contrôle de sous-séquence. Toutefois, si vous stockez déjà sequenceNumbers dans les chaînes à utiliser pour les points de contrôle, vous devez désormais stocker aussi subSequenceNumber, comme l'illustre l'exemple suivant :

String sequenceNumber = record.getSequenceNumber(); long subSequenceNumber = ((UserRecord) record).getSubSequenceNumber(); // ... do other processing checkpointer.checkpoint(sequenceNumber, subSequenceNumber);

Le cast from Record to réussit UserRecord toujours car l'implémentation utilise UserRecord toujours. À moins qu'il ne soit nécessaire d'effectuer des opérations arithmétiques sur les numéros de séquence, cette approche n'est pas recommandée.

Lors du traitement des enregistrements KPL utilisateur, le numéro de sous-séquence KCL écrit le numéro de sous-séquence dans Amazon DynamoDB sous forme de champ supplémentaire pour chaque ligne. Les versions précédentes du étaient KCL utilisées AFTER_SEQUENCE_NUMBER pour récupérer des enregistrements lors de la reprise des points de contrôle. Le courant KCL avec KPL support utilise à la AT_SEQUENCE_NUMBER place. Lorsque l'enregistrement situé au numéro de séquence contrôlé est extrait, le numéro de sous-séquence contrôlé est vérifié, et les sous-enregistrements sont abandonnés le cas échéant (ce peut être tous si le dernier sous-enregistrement est celui qui est contrôlé). Encore, les enregistrements non regroupés peuvent être considérés comme des enregistrements regroupés contenant un seul sous-enregistrement, si bien que le même algorithme fonctionne aussi bien pour les enregistrements regroupés que pour les enregistrements non regroupés.

Utiliser GetRecords directement

Vous pouvez également choisir de ne pas utiliser l'APIopération KCL mais de l'invoquer GetRecords directement pour récupérer les enregistrements Kinesis Data Streams. Pour décompresser ces enregistrements récupérés dans vos dossiers KPL utilisateur d'origine, effectuez l'une des opérations statiques suivantes dans UserRecord.java :

public static List<Record> deaggregate(List<Record> records) public static List<UserRecord> deaggregate(List<UserRecord> records, BigInteger startingHashKey, BigInteger endingHashKey)

La première opération utilise la valeur par défaut 0 (zéro) pour startingHashKey et la valeur par défaut 2^128 -1 pour endingHashKey.

Chacune de ces opérations désagrège la liste donnée d'enregistrements Kinesis Data Streams en une liste d'enregistrements utilisateur. KPL Tous les enregistrements KPL utilisateur dont la clé de hachage ou de partition explicite se situe en dehors de la plage comprise entre startingHashKey (inclus) et endingHashKey (inclus) sont supprimés de la liste d'enregistrements renvoyée.