Scrittura in Flusso di dati Amazon Kinesis tramite un agente Kinesis - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Scrittura in Flusso di dati Amazon Kinesis tramite un agente Kinesis

L'agente Kinesis è un'applicazione software Java autonoma che offre un modo semplice per raccogliere e inviare dati al flusso di dati Kinesis. L'agente controlla costantemente un set di file e invia i nuovi dati al flusso. L'agente gestisce la rotazione dei file, i checkpoint e i tentativi in caso di errori. Fornisce tutti i dati in un modo affidabile, tempestivo e semplice. Inoltre, emette i CloudWatch parametri di Amazon per aiutarti a monitorare e risolvere meglio il processo di streaming.

Come impostazione predefinita, i record vengono analizzati da ciascun file in base alla nuova riga di caratteri ('\n'). Tuttavia, l'agente può anche essere configurato per analizzare record a più righe (consulta Impostazioni configurazione agente).

Puoi installare l'agente su ambienti server basati su Linux, come server Web, server di log e server di database. Dopo aver installato l'agente, configurarlo specificando i file da monitorare e il flusso per i dati. Dopo che l'agente è configurato, raccoglie i dati dal file in modo durevole e li invia al flusso in modo affidabile.

Prerequisiti

Scarica e installa l'agente

Innanzitutto connettiti all'istanza, Per ulteriori informazioni, consulta Connect to Your Instance nella Amazon EC2 User Guide. In caso di problemi di connessione, consulta Risoluzione dei problemi di connessione alla tua istanza nella Guida per l'utente di Amazon EC2.

Per configurare l'agente utilizzando l'AMI Amazon Linux

Utilizzare il seguente comando per scaricare e installare l'agente:

sudo yum install –y aws-kinesis-agent
Per configurare l'agente utilizzando Red Hat Enterprise Linux

Utilizzare il seguente comando per scaricare e installare l'agente:

sudo yum install –y https://s3.amazonaws.com/streaming-data-agent/aws-kinesis-agent-latest.amzn2.noarch.rpm
Per configurare l'agente utilizzando GitHub
  1. Scarica l'agente da awlabs/ amazon-kinesis-agent.

  2. Installare l'agente spostandosi nella directory di download ed eseguendo il comando seguente:

    sudo ./setup --install
Configurazione dell'agente in un container Docker

L'agente Kinesis può essere eseguito anche in un container tramite la base container amazonlinux. Utilizza il seguente Dockerfile e poi esegui docker build.

FROM amazonlinux RUN yum install -y aws-kinesis-agent which findutils COPY agent.json /etc/aws-kinesis/agent.json CMD ["start-aws-kinesis-agent"]

Configurazione e avvio dell'agente

Configurazione e avvio dell'agente
  1. Aprire e modificare il file di configurazione (come superutente se vengono utilizzate le autorizzazioni predefinite di accesso al file): /etc/aws-kinesis/agent.json

    In questo file di configurazione, specificare i file ( "filePattern" ) dai quali l'agente raccoglie i dati e il nome del flusso ( "kinesisStream" ) al quale l'agente invia i dati. Si noti che il nome del file è un modello e l'agente riconosce le rotazioni dei file. Puoi ruotare i file o creare nuovi file non più di una volta al secondo. L'agente usa il timestamp di creazione di file per stabilire quale file tracciare e inserire nel flusso; la creazione di nuovi file o la rotazione di file più frequentemente di una volta al secondo non consente all'agente di distinguerli in modo corretto.

    { "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "yourkinesisstream" } ] }
  2. Avvia l'agente manualmente:

    sudo service aws-kinesis-agent start
  3. (Facoltativo) Configurare l'agente per iniziare l'avvio del sistema:

    sudo chkconfig aws-kinesis-agent on

L'agente è ora in esecuzione come servizio di sistema in background. Controlla costantemente i file specificati e invia i dati al flusso specificato. L'attività dell'agente viene registrata in /var/log/aws-kinesis-agent/aws-kinesis-agent.log.

Impostazioni configurazione agente

L'agente supporta le due impostazioni di configurazione obbligatorie filePattern e kinesisStream, più impostazioni di configurazione opzionali per funzionalità aggiuntive. Puoi specificare la configurazione obbligatoria e opzionale in /etc/aws-kinesis/agent.json.

Quando modifichi il file di configurazione, devi arrestare e avviare l'agente, utilizzando i comandi seguenti:

sudo service aws-kinesis-agent stop sudo service aws-kinesis-agent start

In alternativa, potresti utilizzare il comando seguente:

sudo service aws-kinesis-agent restart

Seguono le impostazioni di configurazione generali.

Impostazione di configurazione Descrizione
assumeRoleARN

L'ARN del ruolo da assegnare all'utente. Per ulteriori informazioni, consulta Delegare l'accesso tra AWS account utilizzando ruoli IAM nella Guida per l'utente IAM.

assumeRoleExternalId

Si è verificato un identificatore opzionale che determina chi può assumere il ruolo. Per ulteriori informazioni, consulta Come utilizzare un ID esterno nella Guida per l’utente di IAM.

awsAccessKeyId

AWS ID della chiave di accesso che sostituisce le credenziali predefinite. Questa impostazione ha la precedenza su tutti gli altri provider di credenziali.

awsSecretAccessKey

AWS chiave segreta che sostituisce le credenziali predefinite. Questa impostazione ha la precedenza su tutti gli altri provider di credenziali.

cloudwatch.emitMetrics

Consente all'agente di emettere metriche su CloudWatch if set (true).

Impostazione predefinita: true

cloudwatch.endpoint

L'endpoint regionale per. CloudWatch

Impostazione predefinita: monitoring.us-east-1.amazonaws.com

kinesis.endpoint

L'endpoint regionale per il flusso di dati Kinesis.

Impostazione predefinita: kinesis.us-east-1.amazonaws.com

Seguono le impostazioni di configurazione del flusso.

Impostazione di configurazione Descrizione
dataProcessingOptions

L'elenco delle opzioni di elaborazione applicate a ciascun record analizzato prima dell'invio al flusso. Le opzioni di elaborazione vengono eseguite nell'ordine specificato. Per ulteriori informazioni, consulta Utilizza l'agente per preelaborare i dati.

kinesisStream

[Obbligatorio] Il nome del flusso.

filePattern

[Obbligatorio] La directory e il modello di file che devono corrispondere per essere rilevati dall'agente. Per tutti i file che corrispondono a questo modello, le autorizzazioni di lettura devono essere concesse a aws-kinesis-agent-user. Per la directory contenente i file, le autorizzazioni di lettura ed esecuzione devono essere concesse aws-kinesis-agent-user.

initialPosition

La posizione iniziale dalla quale è iniziata l'analisi del file. I valori validi sono START_OF_FILE e END_OF_FILE.

Impostazione predefinita: END_OF_FILE

maxBufferAgeMillis

Il tempo massimo, in millisecondi, in cui l'agente effettua il buffer dati prima di inviarli al flusso.

Intervallo valore: da 1.000 a 900.000 (da 1 secondo a 15 minuti)

Impostazione predefinita: 60.000 (1 minuto)

maxBufferSizeBytes

Le dimensioni massime, in byte, in cui l'agente effettua il buffer dati prima di inviarli al flusso.

Intervallo valore: da 1 a 4.194.304 (4 MB)

Impostazione predefinita: 4.194.304 (4 MB)

maxBufferSizeRecords

Il numero massimo di record in cui l'agente effettua il buffer dati prima di inviarli al flusso.

Intervallo valore: da 1 a 500

Impostazione predefinita: 500

minTimeBetweenFilePollsMillis

L'intervallo di tempo, in millisecondi, in cui l'agente esegue il polling e analizza i dati nuovi nei file monitorati.

Intervallo valore: 1 o più

Impostazione predefinita: 100

multiLineStartPattern

Il modello per identificare l'inizio di un record. Un record è composto da una riga corrispondente al modello e da tutte le righe successive non corrispondenti al modello. I valori validi sono espressioni regolari. Come impostazione predefinita, ogni nuova riga nei file di log viene analizzata come un record.

partitionKeyOption

Il metodo per generare la chiave di partizione. I valori validi sono RANDOM (integer generato casualmente) e DETERMINISTIC (un valore hash calcolato dai dati).

Impostazione predefinita: RANDOM

skipHeaderLines

Il numero di righe necessarie perché l'agente salti l'analisi all'inizio dei file monitorati.

Intervallo valore: 0 o più

Impostazione predefinita: 0 (zero)

truncatedRecordTerminator

La stringa che l'agente utilizza per troncare un record analizzato quando le dimensioni del record eccedono il limite di dimensioni del record . (1.000 KB)

Impostazione predefinita: '\n' (nuova riga)

Monitoraggio di più directory di file e scrittura in flussi multipli

Specificando più impostazioni di configurazione del flusso, puoi configurare l'agente in modo che monitori più directory di file e invii dati a più flussi. Nel seguente esempio di configurazione, l'agente monitora due directory di file e invia i dati rispettivamente a un flusso Kinesis e a un flusso di distribuzione Firehose. Tieni presente che puoi specificare endpoint diversi per Kinesis Data Streams e Firehose in modo che Kinesis stream e Firehose non debbano necessariamente trovarsi nella stessa regione.

{ "cloudwatch.emitMetrics": true, "kinesis.endpoint": "https://your/kinesis/endpoint", "firehose.endpoint": "https://your/firehose/endpoint", "flows": [ { "filePattern": "/tmp/app1.log*", "kinesisStream": "yourkinesisstream" }, { "filePattern": "/tmp/app2.log*", "deliveryStream": "yourfirehosedeliverystream" } ] }

Per informazioni più dettagliate sull'utilizzo dell'agente con Firehose, consulta Writing to Amazon Data Firehose with Kinesis Agent.

Utilizza l'agente per preelaborare i dati

L'agente può preelaborare i record analizzati dai file monitorati prima di inviarli al flusso. È possibile abilitare questa funzionalità aggiungendo le impostazioni di configurazione dataProcessingOptions al flusso di file. Una o più opzioni di elaborazione possono essere aggiunte e saranno eseguite nell'ordine specificato.

L'agente supporta le seguenti opzioni di elaborazione elencate. Poiché l'agente è open source, è possibile sviluppare ulteriormente e ampliare le opzioni di elaborazione. Puoi scaricare l'agente dall’agente Kinesis.

Opzioni di elaborazione
SINGLELINE

Converte un record a più righe in un record a riga singola rimuovendo i caratteri di nuova riga, gli spazi iniziali e finali.

{ "optionName": "SINGLELINE" }
CSVTOJSON

Converte un record da un formato delimitatore separato a un formato JSON.

{ "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", ... ], "delimiter": "yourdelimiter" }
customFieldNames

[Obbligatorio] I nomi di campo utilizzati come chiavi in ciascuna coppia chiave-valore JSON. Ad esempio, se specifichi ["f1", "f2"], il record "v1, v2" sarà convertito in {"f1":"v1","f2":"v2"}.

delimiter

La stringa utilizzata come delimitatore nel record. L'impostazione predefinita è una virgola (,).

LOGTOJSON

Converte un record da un formato log a un formato JSON. I formati di log supportati sono Apache Common Log, Apache Combined Log, Apache Error Log e RFC3164 Syslog.

{ "optionName": "LOGTOJSON", "logFormat": "logformat", "matchPattern": "yourregexpattern", "customFieldNames": [ "field1", "field2", ] }
logFormat

[Obbligatorio] Il formato di inserimento dei log. I seguenti sono i valori possibili:

  • COMMONAPACHELOG - Il formato Apache Common Log. Ogni voce di log ha il seguente modello come impostazione predefinita: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes}".

  • COMBINEDAPACHELOG: il formato Apache Combined Log. Ogni voce di log ha il seguente modello come impostazione predefinita: "%{host} %{ident} %{authuser} [%{datetime}] \"%{request}\" %{response} %{bytes} %{referrer} %{agent}".

  • APACHEERRORLOG: il formato Apache Error Log. Ogni voce di log ha il seguente modello come impostazione predefinita: "[%{timestamp}] [%{module}:%{severity}] [pid %{processid}:tid %{threadid}] [client: %{client}] %{message}".

  • SYSLOG: il formato RFC3164 Syslog. Ogni voce di log ha il seguente modello come impostazione predefinita: "%{timestamp} %{hostname} %{program}[%{processid}]: %{message}".

