Exemplo: Transformação de valores DateTime - Guia do desenvolvedor do Amazon Kinesis Data Analytics SQL para aplicativos

Para novos projetos, recomendamos que você use o novo Managed Service para Apache Flink Studio em vez do Kinesis Data Analytics for Applications. SQL O Managed Service for Apache Flink Studio combina facilidade de uso com recursos analíticos avançados, permitindo que você crie aplicativos sofisticados de processamento de stream em minutos.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Exemplo: Transformação de valores DateTime

O Amazon Kinesis Data Analytics suporta a conversão de colunas em time stamps. Por exemplo, use seu próprio time stamp como parte de uma cláusula GROUP BY como outra janela baseado em tempo, além da coluna ROWTIME. O Kinesis Data Analytics fornece operações e funções SQL para trabalhar com campos de data e hora.

  • Operadores de data e hora: você pode executar operações aritméticas em datas, horas e tipos de dados de intervalo. Para obter mais informações, consulte Operadores de data, time stamp e intervalo em Amazon Managed Service for Apache Flink SQL Reference.

     

  • Funções SQL: incluem o seguinte. Para obter mais informações, consulte Funções de data e hora em Amazon Managed Service for Apache Flink SQL Reference.

    • EXTRACT() - Extrai um campo a partir de uma data, hora, time stamp ou expressão de intervalo.

    • CURRENT_TIME – Retorna a hora em que a consulta é executada (UTC).

    • CURRENT_DATE – Retorna a data em que a consulta é executada (UTC).

    • CURRENT_TIMESTAMP – Retorna o time stamp em que a consulta é executada (UTC).

    • LOCALTIME – Retorna a hora atual em que a consulta é executada, conforme definido pelo ambiente no qual o Kinesis Data Analytics está sendo executado (UTC).

    • LOCALTIMESTAMP - Retorna o time stamp atual, conforme definido pelo ambiente no qual o Kinesis Data Analytics está sendo executado (UTC).

       

  • Extensões SQL: incluem o seguinte. Para obter mais informações, consulte Funções de data e hora e Funções de conversão de data e hora em Amazon Managed Service for Apache Flink SQL Reference.

    • CURRENT_ROW_TIMESTAMP - Retorna um novo time stamp para cada linha no fluxo.

    • TSDIFF - Retorna a diferença de dois time stamps em milissegundos.

    • CHAR_TO_DATE - Converte uma string em data.

    • CHAR_TO_TIME – Converte uma string em hora.

    • CHAR_TO_TIMESTAMP – Converte uma string em time stamp.

    • DATE_TO_CHAR – Converte uma data em string.

    • TIME_TO_CHAR – Converte uma hora em string.

    • TIMESTAMP_TO_CHAR – Converte um time stamp em uma string.

A maioria das funções SQL anteriores usam um formato para converter as colunas. O formato é flexível. Por exemplo, você pode especificar o formato yyyy-MM-dd hh:mm:ss para converter a string de entrada 2009-09-16 03:15:24 em time stamp. Para obter mais informações, consulte Char To Timestamp (Sys) em Amazon Managed Service for Apache Flink SQL Reference.

Exemplo: transformação de datas

Neste exemplo, você grava os registros a seguir em um fluxo de dados do Amazon Kinesis.

{"EVENT_TIME": "2018-05-09T12:50:41.337510", "TICKER": "AAPL"} {"EVENT_TIME": "2018-05-09T12:50:41.427227", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.520549", "TICKER": "INTC"} {"EVENT_TIME": "2018-05-09T12:50:41.610145", "TICKER": "MSFT"} {"EVENT_TIME": "2018-05-09T12:50:41.704395", "TICKER": "AAPL"} ...

Em seguida, você criará um aplicativo Kinesis Data Analytics no console com o stream Kinesis como origem de streaming. O processo de descoberta lê registros de exemplo na origem de streaming e infere um esquema de aplicativo com duas colunas (EVENT_TIME e TICKER), como mostrado.

Captura de tela do console mostrando o esquema no aplicativo com o tempo do evento e as colunas do marcador.

Em seguida, use o código do aplicativo com funções SQL para converter o campo de timestamp EVENT_TIME de várias maneiras. Insira dos dados resultantes em outro fluxo de aplicativo, como mostramos na captura de tela a seguir:

Captura de tela do console mostrando os dados resultantes em um stream no aplicativo.

Etapa 1: Criar um fluxo de dados Kinesis

Crie um Amazon Kinesis Data Streams e preencha com hora do evento e os registros do marcador, da seguinte maneira:

  1. Faça login AWS Management Console e abra o console do Kinesis em https://console.aws.amazon.com/kinesis.

  2. Selecione Data Streams (Fluxos de dados) no painel de navegação.

  3. Escolha Create Kinesis stream (Criar fluxo do Kinesis) e crie um fluxo com um estilhaço.

  4. Execute o seguinte código Python para preencher o fluxo com dados de amostra. Esse código simples grava continuamente um registro com um símbolo de marcador aleatório e o timestamp atual no fluxo.

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))

Etapa 2: Criar o aplicativo Amazon Kinesis Data Analytics

Crie um aplicativo da seguinte maneira:

  1. Abra o Managed Service for Apache Flink console em https://console.aws.amazon.com/kinesisanalytics.

  2. Escolha Create application (Criar aplicativo), digite um nome para o aplicativo e selecione Create application (Criar aplicativo).

  3. Na página de detalhes do aplicativo, escolha Connect streaming data (Conectar dados de streaming) para se conectar com a fonte.

  4. Na página Connect to source (Conectar com a fonte), faça o seguinte:

    1. Escolha o stream criado na seção anterior.

    2. Escolha para criar uma função do IAM.

    3. Escolha Discover schema (Descobrir esquema). Aguarde o console mostrar o esquema inferido e os registros de exemplo usados para inferir o esquema do fluxo do aplicativo criado. O esquema inferido tem duas colunas.

    4. Escolha Edit Schema (Editar esquema). Mude Column type (Tipo de coluna) da coluna EVENT_TIME para TIMESTAMP.

    5. Escolha Save schema and update stream samples. Depois que o console salvar o esquema, escolha Exit (Sair).

    6. Escolha Save and continue.

  5. Na página de detalhes de aplicativo, escolha Go to SQL editor (Ir para o editor de SQL). Para iniciar o aplicativo, escolha Yes, start application (Sim, iniciar o aplicativo) na caixa de diálogo exibida.

  6. No editor SQL, escreva o código do aplicativo e verifique os resultados da seguinte forma:

    1. Copie o código de aplicativo a seguir e cole-o no editor.

      CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" ( TICKER VARCHAR(4), event_time TIMESTAMP, five_minutes_before TIMESTAMP, event_unix_timestamp BIGINT, event_timestamp_as_char VARCHAR(50), event_second INTEGER); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM TICKER, EVENT_TIME, EVENT_TIME - INTERVAL '5' MINUTE, UNIX_TIMESTAMP(EVENT_TIME), TIMESTAMP_TO_CHAR('yyyy-MM-dd hh:mm:ss', EVENT_TIME), EXTRACT(SECOND FROM EVENT_TIME) FROM "SOURCE_SQL_STREAM_001"
    2. Escolha Save and run SQL. Na guia Real-time analytics (Análise em tempo real), você pode ver todos os fluxos de aplicativo criados pelo aplicativo e verificar os dados.