Gravar no Amazon Kinesis Data Streams usando o Kinesis Agent - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Gravar no Amazon Kinesis Data Streams usando o Kinesis Agent

O Kinesis Agent é uma aplicação de software Java independente que oferece uma maneira fácil de coletar e enviar dados ao Kinesis Data Streams. O agente monitora continuamente um conjunto de arquivos e envia novos dados ao stream. Ele manipula o rodízio de arquivos, os pontos de verificação e as novas tentativas após falhas. Seus dados são entregues de maneira confiável, imediata e simples. Ele também emite CloudWatch métricas da Amazon para ajudar você a monitorar e solucionar melhor o processo de streaming.

Por padrão, os registros são analisados em cada arquivo com base no caractere de nova linha ('\n'). No entanto, o agente também pode ser configurado para analisar registros de várias linhas (consulte Configurações do agente).

Você pode instalar o agente em ambientes de servidor baseados no Linux, como servidores web, servidores de log e servidores de banco de dados. Após instalar o agente, configure-o especificando os arquivos a serem monitorados e o stream dos dados. Depois que o agente é configurado, ele coleta dados dos arquivos de forma durável e os envia confiavelmente ao stream.

Pré-requisitos

Download e instalação do agente

Primeiro, conecte-se à instância. Para obter mais informações, consulte Conectar à sua instância no Manual do usuário do Amazon EC2 para instâncias do Linux. Se tiver problemas para se conectar, consulte Solução de problemas para conectar-se à sua instância no Guia do usuário do Amazon EC2 para instâncias do Linux.

Para configurar o agente usando o Amazon Linux AMI

Use o comando a seguir para fazer download do agente e instalá-lo:

sudo yum install –y aws-kinesis-agent
Para configurar o agente usando o Red Hat Enterprise Linux

Use o comando a seguir para fazer download do agente e instalá-lo:

sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
Para configurar o agente usando GitHub
  1. Baixe o agente em amazon-kinesis-agentawlabs/.

  2. Instale o agente navegando até o diretório de download e executando o comando a seguir:

    sudo ./setup --install
Para configurar o agente em um contêiner do Docker

O Kinesis Agent também pode ser executado em um contêiner por meio da base de contêineres amazonlinux. Use o Dockerfile a seguir e depois execute o docker build.

FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]

Configuração e inicialização do agente

