Conexões do Kinesis - AWS Glue

Conexões do Kinesis

É possível usar uma conexão do Kinesis para ler e gravar o Amazon Kinesis Data Streams usando informações armazenadas em uma tabela do Data Catalog ou fornecendo informações para acessar diretamente o fluxo de dados. Você pode ler informações do Kinesis em um Spark DataFrame e depois convertê-las em um Glue DynamicFrame AWS. Você pode gravar DynamicFrames no Kinesis em um formato JSON. Se você acessar diretamente o fluxo de dados, use essas opções para fornecer as informações sobre como acessar o fluxo de dados.

Se você usar getCatalogSource ou create_data_frame_from_catalog para consumir registros de uma fonte de transmissão do Kinesis, o trabalho tem o banco de dados do catálogo de dados e as informações de nome da tabela, e pode usá-los para obter alguns parâmetros básicos para leitura da fonte de transmissão do Kinesis. Se você usar getSource, getSourceWithFormat, createDataFrameFromOptions ou create_data_frame_from_options, será necessário especificar esses parâmetros básicos usando as opções de conexão descritas aqui.

Você pode especificar as opções de conexão para o Kinesis usando os seguintes argumentos para os métodos especificados na classe GlueContext.

  • Scala

    • connectionOptions: usar com getSource, createDataFrameFromOptions, getSink

    • additionalOptions: usar com getCatalogSource, getCatalogSink

    • options: usar com getSourceWithFormat, getSinkWithFormat

  • Python

    • connection_options: usar com create_data_frame_from_options, write_dynamic_frame_from_options

    • additional_options: usar com create_data_frame_from_catalog, write_dynamic_frame_from_catalog

    • options: usar com getSource, getSink

Para notas e restrições sobre trabalhos de ETL de streaming, consulte Notas e restrições sobre ETL de transmissão.

Configurar o Kinesis

Para ler um fluxo de dados do Kinesis em uma trabalho do AWS Glue Spark, você precisará de alguns pré-requisitos:

  • Se for leitura, o trabalho do AWS Glue deve ter permissões IAM do nível de acesso de leitura ao fluxo de dados do Kinesis.

  • Se for gravação, o trabalho do AWS Glue deve ter permissões IAM do nível de acesso de gravação ao fluxo de dados do Kinesis.

Em certos casos, você precisará configurar pré-requisitos adicionais:

  • Se o trabalho do AWS Glue estiver configurado com conexões de rede adicionais (normalmente para se conectar a outros conjuntos de dados) e uma dessas conexões fornecer opções de rede da Amazon VPC, isso direcionará o trabalho para se comunicar pela Amazon VPC. Nesse caso, você também precisará configurar o fluxo de dados do Kinesis para se comunicar pela Amazon VPC. É possível fazer isso criando um endpoint da VPC de interface entre a Amazon VPC e o fluxo de dados do Kinesis. Para obter mais informações, consulte Using Kinesis Data Streams with Interface VPC Endpoints.

  • Ao especificar Amazon Kinesis Data Streams em outra conta, você deve configurar os perfis e políticas para permitir o acesso entre contas. Para obter mais informações, consulte Exemplo: Ler de uma transmissão do Kinesis em outra conta.

Para obter mais informações sobre pré-requisitos de trabalho de ETL de streaming, consulte Trabalhos de transmissão de ETL no AWS Glue.

Exemplo: ler de fluxos do Kinesis

Exemplo: ler de fluxos do Kinesis

Usado em conjunto com forEachBatch.

Exemplo para fonte de transmissão do Amazon Kinesis:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Exemplo: gravação em streams do Kinesis

Exemplo: ler de fluxos do Kinesis

Usado em conjunto com forEachBatch.

Exemplo para fonte de transmissão do Amazon Kinesis:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Referência de opções de conexão do Kinesis

Designa opções de conexão para o Amazon Kinesis Data Streams.

