Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Utilisation des opérations de streaming dans le cadre de sessions AWS Glue interactives
Changement de type de séance de streaming
Utilisez magic de configuration des séances interactives AWS Glue et %streaming
pour définir la tâche que vous exécutez et initialisez une séance interactive de streaming.
Échantillonnage du flux d'entrée pour un développement interactif
L'un des outils que nous avons développés pour améliorer l'expérience AWS Glue interactive dans les sessions interactives est l'ajout d'une nouvelle méthode ci-dessous GlueContext
pour obtenir un instantané d'un flux dans un fichier statique DynamicFrame. GlueContext
vous permet d'inspecter, d'interagir et de mettre en œuvre votre flux de travail.
Avec l’instance de classe GlueContext
, vous serez en mesure de localiser la méthode getSampleStreamingDynamicFrame
. Les arguments requis pour cette méthode sont les suivants :
-
dataFrame
: The Spark Streaming DataFrame -
options
: voir les options disponibles ci-dessous
Les options disponibles sont les suivantes :
-
windowSize : ceci est également appelé Durée du micro-lot. Ce paramètre déterminera la durée d'attente d'une requête en streaming après le déclenchement du lot précédent. La valeur de ce paramètre doit être inférieure à
pollingTimeInMs
. -
pollingTimeInMme : La durée totale pendant laquelle la méthode sera exécutée. Il déclenche au moins un micro-lot pour obtenir des échantillons de registre à partir du flux d'entrée.
-
recordPollingLimit: Ce paramètre vous permet de limiter le nombre total d'enregistrements que vous allez interroger à partir du flux.
-
(Facultatif) Vous pouvez également utiliser
writeStreamFunction
pour appliquer cette fonction personnalisée à chaque fonction d'échantillonnage de registre. Voir ci-dessous des exemples dans Scala et Python.
Note
Lorsque le paramètre DynFrame
échantillonné est vide, cela peut être causé par certaines des raisons suivantes :
-
La source Streaming est définie sur « Dernier » et aucune nouvelle donnée n'a été ingérée pendant la période d'échantillonnage.
-
Le temps d'interrogation n'est pas suffisant pour traiter les registres qu'il a ingérés. Les données ne s'afficheront pas à moins que l'ensemble du lot n'ait été traité.
Exécution d'applications de streaming dans des séances interactives
Dans les séances interactives AWS Glue, vous pouvez exécuter l'application de streaming AWS Glue comme comment créer une application de streaming dans la console AWS Glue. Étant donné que les séances interactives sont basées sur une séance, la présence d'exceptions dans le moteur d'exécution ne provoque pas l'arrêt de la séance. Nous avons maintenant l'avantage supplémentaire de développer votre fonction par lots de manière itérative. Par exemple :
def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
Dans l'exemple ci-dessus, nous avons inclus une utilisation non valide d'une méthode et contrairement aux tâches AWS Glue habituelles qui quitteront l'application entière, le contexte de codage et les définitions de l'utilisateur sont entièrement préservés, et la séance est toujours opérationnelle. Il n'est pas nécessaire d'amorcer un nouveau cluster et de ré-exécuter toute la transformation précédente. Cela vous autorise à vous concentrer sur l'itération rapide de vos implémentations de fonctions par lots pour obtenir des résultats souhaitables.
Il est important de noter que la séance interactive évalue chaque instruction de manière bloquante afin que la séance n'exécute qu'une seule instruction à la fois. Étant donné que les requêtes de streaming sont continues et ne se terminent jamais, les séances avec des requêtes de streaming actives ne seront pas en mesure de gérer les instructions de suivi à moins qu'elles ne soient interrompues. Vous pouvez émettre la commande d'interruption directement à partir de Bloc-notes Jupyter et notre noyau s'occupera de l'annulation pour vous.
Prenons l'exemple de la séquence suivante d'instructions qui attendent d'être exécutées :
Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely