ItemReader - AWS Step Functions

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

ItemReader

O campo ItemReader é um objeto JSON, que especifica um conjunto de dados e sua localização. Um estado Mapa Distribuído usa esse conjunto de dados como entrada. O exemplo a seguir mostra a sintaxe do campo ItemReader no caso de um conjunto de dados ser um arquivo CSV armazenado em um bucket do Amazon S3.

"ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "myBucket", "Key": "csvDataset/ratings.csv" } }
dica

No Workflow Studio, você especifica o conjunto de dados e sua localização no campo Origem do item.

Conteúdo do campo ItemReader

Dependendo do seu conjunto de dados, o conteúdo do campo ItemReader varia. Por exemplo, se seu conjunto de dados for uma matriz JSON transmitida de uma etapa anterior do fluxo de trabalho, o campo ItemReader será omitido. Se seu conjunto de dados for uma fonte de dados do Amazon S3, esse campo conterá os seguintes subcampos.

ReaderConfig

Um objeto JSON que especifica os seguintes detalhes:

  • InputType

    Especifica o tipo de fonte de dados do Amazon S3, como arquivo CSV, objeto, arquivo JSON ou uma lista do inventário Amazon S3. No Workflow Studio, você pode selecionar um tipo de entrada na lista suspensa de origem do item do Amazon S3, sob o campo Origem do item.

  • CSVHeaderLocation

    nota

    Esse campo deverá ser especificado somente se um arquivo CSV for usado como conjunto de dados.

    Aceita um dos seguintes valores para especificar a localização do cabeçalho da coluna:

    Importante

    Atualmente, o Step Functions é compatível com cabeçalhos CSV de até 10 KB.

    • FIRST_ROW — use essa opção se a primeira linha do arquivo for o cabeçalho.

    • GIVEN — use essa opção para especificar o cabeçalho na definição da máquina de estado. Por exemplo, se seu arquivo CSV contém os seguintes dados.

      1,307,3.5,1256677221 1,481,3.5,1256677456 1,1091,1.5,1256677471 ...

      Forneça a seguinte matriz JSON como cabeçalho CSV.

      "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "GIVEN", "CSVHeaders": [ "userId", "movieId", "rating", "timestamp" ] } }
    dica

    No Workflow Studio, você pode encontrar essa opção em Configuração adicional, no campo Origem do item.

  • MaxItems

    Limita o número de itens de dados transmitidos para o estado Map. Por exemplo, suponha que você vá fornecer um arquivo CSV contendo 1.000 linhas e especificar um limite de 100. Em seguida, o intérprete transmite apenas 100 linhas para o estado Map. O estado Map processa os itens em ordem sequencial, começando depois da linha do cabeçalho.

    Por padrão, o estado Map itera todos os itens no conjunto de dados especificado.

    nota

    Atualmente, você pode especificar um limite de até 100.000.000. O estado Mapa Distribuído interrompe a leitura de itens além desse limite.

    dica

    No Workflow Studio, você pode encontrar essa opção em Configuração adicional, no campo Origem do item.

    Como alternativa, você pode especificar um caminho de referência para um par de chave-valor existente na entrada do estado Mapa Distribuído. Esse caminho deve ser resolvido como um inteiro positivo. O caminho de referência é especificado no subcampo MaxItemsPath.

    Importante

    Você pode especificar o MaxItems ou o subcampo MaxItemsPath, mas não ambos.

Resource

A ação da API do Amazon S3 que o Step Functions deve invocar dependendo do conjunto de dados especificado.

Parameters

Um objeto JSON que especifica o nome do bucket do Amazon S3 e a chave do objeto em que o conjunto de dados está armazenado.

Importante

Certifique-se de que seus buckets do Amazon S3 estejam na mesma Conta da AWS e Região da AWS que a máquina de estado.

Exemplos de conjuntos de dados

Você pode especificar uma das seguintes opções como conjunto de dados:

Importante

O Step Functions precisa das devidas permissões para acessar conjuntos de dados do Amazon S3 que você usa. Para obter informações sobre políticas do IAM para o conjunto de dados, consulte Políticas do IAM para conjuntos de dados.