Para configurar e iniciar o agente
  1. Abra e edite o arquivo de configuração (como superusuário, se você estiver usando permissões padrão de acesso a arquivos): /etc/aws-kinesis/agent.json

    Nesse arquivo de configuração, especifique os arquivos ( "filePattern" ) nos quais o agente coleta dados e o nome do stream ( "kinesisStream" ) ao qual o agente envia dados. Observe que o nome do arquivo é um padrão, e o agente reconhece os rodízios de arquivos. Você só pode fazer o rodízio de arquivos ou criar novos arquivos uma vez por segundo, no máximo. O agente usa o carimbo de data e hora de criação de arquivo para determinar quais arquivos serão rastreados e colocados no final do stream; a criação de novos arquivos ou o rodízio de arquivos em uma frequência superior a uma vez por segundo não permite que o agente faça a distinção entre eles corretamente.

    { "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "yourkinesisstream" } ] }
  2. Inicie o agente manualmente:

    sudo service aws-kinesis-agent start
  3. (Opcional) Configure o agente para ser iniciado durante o startup do sistema:

    sudo chkconfig aws-kinesis-agent on

Agora o agente está sendo executado como um serviço do sistema em segundo plano. Ele monitora continuamente os arquivos especificados e envia dados ao stream especificado. A atividade do agent é registrada em /var/log/aws-kinesis-agent/aws-kinesis-agent.log.

Configurações do agente

O agente oferece suporte a duas configurações obrigatórias, filePattern e kinesisStream, além das configurações opcionais de recursos adicionais. É possível especificar configurações obrigatórias e opcionais em /etc/aws-kinesis/agent.json.

Sempre que você alterar o arquivo de configuração, deverá interromper e iniciar o agente, usando os seguintes comandos:

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

Se desejar, você pode usar o seguinte comando:

sudo service aws-kinesis-agent restart

Estas são as configurações gerais.

Definição da configuração Descrição
assumeRoleARN

O ARN da função a ser assumida pelo usuário. Para obter mais informações, consulte Delegar acesso entre AWS contas usando funções do IAM no Guia do usuário do IAM.

assumeRoleExternalId

Um identificador opcional que determina quem pode assumir a função. Para obter mais informações, consulte Como usar um ID externo no Guia do usuário do IAM.

awsAccessKeyId

AWS ID da chave de acesso que substitui as credenciais padrão. Essa configuração tem precedência sobre todos os outros provedores de credenciais.

awsSecretAccessKey

AWS chave secreta que substitui as credenciais padrão. Essa configuração tem precedência sobre todos os outros provedores de credenciais.

cloudwatch.emitMetrics

Permite que o agente emita métricas para, CloudWatch se definidas (verdadeiras).

Padrão: True

cloudwatch.endpoint

O endpoint regional para CloudWatch.

Padrão: monitoring.us-east-1.amazonaws.com

kinesis.endpoint

O endpoint regional do Kinesis Data Streams.

Padrão: kinesis.us-east-1.amazonaws.com

Estas são as configurações de fluxo.

Definição da configuração Descrição
dataProcessingOptions

A lista das opções de processamento aplicadas a cada registro analisado antes que ele seja enviado ao stream. As opções de processamento são executadas na ordem especificada. Para ter mais informações, consulte Uso do agente para pré-processar dados.

kinesisStream

[Obrigatório] O nome do stream.

filePattern

[Obrigatório] O diretório e o padrão de arquivo que devem ser combinados para serem coletados pelo agente. Para todos os arquivos correspondentes a esse padrão, deve ser concedida uma permissão de leitura a aws-kinesis-agent-user. Para o diretório que contém os arquivos, devem ser concedidas permissões de leitura e execução a aws-kinesis-agent-user.

initialPosition

A posição em que o arquivo começou a ser analisado. Os valores válidos são START_OF_FILE e END_OF_FILE.

Padrão: END_OF_FILE

maxBufferAgeMillis

O tempo máximo, em milissegundos, durante o qual o agente armazena os dados em buffer antes de enviá-los ao stream.

Intervalo de valores: 1.000 a 900.000 (1 segundo a 15 minutos)

Padrão: 60.000 (1 minuto)

maxBufferSizeBytes

O tamanho máximo, em bytes, durante o qual o agente armazena os dados em buffer antes de enviá-los ao stream.

Intervalo de valores: 1 a 4.194.304 (4 MB)

Padrão: 4.194.304 (4 MB)

maxBufferSizeRecords

O número máximo de registros para os quais o agente armazena os dados em buffer antes de enviá-los ao stream.

Intervalo de valores: 1 a 500

Padrão: 500

minTimeBetweenFilePollsMillis

O intervalo de tempo, em milissegundos, em que o agente consulta e analisa os arquivos monitorados em busca de novos dados.

Intervalo de valores: 1 ou mais

Padrão: 100

multiLineStartPattern

O padrão de identificação do início de um registro. Um registro é composto por uma linha que corresponde ao padrão e pelas linhas subsequentes que não correspondem ao padrão. Os valores válidos são expressões regulares. Por padrão, cada nova linha nos arquivos de log é analisada como um único registro.

partitionKeyOption

O método para gerar a chave de partição. Os valores válidos são RANDOM (inteiro gerado aleatoriamente) e DETERMINISTIC (um valor de hash calculado a partir dos dados).

Padrão: RANDOM

skipHeaderLines

O número de linhas em que o agente ignorará a análise no início dos arquivos monitorados.

Intervalo de valores: 0 ou mais

Padrão: 0 (zero)

truncatedRecordTerminator

A string que o agente usa para truncar um registro analisado que excede o limite de tamanho de registro do Kinesis Data Streams. (1,000 KB)

Padrão: '\n' (nova linha)

Monitoramento de vários diretórios de arquivos e gravação em vários streams

Ao especificar vários fluxos de configurações, você pode configurar o agente para monitorar vários diretórios de arquivos e enviar dados a vários streams. No exemplo de configuração a seguir, o agente monitora dois diretórios de arquivos e envia dados para um stream do Kinesis e um stream de entrega do Firehose, respectivamente. Observe que você pode especificar endpoints diferentes para o Kinesis Data Streams e o Firehose para que o stream do Kinesis e o stream de entrega do Firehose não precisem estar na mesma região.

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

Para obter informações mais detalhadas sobre o uso do agente com o Firehose, consulte Gravando no Amazon Data Firehose com o Kinesis Agent.

Uso do agente para pré-processar dados

O agente pode pré-processar os registros analisados a partir dos arquivos monitorados antes de enviá-los ao stream. Você pode habilitar esse recurso adicionando a configuração dataProcessingOptions ao fluxo de arquivos. Um ou mais opções de processamento podem ser adicionadas e serão executadas na ordem especificada.

O agente oferece suporte às seguintes opções de processamento. Como o agente é de código aberto, você pode desenvolver e estender ainda mais suas opções de processamento. Você pode baixar o agente em Kinesis Agent.

Opções de processamento
SINGLELINE

Converte um registro de várias linhas em um registro de única linha removendo caracteres de nova linha, e espaços à esquerda e à direita.

{ "optionName": "SINGLELINE" }
CSVTOJSON

Converte um registro com formato separado por delimitador em um registro com formato JSON.

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[Obrigatório] Os nomes de campo usados como chaves em cada par de valores de chave JSON. Por exemplo, se você especificar ["f1", "f2"], o registro "v1, v2" será convertido em {"f1":"v1","f2":"v2"}.

delimiter

A string usada como delimitador no registro. O padrão é uma vírgula (,).

LOGTOJSON

Converte um registro com formato de log em um registro com formato JSON. Os formatos de log compatíveis são Apache Common Log, Apache Combined Log, Apache Error Log e RFC3164 Syslog.

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[Obrigatório] O formato da entrada de log. Os valores possíveis são:

  • COMMONAPACHELOG: o formato do Apache Common Log. Cada entrada de log tem o seguinte padrão: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}".

  • COMBINEDAPACHELOG: o formato do Apache Combined Log. Cada entrada de log tem o seguinte padrão: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}".

  • APACHEERRORLOG: o formato do Apache Error Log. Cada entrada de log tem o seguinte padrão: "[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}".

  • SYSLOG: o formato do RFC3164 Syslog. Cada entrada de log tem o seguinte padrão: "%{timestamp} %{hostname} %{program}[%{processid}]: %{message}".