matchPattern

Il modello di espressione regolare utilizzato per estrarre valori dalle voci di log. Questa impostazione viene utilizzata se la tua voce di log non è in uno dei formati di log predefiniti. Se questa impostazione viene utilizzata, devi specificare anche customFieldNames.

customFieldNames

I nomi di campo obbligatori utilizzati come chiavi in ciascuna coppia chiave-valore JSON. Puoi utilizzare questa impostazione per definire i nomi dei campi per i valori estratti da matchPattern oppure sovrascrivere i nomi dei campi predefiniti dei formati di log predefiniti.

Esempio : configurazione LOGTOJSON

Questo è un esempio di una configurazione LOGTOJSON per una voce Apache Common Log convertita in formato JSON:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" }

Prima della conversione:

64.242.88.10 - - [07/Mar/2004:16:10:02 -0800] "GET /mailman/listinfo/hsdivision HTTP/1.1" 200 6291

Dopo la conversione:

{"host":"64.242.88.10","ident":null,"authuser":null,"datetime":"07/Mar/2004:16:10:02 -0800","request":"GET /mailman/listinfo/hsdivision HTTP/1.1","response":"200","bytes":"6291"}
Esempio : configurazione LOGTOJSON con campi personalizzati

Ecco un altro esempio di configurazione LOGTOJSON:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "customFieldNames": ["f1", "f2", "f3", "f4", "f5", "f6", "f7"] }

Con questa impostazione di configurazione, la stessa voce Apache Common Log dall'esempio precedente viene convertita in formato JSON come segue:

{"f1":"64.242.88.10","f2":null,"f3":null,"f4":"07/Mar/2004:16:10:02 -0800","f5":"GET /mailman/listinfo/hsdivision HTTP/1.1","f6":"200","f7":"6291"}
Esempio : convertire la voce Apache Common Log

La seguente configurazione di flusso converte una voce Apache Common Log in record a riga singola in formato JSON:

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "dataProcessingOptions": [ { "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG" } ] } ] }
Esempio : convertire record a più righe

La seguente configurazione del flusso analizza i record a più righe la cui prima riga inizia con "[SEQUENCE=". Ogni record viene convertito in un record a riga singola. Quindi, i valori vengono estratti dal record in base a un delimitatore di schede. I valori estratti sono mappati in valori customFieldNames specificati per formare un record a riga singola in formato JSON.

{ "flows": [ { "filePattern": "/tmp/app.log*", "kinesisStream": "my-stream", "multiLineStartPattern": "\\[SEQUENCE=", "dataProcessingOptions": [ { "optionName": "SINGLELINE" }, { "optionName": "CSVTOJSON", "customFieldNames": [ "field1", "field2", "field3" ], "delimiter": "\\t" } ] } ] }
Esempio : configurazione LOGTOJSON con modello corrispondente

Questo è un esempio di una configurazione LOGTOJSON per una voce Apache Common Log convertita in formato JSON, con l'ultimo campo (byte) omesso:

{ "optionName": "LOGTOJSON", "logFormat": "COMMONAPACHELOG", "matchPattern": "^([\\d.]+) (\\S+) (\\S+) \\[([\\w:/]+\\s[+\\-]\\d{4})\\] \"(.+?)\" (\\d{3})", "customFieldNames": ["host", "ident", "authuser", "datetime", "request", "response"] }