Um estado Mapa Distribuído pode aceitar uma entrada JSON transmitida de uma etapa anterior no fluxo de trabalho. Essa entrada deve ser ou conter uma matriz dentro de um nó específico. Para selecionar um nó que contenha a matriz, você pode usar o campo ItemsPath.

Para processar itens individuais na matriz, o estado Mapa Distribuído inicia a execução de um fluxo de trabalho secundário para cada item da matriz. As guias a seguir mostram exemplos da entrada transmitidas para o estado Map e a entrada correspondente para a execução de um fluxo de trabalho secundário.

nota

O Step Functions omite o campo ItemReader quando o conjunto de dados é uma matriz JSON de uma etapa anterior.

Input passed to the Map state

Considere a seguinte matriz JSON de três itens.

"facts": [ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" } ]
Input passed to a child workflow execution

O estado Mapa Distribuído inicia três execuções do fluxo de trabalho secundário. Cada execução recebe um item de matriz como entrada. O exemplo a seguir mostra a entrada recebida pela execução de um fluxo de trabalho secundário.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

Um estado Mapa Distribuído pode iterar os objetos que são armazenados em um bucket do Amazon S3. Quando a execução do fluxo de trabalho atinge o estado Map, o Step Functions invoca a ação da API ListObjectsV2, que retorna uma matriz dos metadados do objeto do Amazon S3. Nessa matriz, cada item contém dados, como ETag e Key, para os dados armazenados no bucket.

Para processar itens individuais na matriz, o estado Mapa Distribuído inicia a execução de um fluxo de trabalho secundário. Por exemplo, suponha que seu bucket do Amazon S3 contenha 100 imagens. Então, a matriz retornada após invocar a ação da API ListObjectsV2 contém 100 itens. O estado Mapa Distribuído inicia 100 execuções de fluxo de trabalho secundário para processar cada item da matriz.

nota
  • Atualmente, o Step Functions também inclui um item para cada pasta criada em um bucket do Amazon S3 específico usando o console do Amazon S3. Isso resulta em uma execução adicional de fluxo de trabalho secundário, iniciada pelo estado Mapa Distribuído. Para evitar a criação de uma execução adicional de fluxo de trabalho secundário para a pasta, recomendamos que você use a AWS CLI para criar pastas. Para obter informações, consulte Comandos de alto nível do Amazon S3 no Guia do usuário da AWS Command Line Interface.

  • O Step Functions precisa das devidas permissões para acessar conjuntos de dados do Amazon S3 que você usa. Para obter informações sobre políticas do IAM para o conjunto de dados, consulte Políticas do IAM para conjuntos de dados.

As guias a seguir mostram exemplos da sintaxe do campo ItemReader e da entrada transmitida para a execução de um fluxo de trabalho secundário para esse conjunto de dados.

ItemReader syntax

Nesse exemplo, foi mostrado como organizar seus dados, que incluem imagens, arquivos JSON e objetos, dentro de um prefixo nomeado processData em um bucket do Amazon S3 designado myBucket.

"ItemReader": { "Resource": "arn:aws:states:::s3:listObjectsV2", "Parameters": { "Bucket": "myBucket", "Prefix": "processData" } }
Input passed to a child workflow execution

O estado Mapa Distribuído iniciará quantas execuções de fluxo de trabalho secundário forem necessárias para o número de itens no bucket do Amazon S3. O exemplo a seguir mostra a entrada recebida pela execução de um fluxo de trabalho secundário.

{ "Etag": "\"05704fbdccb224cb01c59005bebbad28\"", "Key": "processData/images/n02085620_1073.jpg", "LastModified": 1668699881, "Size": 34910, "StorageClass": "STANDARD" }

Um estado Mapa Distribuído pode aceitar um arquivo JSON armazenado em um bucket do Amazon S3 como um conjunto de dados. O arquivo JSON deve conter uma matriz.

Quando a execução do fluxo de trabalho atinge o estado Map, o Step Functions invoca a ação da API GetObject para buscar o arquivo JSON especificado. O estado Map então itera cada item na matriz e inicia a execução de um fluxo de trabalho secundário para cada item. Por exemplo, se seu arquivo JSON contiver 1.000 itens de matriz, o estado Map iniciará 1.000 execuções de fluxo de trabalho secundário.

nota
  • A entrada usada para iniciar a execução de um fluxo de trabalho secundário não pode exceder 256 KB. No entanto, o Step Functions oferece suporte à leitura de um item de até 8 MB de um arquivo CSV ou JSON quando você aplica o campo ItemSelector opcional para reduzir o tamanho de um item.

  • Atualmente, o Step Functions oferece suporte de no máximo 10 GB a arquivos individuais em um relatório do Inventário Amazon S3. No entanto, o Step Functions é capaz de processar mais de 10 GB se o tamanho de cada arquivo individual é inferior a esse valor.

  • O Step Functions precisa das devidas permissões para acessar conjuntos de dados do Amazon S3 que você usa. Para obter informações sobre políticas do IAM para o conjunto de dados, consulte Políticas do IAM para conjuntos de dados.

As guias a seguir mostram exemplos da sintaxe do campo ItemReader e da entrada transmitida para a execução de um fluxo de trabalho secundário para esse conjunto de dados.

Por exemplo, imagine que você tenha um arquivo JSON chamado factcheck.json. Você armazenou esse arquivo em um prefixo chamado jsonDataset, em um bucket do Amazon S3. A seguir, veja um exemplo do conjunto de dados JSON.

[ { "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }, { "verdict": "false", "statement_date": "6/7/2022", "statement_source": "television" }, { "verdict": "mostly-true", "statement_date": "5/18/2016", "statement_source": "news" }, ... ]
ItemReader syntax
"ItemReader": { "Resource": "arn:aws:states:::s3:getObject", "ReaderConfig": { "InputType": "JSON" }, "Parameters": { "Bucket": "myBucket", "Key": "jsonDataset/factcheck.json" } }
Input to a child workflow execution

O estado Mapa Distribuído iniciará quantas execuções de fluxo de trabalho secundário forem necessárias para o número de itens de matriz presentes no arquivo JSON. O exemplo a seguir mostra a entrada recebida pela execução de um fluxo de trabalho secundário.

{ "verdict": "true", "statement_date": "6/11/2008", "statement_source": "speech" }

Um estado Mapa Distribuído pode aceitar um arquivo CSV armazenado em um bucket do Amazon S3 como um conjunto de dados. Se você usar um arquivo CSV como conjunto de dados, precisará especificar um cabeçalho de coluna CSV. Para obter informações sobre como especificar um cabeçalho CSV, consulte Conteúdo do campo ItemReader.

Como não há um formato padronizado para criar e manter dados em arquivos CSV, o Step Functions analisa os arquivos CSV com base nas seguintes regras:

  • As vírgulas (,) são um delimitador que separa campos individuais.

  • As novas linhas são um delimitador que separa registros individuais.

  • Os campos são tratados como strings. Para conversões de tipo de dados, use a função intrínseca States.StringToJson em ItemSelector.

  • Não são necessárias aspas duplas (” “) para encerrar strings. No entanto, strings delimitadas por aspas duplas podem conter vírgulas e novas linhas que não funcionam como delimitadores.

  • Repita as aspas duplas para evitá-las.

  • Se o número de campos em uma linha for menor que o número de campos no cabeçalho, o Step Functions fornecerá strings vazias para os valores que estão faltando.

  • Se o número de campos em uma linha for maior que aquele no cabeçalho, o Step Functions ignorará os campos adicionais.

Para obter mais informações sobre como Step Functions analisa um arquivo CSV, consulte Example of parsing an input CSV file.

Quando a execução do fluxo de trabalho atinge o estado Map, o Step Functions invoca a ação da API GetObject para buscar o arquivo CSV especificado. O estado Map então itera cada linha no arquivo CSV e inicia a execução de um fluxo de trabalho secundário para processar os itens em cada linha. Por exemplo, suponha que você vá fornecer um arquivo CSV contendo 100 linhas como entrada. Então, o intérprete transmitirá cada linha para o estado Map. O estado Map processa os itens em ordem serial, começando depois da linha do cabeçalho.

nota
  • A entrada usada para iniciar a execução de um fluxo de trabalho secundário não pode exceder 256 KB. No entanto, o Step Functions oferece suporte à leitura de um item de até 8 MB de um arquivo CSV ou JSON quando você aplica o campo ItemSelector opcional para reduzir o tamanho de um item.

  • Atualmente, o Step Functions oferece suporte de no máximo 10 GB a arquivos individuais em um relatório do Inventário Amazon S3. No entanto, o Step Functions é capaz de processar mais de 10 GB se o tamanho de cada arquivo individual é inferior a esse valor.

  • O Step Functions precisa das devidas permissões para acessar conjuntos de dados do Amazon S3 que você usa. Para obter informações sobre políticas do IAM para o conjunto de dados, consulte Políticas do IAM para conjuntos de dados.

As guias a seguir mostram exemplos da sintaxe do campo ItemReader e da entrada transmitida para a execução de um fluxo de trabalho secundário para esse conjunto de dados.

ItemReader syntax

Por exemplo, digamos que você tenha um arquivo CSV chamado ratings.csv. Você armazenou esse arquivo em um prefixo designado csvDataset, em um bucket do Amazon S3.

{ "ItemReader": { "ReaderConfig": { "InputType": "CSV", "CSVHeaderLocation": "FIRST_ROW" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "myBucket", "Key": "csvDataset/ratings.csv" } } }
Input to a child workflow execution

O estado Mapa Distribuído iniciará quantas execuções de fluxo de trabalho secundário forem necessárias para o número de linhas presentes no arquivo CSV, excluindo a linha de cabeçalho, se presente no arquivo. O exemplo a seguir mostra a entrada recebida pela execução de um fluxo de trabalho secundário.

{ "rating": "3.5", "movieId": "307", "userId": "1", "timestamp": "1256677221" }

Um estado Mapa Distribuído pode aceitar um manifesto do Inventário Amazon S3 armazenado em um bucket do Amazon S3 como um conjunto de dados.

Quando a execução do fluxo de trabalho atinge o estado Map, o Step Functions invoca a ação da API GetObject para buscar o arquivo do manifesto do Inventário Amazon S3. O estado Map então itera os objetos no inventário para retornar uma matriz de metadados de objetos do Inventário Amazon S3.

nota
  • Atualmente, o Step Functions oferece suporte de no máximo 10 GB a arquivos individuais em um relatório do Inventário Amazon S3. No entanto, o Step Functions é capaz de processar mais de 10 GB se o tamanho de cada arquivo individual é inferior a esse valor.

  • O Step Functions precisa das devidas permissões para acessar conjuntos de dados do Amazon S3 que você usa. Para obter informações sobre políticas do IAM para o conjunto de dados, consulte Políticas do IAM para conjuntos de dados.

Veja a seguir o exemplo de um arquivo de inventário no formato CSV. Esse arquivo inclui os objetos chamados csvDataset e imageDataset, que são armazenados em um bucket do Amazon S3 com o nome sourceBucket.

"sourceBucket","csvDataset/","0","2022-11-16T00:27:19.000Z" "sourceBucket","csvDataset/titles.csv","3399671","2022-11-16T00:29:32.000Z" "sourceBucket","imageDataset/","0","2022-11-15T20:00:44.000Z" "sourceBucket","imageDataset/n02085620_10074.jpg","27034","2022-11-15T20:02:16.000Z" ...
Importante

Atualmente, o Step Functions não oferece suporte ao uso do relatório do Inventário Amazon S3 definido pelo usuário como um conjunto de dados. Você também deve se certificar de que o formato de saída do seu relatório do Inventário Amazon S3 seja CSV. Para obter mais informações sobre os Inventários Amazon S3 e como configurá-los, consulte Inventário Amazon S3 no Guia do usuário do Amazon S3.

O exemplo a seguir de um arquivo de manifesto de inventário mostra os cabeçalhos CSV dos metadados do objeto de inventário.

{ "sourceBucket" : "sourceBucket", "destinationBucket" : "arn:aws:s3:::inventory", "version" : "2016-11-30", "creationTimestamp" : "1668560400000", "fileFormat" : "CSV", "fileSchema" : "Bucket, Key, Size, LastModifiedDate", "files" : [ { "key" : "source-bucket/destination-prefix/data/20e55de8-9c21-45d4-99b9-46c732000228.csv.gz", "size" : 7300, "MD5checksum" : "a7ff4a1d4164c3cd55851055ec8f6b20" } ] }

As guias a seguir mostram exemplos da sintaxe do campo ItemReader e da entrada transmitida para a execução de um fluxo de trabalho secundário para esse conjunto de dados.

ItemReader syntax
{ "ItemReader": { "ReaderConfig": { "InputType": "MANIFEST" }, "Resource": "arn:aws:states:::s3:getObject", "Parameters": { "Bucket": "destinationBucket", "Key": "destination-prefix/source-bucket/config-ID/YYYY-MM-DDTHH-MMZ/manifest.json" } } }
Input to a child workflow execution
{ "LastModifiedDate": "2022-11-16T00:29:32.000Z", "Bucket": "sourceBucket", "Size": "3399671", "Key": "csvDataset/titles.csv" }

Dependendo dos campos selecionados ao configurar o relatório do Inventário Amazon S3, o conteúdo do arquivo manifest.json pode variar do exemplo mostrado.

Políticas do IAM para conjuntos de dados

Ao criar fluxos de trabalho com o console do Step Functions, o Step Functions pode gerar automaticamente políticas do IAM com base nos recursos na definição de fluxo de trabalho. Essas políticas incluem os privilégios mínimos necessários para permitir que o perfil da máquina de estado invoque a ação da API StartExecution para o estado Mapa Distribuído. Essas políticas também incluem os privilégios mínimos necessários para que o Step Functions acesse recursos da AWS, como buckets e objetos do Amazon S3 e funções do Lambda. É altamente recomendável que você inclua apenas as permissões que forem necessárias em suas políticas do IAM. Por exemplo, se o fluxo de trabalho incluir um estado Map no modo distribuído, defina o escopo de suas políticas até o bucket e a pasta específicos do Amazon S3 que contêm o conjunto de dados.

Importante

Se você especificar um bucket e um objeto do Amazon S3, ou prefixo, com um caminho de referência para um par de valores-chave existente na entrada do estado Mapa Distribuído, certifique-se de atualizar as políticas de IAM do fluxo de trabalho. Defina o escopo das políticas até o bucket e os nomes de objetos para os quais o caminho é resolvido em runtime.

Os exemplos de políticas do IAM a seguir concedem os privilégios mínimos necessários para acessar os conjuntos de dados do Amazon S3 usando as ações de API ListObjectsV2 e GetObject.

exemplo Política do IAM para objeto do Amazon S3 como conjunto de dados

O exemplo a seguir mostra uma política do IAM que concede os privilégios mínimos para acessar os objetos organizados em processImages em um bucket do Amazon S3 chamado myBucket.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:ListBucket" ], "Resource": [ "arn:aws:s3:::myBucket" ], "Condition": { "StringLike": { "s3:prefix": [ "processImages" ] } } } ] }
exemplo Política do IAM para um arquivo CSV como conjunto de dados

O exemplo a seguir mostra uma política do IAM que concede os privilégios mínimos para acessar um arquivo CSV chamado ratings.csv.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::myBucket/csvDataset/ratings.csv" ] } ] }
exemplo Política do IAM para um inventário Amazon S3 como conjunto de dados

O exemplo a seguir mostra uma política do IAM que concede os privilégios mínimos para acessar um relatório de inventário Amazon S3.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:GetObject" ], "Resource": [ "arn:aws:s3:::destination-prefix/source-bucket/config-ID/YYYY-MM-DDTHH-MMZ/manifest.json", "arn:aws:s3:::destination-prefix/source-bucket/config-ID/data/*" ] } ] }