matchPattern

O padrão da expressão regular usada para extrair valores de entradas de log. Essa configuração é usada se a entrada de log não estiver em um dos formatos de log predefinidos. Se essa configuração for usado, você também terá que especificar customFieldNames.

customFieldNames

Os nomes de campo personalizados usados como chaves em cada par de valores de chave JSON. Você pode usar essa configuração para definir nomes de campo para valores extraídos de matchPattern ou substituir os nomes de campo padrão de formatos de log predefinidos.

exemplo : Configuração LOGTOJSON

Aqui está um exemplo de uma configuração LOGTOJSON para uma entrada Apache Common Log convertida em formato JSON:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

Antes da conversão:

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

Depois da conversão:

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
exemplo : Configuração LOGTOJSON com campos personalizados

Aqui está outro exemplo de configuração LOGTOJSON:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

Com essa configuração, a mesma entrada Apache Common Log do exemplo anterior é convertida em formato JSON, da seguinte forma:

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
exemplo : Conversão da entrada Apache Common Log

A configuração de fluxo a seguir converte uma entrada Apache Common Log em um registro de linha única no formato JSON:

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
exemplo : Conversão de registros de várias linhas

A configuração de fluxo a seguir analisa registros de várias linha cuja primeira linha começa com "[SEQUENCE=". Cada registro é convertido primeiro em um registro de única linha. Em seguida, os valores são extraídos do registro com base em um delimitador por tabulações. Os valores extraídos são mapeados para os valores customFieldNames especificados, a fim de formar um registro de linha única no formato JSON.

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }
exemplo : Configuração LOGTOJSON com padrão de correspondência