Use as seguintes opções de conexão para fontes de dados de transmissão do Kinesis:

  • "streamARN" (obrigatório) usado para leitura/gravação. O ARN do fluxo de dados do Kinesis.

  • "classification" (Obrigatório para leitura) Usado para leitura. O formato de arquivo usado pelos dados no registro. Obrigatório, a menos que fornecido por meio do catálogo de dados.

  • "streamName" (opcional) usado para leitura. O nome de um fluxo de dados do Kinesis de onde ler. Usado com endpointUrl.

  • "endpointUrl" (opcional) usado para leitura. Padrão: “https://kinesis.us-east-1.amazonaws.com” O endpoint AWS do stream do Kinesis. Você não precisa alterar isso, a menos que esteja se conectando a uma região especial.

  • "partitionKey" (opcional) usado para gravação. A chave de partição do Kinesis usada na produção de registros.

  • "delimiter" (opcional) usado para leitura. O separador de valores usado quando a classification é CSV. O padrão é ",".

  • "startingPosition": (opcional) usado para leitura. A posição inicial no fluxo de dados do Kinesis de onde ler os dados. Os valores possíveis são "latest", "trim_horizon", "earliest" ou uma string de timestamp no formato UTC no padrão yyyy-mm-ddTHH:MM:SSZ (onde Z representa um desvio do fuso horário UTC com +/-). Por exemplo: "2023-04-04T08:00:00-04:00"). O valor padrão é "latest". Observação: a string de timestamp no formato UTC para "startingPosition" é compatível somente com a versão 4.0 ou posterior do AWS Glue.

  • "failOnDataLoss": (Opcional) Falha na tarefa se algum fragmento ativo estiver ausente ou expirado. O valor padrão é "false".

  • "awsSTSRoleARN": (opcional) usado para leitura/gravação. O nome de recurso da Amazon (ARN) da função a ser assumida com o uso do AWS Security Token Service (AWS STS). Essa função deve ter permissões para descrever ou ler operações de registro para o fluxo de dados do Kinesis. Você deve usar esse parâmetro ao acessar um fluxo de dados em uma conta diferente. Usado em conjunto com "awsSTSSessionName".

  • "awsSTSSessionName": (opcional) usado para leitura/gravação. Um identificador para a sessão que assume a função usando o AWS STS. Você deve usar esse parâmetro ao acessar um fluxo de dados em uma conta diferente. Usado em conjunto com "awsSTSRoleARN".

  • "awsSTSEndpoint": (Opcional) O AWS STS endpoint a ser usado ao se conectar ao Kinesis com uma função assumida. Isso permite usar o AWS STS endpoint regional em uma VPC, o que não é possível com o endpoint global padrão.

  • "maxFetchTimeInMs": (opcional) usado para leitura. O tempo máximo para o executor do trabalho ler registros referentes ao lote atual do fluxo de dados do Kinesis especificado em milissegundos (ms). Várias chamadas de API GetRecords podem ser feitas nesse período. O valor padrão é 1000.

  • "maxFetchRecordsPerShard": (opcional) usado para leitura. O número máximo de registros a serem obtidos por fragmento no fluxo de dados do Kinesis por microlote. Observação: o cliente poderá exceder esse limite se o trabalho de streaming já tiver lido registros extras do Kinesis (na mesma chamada get-records). Se maxFetchRecordsPerShard precisa ser rigoroso, então precisa ser um múltiplo de maxRecordPerRead. O valor padrão é 100000.

  • "maxRecordPerRead": (opcional) usado para leitura. O número máximo de registros a serem obtidos por fragmento no fluxo de dados do Kinesis em cada operação getRecords. O valor padrão é 10000.

  • "addIdleTimeBetweenReads": (opcional) usado para leitura. Adiciona um atraso de tempo entre duas operações getRecords. O valor padrão é "False". Essa opção só pode ser configurada para o Glue versão 2.0 e posterior.

  • "idleTimeBetweenReadsInMs": (opcional) usado para leitura. O atraso mínimo entre duas operações , especificado em ms. O valor padrão é 1000. Essa opção só pode ser configurada para o Glue versão 2.0 e posterior.

  • "describeShardInterval": (opcional) usado para leitura. O intervalo de tempo mínimo entre duas chamadas de API ListShards para que seu script considere a refragmentação. Para obter mais informações, consulte Estratégias para refragmentação no Guia do desenvolvedor do Amazon Kinesis Data Streams. O valor padrão é 1s.

  • "numRetries": (opcional) usado para leitura. O número máximo de novas tentativas para solicitações de API do Kinesis Data Streams. O valor padrão é 3.

  • "retryIntervalMs": (opcional) usado para leitura. O período de espera (especificado em ms) antes de repetir a chamada da API Kinesis Data Streams. O valor padrão é 1000.

  • "maxRetryIntervalMs": (opcional) usado para leitura. O período de espera máximo (especificado em ms) entre duas tentativas de uma chamada de API Kinesis Data Streams. O valor padrão é 10000.

  • "avoidEmptyBatches": (opcional) usado para leitura. Evita a criação de um trabalho de micro lote vazio verificando se há dados não lidos no fluxo de dados do Kinesis antes de o lote ser iniciado. O valor padrão é "False".

  • "schema": (Obrigatório quando inferSchema é definido como false): usado para leitura. O esquema a ser usado para processar a carga. Se a classificação for avro, o esquema fornecido deverá estar no formato de esquema Avro. Se a classificação não for avro, o esquema fornecido deverá estar no formato de esquema DDL.

    Veja a seguir alguns exemplos de esquema.

    Example in DDL schema format
    `column1` INT, `column2` STRING , `column3` FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
  • "inferSchema": (opcional) usado para leitura. O valor padrão é "false". Se definido como “true”, o esquema será detectado em runtime com base na carga útil em foreachbatch.

  • "avroSchema": (Obsoleto) Usado para leitura. Parâmetro usado para especificar um esquema de dados Avro quando o formato Avro é usado. Esse parâmetro foi descontinuado. Use o parâmetro schema.

  • "addRecordTimestamp": (opcional) usado para leitura. Quando essa opção for definida como "true", a saída de dados conterá uma coluna adicional denominada "__src_timestamp" que indica a hora que o registro correspondente é recebido pelo fluxo. O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior.

  • "emitConsumerLagMetrics": (opcional) usado para leitura. Quando a opção for definida como "true" (verdadeira), para cada lote, serão emitidas métricas durante o período entre a hora que o registro mais antigo é recebido pelo tópico e a hora que ele chega ao AWS Glue para o CloudWatch. O nome da métrica é "glue.driver.streaming.maxConsumerLagInMs". O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior.

  • "fanoutConsumerARN": (opcional) usado para leitura. O ARN de um consumidor de fluxo do Kinesis para o fluxo especificado em streamARN. Usado para habilitar o modo de distribuição avançada para a conexão do Kinesis. Para obter mais informações sobre como consumir um fluxo do Kinesis com distribuição avançada, consulte Usar distribuição avançada nas tarefas de streaming do Kinesis.

  • "recordMaxBufferedTime" (opcional) usado para gravação. Padrão: 1000 (ms). Tempo máximo em que um registro é armazenado em buffer enquanto espera para ser gravado.

  • "aggregationEnabled" (opcional) usado para gravação. Padrão: true. Especifica se os registros devem ser agregados antes de serem enviados para o Kinesis.

  • "aggregationMaxSize" (opcional) usado para gravação. Padrão: 51200 (bytes) Se um registro for maior que esse limite, ele ignorará o agregador. Nota: O Kinesis impõe um limite de 50 KB no tamanho do registro. Se você definir isso além de 50 KB, registros grandes serão rejeitados pelo Kinesis.

  • "aggregationMaxCount" (opcional) usado para gravação. Padrão: 4294967295. O número máximo de itens a serem retornados em um registro agregado.

  • "producerRateLimit" (opcional) usado para gravação. Padrão: 150 (%). Limita o throughput por fragmento enviado por um único produtor (como seu trabalho), como uma porcentagem do limite de back-end.

  • "collectionMaxCount" (opcional) usado para gravação. Padrão: 500. Número máximo de itens a serem compactados em uma solicitação PutRecords.

  • "collectionMaxSize" (opcional) usado para gravação. Padrão: 5242880 (bytes) Quantidade máxima de dados a serem enviados com uma solicitação PutRecords.