Prima della conversione:

123.45.67.89 - - [27/Oct/2000:09:27:09 -0400] "GET /java/javaResources.html HTTP/1.0" 200

Dopo la conversione:

{"host":"123.45.67.89","ident":null,"authuser":null,"datetime":"27/Oct/2000:09:27:09 -0400","request":"GET /java/javaResources.html HTTP/1.0","response":"200"}

Comandi dell'interfaccia a riga di comando dell'agente

Avviare automaticamente l'agente all'avvio del sistema:

sudo chkconfig aws-kinesis-agent on

Controlla lo stato dell'agente:

sudo service aws-kinesis-agent status

Interrompi l'agente:

sudo service aws-kinesis-agent stop

Leggi il file di log dell'agente da questa posizione:

/var/log/aws-kinesis-agent/aws-kinesis-agent.log

Disinstalla l'agente:

sudo yum remove aws-kinesis-agent

Domande frequenti

Esiste un agente Kinesis per Windows?

L'agente Kinesis per Windows è un software diverso dall'agente Kinesis per piattaforme Linux.

Perché l'agente Kinesis rallenta e/o RecordSendErrors aumenta?

Di solito ciò è dovuto alla limitazione di Kinesis. Controlla la WriteProvisionedThroughputExceeded metrica per Kinesis Data Streams o la ThrottledRecords metrica per Firehose Delivery Streams. Qualsiasi aumento rispetto a 0 di questi parametri indica che è necessario aumentare i limiti dei flussi. Per ulteriori informazioni, consulta Limiti del flusso di dati Kinesis e Flussi di consegna di Amazon Firehose.

Una volta esclusa la limitazione, verifica se Kinesis Agent è configurato in modo da monitorare grandi quantità di file di piccole dimensioni. Si verifica un ritardo nel momento in cui Kinesis Agent esegue il tail di un nuovo file, quindi Kinesis Agent dovrebbe eseguire la coda su una piccola quantità di file più grandi. Prova a consolidare i tuoi file di log in file più grandi.

Perché ricevo delle java.lang.OutOfMemoryError eccezioni?

Kinesis Agent non dispone di memoria sufficiente per gestire il carico di lavoro corrente. Prova ad aumentare, JAVA_START_HEAP inserire /usr/bin/start-aws-kinesis-agent e JAVA_MAX_HEAP riavviare l'agente.

Perché IllegalStateException : connection pool shut down ricevo delle eccezioni?

Kinesis Agent non dispone di connessioni sufficienti per gestire il carico di lavoro corrente. Prova ad aumentare maxConnections e maxSendingThreads a inserire le impostazioni generali della configurazione dell'agente su. /etc/aws-kinesis/agent.json Il valore predefinito per questi campi è 12 volte superiore ai processori di runtime disponibili. Consulta AgentConfiguration.java per ulteriori informazioni sulle impostazioni avanzate delle configurazioni degli agenti.

Come posso eseguire il debug di un altro problema con Kinesis Agent?

DEBUGi log di livello possono essere abilitati in. /etc/aws-kinesis/log4j.xml

Come devo configurare Kinesis Agent?

Più piccolo èmaxBufferSizeBytes, più frequentemente Kinesis Agent invierà i dati. Ciò può essere utile in quanto riduce i tempi di consegna dei record, ma aumenta anche le richieste al secondo a Kinesis.

Perché Kinesis Agent invia record duplicati?

Ciò si verifica a causa di un'errata configurazione nella coda dei file. Assicurati che ognuno corrisponda fileFlow’s filePattern a un solo file. Ciò può verificarsi anche se la logrotate modalità utilizzata è copytruncate attiva. Prova a passare alla modalità predefinita o crea per evitare duplicazioni. Per ulteriori informazioni sulla gestione dei record duplicati, vedere Gestione dei record duplicati.