Aqui está um exemplo de configuração LOGTOJSON referente a uma entrada Apache Common Log convertida em formato JSON, com o último campo (bytes) omitido:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

Antes da conversão:

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

Depois da conversão:

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

Comandos da CLI do agente

Inicie automaticamente o agente durante o startup do sistema:

sudo chkconfig aws-kinesis-agent on

Verifique o status do agente:

sudo service aws-kinesis-agent status

Interrompa o agente:

sudo service aws-kinesis-agent stop

Leia o arquivo de log do agente a partir deste local:

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

Desinstale o agente:

sudo yum remove aws-kinesis-agent

Perguntas frequentes

Existe um Kinesis Agent para Windows?

O Kinesis Agent para Windows é um software diferente das plataformas do Kinesis Agent para Linux.

Por que o Kinesis Agent está ficando mais lento e/ou aumentando os RecordSendErrors?

Isso geralmente ocorre devido ao controle de utilização do Kinesis. Verifique a WriteProvisionedThroughputExceeded métrica do Kinesis Data Streams ThrottledRecords ou a métrica do Firehose Delivery Streams. Qualquer aumento de 0 nessas métricas indica que os limites do fluxo precisam ser aumentados. Para obter mais informações, consulte Kinesis Data Stream limits e Amazon Firehose Delivery Streams.

Depois de descartar o controle de utilização como causa, verifique se o Kinesis Agent está configurado para seguir um número grande de arquivos pequenos. Há um atraso quando o Kinesis Agent exibe os dados do final de um arquivo novo, portanto, o Kinesis Agent deveria estar exibindo os dados do final de um pequeno número de arquivos maiores. Tente consolidar os arquivos de log em arquivos maiores.

Por que estou recebendo exceções java.lang.OutOfMemoryError ?

O Kinesis Agent não tem memória suficiente para lidar com a workload atual. Tente aumentar JAVA_START_HEAP e JAVA_MAX_HEAP no /usr/bin/start-aws-kinesis-agent e reiniciar o agente.

Por que estou recebendo exceções IllegalStateException : connection pool shut down?

O Kinesis Agent não tem conexões suficientes para lidar com a workload atual. Tente aumentar maxConnections e maxSendingThreads nas configurações gerais do agente em /etc/aws-kinesis/agent.json. O valor padrão para esses campos é 12 vezes o número de processadores de runtime disponíveis. Consulte AgentConfiguration.java para saber mais sobre as configurações avançadas do agente.

Como posso depurar outro problema com o Kinesis Agent?

Os logs do nível DEBUG podem ser habilitados em /etc/aws-kinesis/log4j.xml.

Como devo configurar o Kinesis Agent?

Quanto menor o maxBufferSizeBytes, mais frequentemente o Kinesis Agent enviará dados. Isso pode ser bom, pois diminui o tempo de entrega dos registros, mas também aumenta as solicitações por segundo feitas ao Kinesis.

Por que o Kinesis Agent está enviando registros duplicados?

Isso ocorre devido a uma configuração incorreta da exibição dos dados do final dos arquivos. Certifique-se de que cada fileFlow’s filePattern corresponda a apenas um arquivo. Isso também pode ocorrer se o modo logrotate que está sendo usado estiver no modo copytruncate. Tente mudar o modo para o modo padrão ou criar para evitar duplicações. Para obter mais informações sobre como lidar com registros duplicados, consulte Handling Duplicate Records.