Après mûre réflexion, nous avons décidé de mettre fin à Amazon Kinesis Data Analytics pour les applications SQL en deux étapes :
1. À compter du 15 octobre 2025, vous ne pourrez plus créer de nouvelles applications Kinesis Data Analytics for SQL.
2. Nous supprimerons vos candidatures à compter du 27 janvier 2026. Vous ne pourrez ni démarrer ni utiliser vos applications Amazon Kinesis Data Analytics for SQL. Support ne sera plus disponible pour Amazon Kinesis Data Analytics for SQL à partir de cette date. Pour de plus amples informations, veuillez consulter Arrêt d'Amazon Kinesis Data Analytics pour les applications SQL.
Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Exemple : Fractionnement de chaînes en plusieurs champs (fonction VARIABLE_COLUMN_LOG_PARSE)
Cet exemple utilise la fonction VARIABLE_COLUMN_LOG_PARSE
pour manipuler des chaînes dans Kinesis Data Analytics. VARIABLE_COLUMN_LOG_PARSE
fractionne une chaîne d’entrée en champs séparés par un caractère délimiteur ou une chaîne de délimiteur. Pour plus d’informations, consultez la section VARIABLE_COLUMN_LOG_PARSE dans le manuel Référence SQL du service géré Amazon pour Apache Flink.
Dans cet exemple, vous écrivez des enregistrements semi-structurés dans un flux de données Amazon Kinesis. Les exemples d'enregistrements sont les suivants :
{ "Col_A" : "string", "Col_B" : "string", "Col_C" : "string", "Col_D_Unstructured" : "value,value,value,value"} { "Col_A" : "string", "Col_B" : "string", "Col_C" : "string", "Col_D_Unstructured" : "value,value,value,value"}
Vous créez ensuite une application Kinesis Data Analytics dans la console, à l’aide du flux Kinesis comme source de streaming. Le processus de découverte lit les exemples d'enregistrements de la source de diffusion et déduit un schéma intégré à l'application avec quatre colonnes, comme illustré ci-après :

Vous utilisez ensuite le code d'application avec la fonction VARIABLE_COLUMN_LOG_PARSE
pour analyser les valeurs séparées par des virgules et insérer des lignes normalisées dans un autre flux intégré à l'application, comme illustré ci-après :

Rubriques
Étape 1 : Création d’un flux de données Kinesis
Créez un flux de données Amazon Kinesis et remplissez les enregistrements de journaux comme suit :
-
Choisissez Data Streams (Flux de données) dans le volet de navigation.
-
Choisissez Create Kinesis stream (Créer un flux Kinesis), puis créez un flux avec une seule partition. Pour de plus amples informations, consultez Créer un flux dans le Guide du développeur Amazon Kinesis Data Streams.
-
Exécutez le code Python suivant pour remplir les exemples d'enregistrements de journal. Ce code simple écrit en continu le même enregistrement de journal dans le flux.
import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
Étape 2 : Création d’une application Kinesis Data Analytics
Créez une application Kinesis Data Analytics comme suit :
Ouvrez le service géré pour la console Apache Flink à l'adresse https://console.aws.amazon.com/kinesisanalytics.
-
Choisissez Create application (Créer une application), saisissez un nom d'application, puis sélectionnez Create application (Créer une application).
-
Sur la page de détails de l'application, choisissez Connect streaming data (Connecter des données de diffusion).
-
Sur la page Connect to source (Se connecter à la source), procédez comme suit :
-
Choisissez le flux que vous avez créé dans la section précédente.
-
Choisissez l'option de création d'un rôle IAM.
-
Choisissez Discover schema (Découvrir le schéma). Attendez que la console affiche le schéma déduit et les exemples d'enregistrements utilisés pour déduire le schéma pour le flux intégré à l'application créé. Notez que le schéma déduit ne comporte qu'une seule colonne.
-
Choisissez Save and continue (Enregistrer et continuer).
-
-
Sur la page de détails de l'application, choisissez Go to SQL editor (Accéder à l'éditeur SQL). Pour lancer l'application, choisissez Yes, start application (Oui, démarrer l'application) dans la boîte de dialogue qui s'affiche.
-
Dans l'éditeur SQL, écrivez le code de l'application, puis vérifiez les résultats :
-
Copiez le code d'application suivant et collez-le dans l'éditeur:
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM"( "column_A" VARCHAR(16), "column_B" VARCHAR(16), "column_C" VARCHAR(16), "COL_1" VARCHAR(16), "COL_2" VARCHAR(16), "COL_3" VARCHAR(16)); CREATE OR REPLACE PUMP "SECOND_STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM t."Col_A", t."Col_B", t."Col_C", t.r."COL_1", t.r."COL_2", t.r."COL_3" FROM (SELECT STREAM "Col_A", "Col_B", "Col_C", VARIABLE_COLUMN_LOG_PARSE ("Col_E_Unstructured", 'COL_1 TYPE VARCHAR(16), COL_2 TYPE VARCHAR(16), COL_3 TYPE VARCHAR(16)', ',') AS r FROM "SOURCE_SQL_STREAM_001") as t;
-
Choisissez Save and run SQL (Enregistrer et exécuter SQL). Dans l'onglet Real-time analytics (Analyse en temps réel), vous pouvez voir tous les flux intégrés à l'application que l'application a créés et vérifier les données.
-