Amazon Redshift ne prendra plus en charge la création de nouveaux Python UDFs à compter du 1er novembre 2025. Si vous souhaitez utiliser Python UDFs, créez la version UDFs antérieure à cette date. Le Python existant UDFs continuera à fonctionner normalement. Pour plus d’informations, consultez le billet de blog
Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Ingestion de données en streaming à l’aide de Kinesis
Cette procédure montre comment ingérer des données à partir d’un flux Kinesis nommé ev_station_data, qui contient les données de consommation de différentes bornes de recharge pour véhicules électriques, au format JSON. Le schéma est bien défini. L’exemple montre comment stocker les données sous forme de JSON brut et comment convertir les données JSON en types de données Amazon Redshift lorsqu’elles sont ingérées.
Configuration du producteur
À l’aide d’Amazon Kinesis Data Streams, suivez les étapes pour créer un flux nommé
ev_station_data. Sélectionnez On-demand (À la demande) pour le Capacity mode (Mode de capacité). Pour plus d'informations, consultez la section Création d'un flux via la console AWS de gestion.Amazon Kinesis Data Generator
peut vous aider à générer des données de test à utiliser avec votre flux. Suivez les étapes détaillées dans l’outil pour commencer, et utilisez le modèle de données suivant afin de générer vos données : { "_id" : "{{random.uuid}}", "clusterID": "{{random.number( { "min":1, "max":50 } )}}", "connectionTime": "{{date.now("YYYY-MM-DD HH:mm:ss")}}", "kWhDelivered": "{{commerce.price}}", "stationID": "{{random.number( { "min":1, "max":467 } )}}", "spaceID": "{{random.word}}-{{random.number( { "min":1, "max":20 } )}}", "timezone": "America/Los_Angeles", "userID": "{{random.number( { "min":1000, "max":500000 } )}}" }Chaque objet JSON des données du flux comprend les propriétés suivantes :
{ "_id": "12084f2f-fc41-41fb-a218-8cc1ac6146eb", "clusterID": "49", "connectionTime": "2022-01-31 13:17:15", "kWhDelivered": "74.00", "stationID": "421", "spaceID": "technologies-2", "timezone": "America/Los_Angeles", "userID": "482329" }
Configuration Amazon Redshift
Ces étapes vous montrent comment configurer la vue matérialisée afin d’ingérer des données.
-
Créez un schéma externe afin de mapper les données de Kinesis à un objet Redshift.
CREATE EXTERNAL SCHEMA evdata FROM KINESIS IAM_ROLE 'arn:aws:iam::0123456789:role/redshift-streaming-role';Pour plus d’informations sur la configuration du rôle IAM, consultez Mise en route de l’ingestion en streaming à partir d’Amazon Kinesis Data Streams.
Créez une vue matérialisée pour consommer les données du flux. L’exemple suivant montre comment définir une vue matérialisée pour ingérer les données au format JSON à partir d’un flux Kinesis.
Tout d’abord, stockez les registres de flux au format semi-structuré SUPER. Dans cet exemple, la source JSON est stockée dans Redshift sans conversion en types Redshift.
CREATE MATERIALIZED VIEW ev_station_data AS SELECT approximate_arrival_timestamp, partition_key, shard_id, sequence_number, case when can_json_parse(kinesis_data) then json_parse(kinesis_data) else null end as payload, case when not can_json_parse(kinesis_data) then kinesis_data else null end as failed_payload FROM evdata."ev_station_data" ;
Interrogation du flux
-
Activez les attributs SUPER sensibles à la casse à l’aide de la commande ci-dessous. Amazon Redshift n’est pas sensible à la casse par défaut. Par conséquent, pour accéder aux attributs SUPER sensibles à la casse, vous devez activer cette fonctionnalité.
SET enable_case_sensitive_super_attribute to TRUE; -
Actualisez la vue matérialisée à l’aide de la commande suivante afin d’extraire les données du flux.
REFRESH MATERIALIZED VIEW ev_station_data; -
Interrogez la vue matérialisée actualisée pour générer les statistiques d’utilisation.
SELECT e.payload.connectionTime::date as connectiontime ,SUM(e.payload.kWhDelivered::decimal(10,2)) AS Energy_Consumed ,count(distinct e.payload.userID) AS #Users from ev_station_data as e group by connectiontime order by 1 desc; Affichez les résultats.
connectiontime energy_consumed #users 2022-02-08 4139 10 2022-02-09 5571 10 2022-02-10 8697 20 2022-02-11 4408 10 2022-02-12 4257 10 2022-02-23 6861 10