Classe DynamicFrame - AWS Glue

Classe DynamicFrame

Uma das principais abstrações no Apache Spark é o DataFrame do SparkSQL, que é semelhante à construção DataFrame encontrada no R e Pandas. Um DataFrame é semelhante a uma tabela e é compatível com operações de estilo funcional (mapear/reduzir/filtrar etc.) e operações SQL (select, project, aggregate).

DataFrames são potentes e amplamente utilizados, mas têm limitações em relação às operações de extração, transformação e carregamento (ETL). Mais importante, eles exigem que um esquema seja especificado antes que qualquer dado seja carregado. O SparkSQL soluciona essa questão fazendo duas transmissões sobre dados: a primeira para inferir o esquema e a segunda para carregar os dados. No entanto, essa inferência é limitada e não corrige a desorganização de dados. Por exemplo, o mesmo campo pode ser de um tipo diferente em registros diferentes. O Apache Spark muitas vezes desiste e relata o tipo como string usando o texto do campo original. Ele pode estar incorreto, e convém ter controle mais preciso sobre como as discrepâncias do esquema são resolvidas. Para grandes conjuntos de dados, uma transmissão adicional sobre os dados de origem pode ser proibitivamente dispendiosa.

Para resolver essas limitações, o AWS Glue apresenta o DynamicFrame. Um DynamicFrame é semelhante a DataFrame, mas cada registro se autodescrevente, então nenhum esquema é necessário inicialmente. Em vez disso, o AWS Glue calcula um esquema instantaneamente quando necessário e codifica explicitamente inconsistências de esquema usando um tipo de escolha (ou união). Você pode resolver essas inconsistências para tornar seus conjuntos de dados compatíveis com armazenamentos de dados que exigem um esquema fixo.

Da mesma forma, um DynamicRecord representa um registro lógico em um DynamicFrame. Ele é igual a uma linha em um DataFrame do Spark, exceto pelo fato de que ele pode se autodescrever e ser usado para dados que não estão em conformidade com um esquema fixo. Ao usar o AWS Glue com o PySpark, você normalmente não manipula DynamicRecords independentes. Em vez disso, você transformará o conjunto de dados junto por meio de seu DynamicFrame.

Você pode converter DynamicFrames de e para DataFrames depois de resolver as inconsistências de esquema.

 — construção —

__init__

__init__(jdf, glue_ctx, name)
  • jdf – Uma referência ao quadro de dados na Java Virtual Machine (JVM).

  • glue_ctx – Um objeto GlueContext classe.

  • name – Uma string de nome opcional, vazia por padrão.

fromDF

fromDF(dataframe, glue_ctx, name)

Converte um DataFrame em um DynamicFrame, transformando campos DataFrame em campos DynamicRecord. Retorna um novo DynamicFrame.

Um DynamicRecord representa um registro lógico em um DynamicFrame. Ele é semelhante a uma linha em um DataFrame do Spark, exceto pelo fato de que ele pode se autodescrever e ser usado para dados que não estão em conformidade com um esquema fixo.

Essa função espera que as colunas com nomes duplicados em seu DataFrame já tenham sido resolvidas.

  • dataframe – O DataFrame Apache Spark SQL a ser convertido (obrigatório).

  • glue_ctx – O objeto GlueContext classe que especifica o contexto para essa transformação (obrigatório).

  • name: o nome do DynamicFrame resultante (opcional desde o AWS Glue 3.0).

toDF

toDF(options)

Converte um DynamicFrame a um DataFrame do Apache Spark, transformando campos DynamicRecords em DataFrame. Retorna um novo DataFrame.

Um DynamicRecord representa um registro lógico em um DynamicFrame. Ele é semelhante a uma linha em um DataFrame do Spark, exceto pelo fato de que ele pode se autodescrever e ser usado para dados que não estão em conformidade com um esquema fixo.

  • options: uma lista de opções. Especifique o tipo de destino se você escolher o tipo de ação Project e Cast. Os exemplos incluem.

    >>>toDF([ResolveOption("a.b.c", "KeepAsStruct")]) >>>toDF([ResolveOption("a.b.c", "Project", DoubleType())])

  — informações —

contagem

count( ) – Retorna o número de linhas no DataFrame subjacente.

Esquema

schema( ) – Retorna o esquema deste DynamicFrame ou, se não estiver disponível, o esquema do DataFrame subjacente.

Para obter mais informações sobre os tipos de DynamicFrame que compõem esse esquema, consulte Tipos de extensão do PySpark.

printSchema

printSchema( ) – Imprime o esquema do DataFrame subjacente.

show

show(num_rows) – Imprime um número de linhas especificado do DataFrame subjacente.

repartição

repartition(numPartitions): retorna um novo DynamicFrame com numPartitions partições.

coalesce

coalesce(numPartitions): retorna um novo DynamicFrame com numPartitions partições.

  — transformações —

apply_mapping

apply_mapping(mappings, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Aplica um mapeamento declarativo a um DynamicFrame e retorna um novo DynamicFrame com esses mapeamentos aplicados aos campos que você especificar. Os campos não especificados são omitidos do novo DynamicFrame.

  • mappings: uma lista de tuplas de mapeamento (obrigatório). Cada uma é composta por: coluna de fonte, tipo de fonte, coluna de destino, tipo de destino.

    Se houver um ponto "." no nome da coluna de origem, será necessário colocá-lo entre crases "``". Por exemplo, para mapear this.old.name (string) para thisNewName, você pode usar a seguinte tupla:

    ("`this.old.name`", "string", "thisNewName", "string")
  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

Exemplo: usar apply_mapping para renomear campos e alterar os tipos de campo

O exemplo de código a seguir mostra como usar o método apply_mapping para renomear campos selecionados e alterar os tipos de campo.

nota

Para acessar o conjunto de dados usado neste exemplo, consulte Exemplo de código: juntar e relacionar dados e siga as instruções em Etapa 1: crawling de dados no bucket do Amazon S3.

# Example: Use apply_mapping to reshape source data into # the desired column names and types as a new DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Select and rename fields, change field type print("Schema for the persons_mapped DynamicFrame, created with apply_mapping:") persons_mapped = persons.apply_mapping( [ ("family_name", "String", "last_name", "String"), ("name", "String", "first_name", "String"), ("birth_date", "String", "date_of_birth", "Date"), ] ) persons_mapped.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the persons_mapped DynamicFrame, created with apply_mapping: root |-- last_name: string |-- first_name: string |-- date_of_birth: date

drop_fields

drop_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Chama a transformação Classe FlatMap para remover campos de um DynamicFrame. Retorna um novo DynamicFrame com os campos especificados descartados.

  • paths: uma lista de cadeias de caracteres. Cada um contém o caminho completo para um nó de campo que você deseja descartar. É possível usar notação de pontos para especificar campos aninhados. Por exemplo, se campo first for secundário do campo name na árvore, especifique "name.first" para o caminho.

    Se houver um literal . no nome de um nó de campo, será necessário colocá-lo entre crases (`).

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

Exemplo: usar drop_fields para remover campos de um DynamicFrame

Este código de exemplo usa o método drop_fields para remover campos aninhados e de nível superior selecionados de um DynamicFrame.

Exemplo de conjunto de dados

O exemplo usa o seguinte conjunto de dados que é representado pela tabela EXAMPLE-FRIENDS-DATA no código:

{"name": "Sally", "age": 23, "location": {"state": "WY", "county": "Fremont"}, "friends": []} {"name": "Varun", "age": 34, "location": {"state": "NE", "county": "Douglas"}, "friends": [{"name": "Arjun", "age": 3}]} {"name": "George", "age": 52, "location": {"state": "NY"}, "friends": [{"name": "Fred"}, {"name": "Amy", "age": 15}]} {"name": "Haruki", "age": 21, "location": {"state": "AK", "county": "Denali"}} {"name": "Sheila", "age": 63, "friends": [{"name": "Nancy", "age": 22}]}

Código de exemplo

# Example: Use drop_fields to remove top-level and nested fields from a DynamicFrame. # Replace MY-EXAMPLE-DATABASE with your Glue Data Catalog database name. # Replace EXAMPLE-FRIENDS-DATA with your table name. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame from Glue Data Catalog glue_source_database = "MY-EXAMPLE-DATABASE" glue_source_table = "EXAMPLE-FRIENDS-DATA" friends = glueContext.create_dynamic_frame.from_catalog( database=glue_source_database, table_name=glue_source_table ) print("Schema for friends DynamicFrame before calling drop_fields:") friends.printSchema() # Remove location.county, remove friends.age, remove age friends = friends.drop_fields(paths=["age", "location.county", "friends.age"]) print("Schema for friends DynamicFrame after removing age, county, and friend age:") friends.printSchema()
Schema for friends DynamicFrame before calling drop_fields: root |-- name: string |-- age: int |-- location: struct | |-- state: string | |-- county: string |-- friends: array | |-- element: struct | | |-- name: string | | |-- age: int Schema for friends DynamicFrame after removing age, county, and friend age: root |-- name: string |-- location: struct | |-- state: string |-- friends: array | |-- element: struct | | |-- name: string

filtrar

filter(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Retorna um novo DynamicFrame que contém todos os DynamicRecords no DynamicFrame de entrada que satisfazem uma função f predicada especificada.

  • f: a função predicada a ser aplicada a DynamicFrame. A função precisa ter um DynamicRecord como um argumento e retornar True, se DynamicRecord atender aos requisitos de filtro, ou False, caso contrário (obrigatório).

    Um DynamicRecord representa um registro lógico em um DynamicFrame. É semelhante a uma linha em um DataFrame do Spark, exceto pelo fato de que pode se autodescrever e ser usado para dados que não estejam em conformidade com um esquema fixo.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

Exemplo: usar filter para obter uma seleção filtrada de campos

Este exemplo usa o método filter para criar um novo DynamicFrame que inclui uma seleção filtrada de outros campos do DynamicFrame.

Assim como o método map, filter usa uma função como um argumento que é aplicado a cada registro no DynamicFrame original. A função usa um registro como entrada e retorna um valor booleano. Se o valor de retorno for true, o registro será incluído no DynamicFrame resultante. Se for false, o registro será omitido.

nota

Para acessar o conjunto de dados usado neste exemplo, consulte Exemplo de código: preparo de dados usando ResolveChoice, Lambda e ApplyMapping e siga as instruções em Etapa 1: crawling de dados no bucket do Amazon S3.

# Example: Use filter to create a new DynamicFrame # with a filtered selection of records from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create DynamicFrame from Glue Data Catalog medicare = glueContext.create_dynamic_frame.from_options( "s3", { "paths": [ "s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv" ] }, "csv", {"withHeader": True}, ) # Create filtered DynamicFrame with custom lambda # to filter records by Provider State and Provider City sac_or_mon = medicare.filter( f=lambda x: x["Provider State"] in ["CA", "AL"] and x["Provider City"] in ["SACRAMENTO", "MONTGOMERY"] ) # Compare record counts print("Unfiltered record count: ", medicare.count()) print("Filtered record count: ", sac_or_mon.count())
Unfiltered record count: 163065 Filtered record count: 564

join

join(paths1, paths2, frame2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Executa uma junção de igualdade com outro DynamicFrame e retorna o DynamicFrame resultante.

  • paths1 – Uma lista das chaves neste quadro para realizar a junção.

  • paths2 – Uma lista das chaves em outro quadro para realizar a junção.

  • frame2 - O outro DynamicFrame para realizar a junção.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

Exemplo: usar join para combinar DynamicFrames

Este exemplo usa o método join para realizar uma junção em três DynamicFrames. O AWS Glue executa a junção com base nas chaves de campo que você fornece. O DynamicFrame resultante contém linhas dos dois quadros originais em que as chaves especificadas correspondem.

A transformação do join mantém todos os campos intactos. Isso significa que os campos que você especificar para correspondência serão exibidos no DynamicFrame resultante, mesmo que sejam redundantes e contenham as mesmas chaves. Neste exemplo, usamos drop_fields para remover essas chaves redundantes após a junção.

nota

Para acessar o conjunto de dados usado neste exemplo, consulte Exemplo de código: juntar e relacionar dados e siga as instruções em Etapa 1: crawling de dados no bucket do Amazon S3.

# Example: Use join to combine data from three DynamicFrames from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load DynamicFrames from Glue Data Catalog persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) memberships = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="memberships_json" ) orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() print("Schema for the memberships DynamicFrame:") memberships.printSchema() print("Schema for the orgs DynamicFrame:") orgs.printSchema() # Join persons and memberships by ID persons_memberships = persons.join( paths1=["id"], paths2=["person_id"], frame2=memberships ) # Rename and drop fields from orgs # to prevent field name collisions with persons_memberships orgs = ( orgs.drop_fields(["other_names", "identifiers"]) .rename_field("id", "org_id") .rename_field("name", "org_name") ) # Create final join of all three DynamicFrames legislators_combined = orgs.join( paths1=["org_id"], paths2=["organization_id"], frame2=persons_memberships ).drop_fields(["person_id", "org_id"]) # Inspect the schema for the joined data print("Schema for the new legislators_combined DynamicFrame:") legislators_combined.printSchema()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the memberships DynamicFrame: root |-- area_id: string |-- on_behalf_of_id: string |-- organization_id: string |-- role: string |-- person_id: string |-- legislative_period_id: string |-- start_date: string |-- end_date: string Schema for the orgs DynamicFrame: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string Schema for the new legislators_combined DynamicFrame: root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

mapear

map(f, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Retorna um novo DynamicFrame que resulta da aplicação da função de mapeamento especificada a todos os registros no DynamicFrame original.

  • f: a função de mapeamento a ser aplicada a todos os registros em DynamicFrame. A função precisa levar um DynamicRecord como um argumento e retornar um novo DynamicRecord (obrigatório).

    Um DynamicRecord representa um registro lógico em um DynamicFrame. É semelhante a uma linha em um DataFrame do Apache Spark, exceto pelo fato de que pode se autodescrever e ser usado para dados que não estejam em conformidade com um esquema fixo.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info: uma cadeira de caracteres que é associada a erros na transformação (opcional).

  • stageThreshold: o número máximo de erros que podem ocorrer na transformação antes que ela falhe (opcional). O padrão é zero.

  • totalThreshold: o número máximo de erros que podem ocorrer em geral antes que falhe (opcional). O padrão é zero.

Exemplo: usar map para aplicar uma função a cada registro em um DynamicFrame

Este exemplo mostra como usar o método map para aplicar uma função a cada registro de um DynamicFrame. Especificamente, este exemplo aplica uma função chamada MergeAddress para cada registro para mesclar vários campos de endereço em um único tipo struct.

nota

Para acessar o conjunto de dados usado neste exemplo, consulte Exemplo de código: preparo de dados usando ResolveChoice, Lambda e ApplyMapping e siga as instruções em Etapa 1: crawling de dados no bucket do Amazon S3.

# Example: Use map to combine fields in all records # of a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema medicare = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://awsglue-datasets/examples/medicare/Medicare_Hospital_Provider.csv"]}, "csv", {"withHeader": True}) print("Schema for medicare DynamicFrame:") medicare.printSchema() # Define a function to supply to the map transform # that merges address fields into a single field def MergeAddress(rec): rec["Address"] = {} rec["Address"]["Street"] = rec["Provider Street Address"] rec["Address"]["City"] = rec["Provider City"] rec["Address"]["State"] = rec["Provider State"] rec["Address"]["Zip.Code"] = rec["Provider Zip Code"] rec["Address"]["Array"] = [rec["Provider Street Address"], rec["Provider City"], rec["Provider State"], rec["Provider Zip Code"]] del rec["Provider Street Address"] del rec["Provider City"] del rec["Provider State"] del rec["Provider Zip Code"] return rec # Use map to apply MergeAddress to every record mapped_medicare = medicare.map(f = MergeAddress) print("Schema for mapped_medicare DynamicFrame:") mapped_medicare.printSchema()
Schema for medicare DynamicFrame: root |-- DRG Definition: string |-- Provider Id: string |-- Provider Name: string |-- Provider Street Address: string |-- Provider City: string |-- Provider State: string |-- Provider Zip Code: string |-- Hospital Referral Region Description: string |-- Total Discharges: string |-- Average Covered Charges: string |-- Average Total Payments: string |-- Average Medicare Payments: string Schema for mapped_medicare DynamicFrame: root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

mergeDynamicFrame

mergeDynamicFrame(stage_dynamic_frame, primary_keys, transformation_ctx = "", options = {}, info = "", stageThreshold = 0, totalThreshold = 0)

Mescla esse DynamicFrame com uma preparação DynamicFrame de acordo com as chaves primárias especificadas para identificar registros. Registros duplicados (com as mesmas chaves primárias) não são eliminados. Se não houver nenhum registro correspondente no quadro de preparação, todos os registros (incluindo os duplicados) serão retidos da origem. Se o quadro de preparação tiver registros correspondentes, os do quadro de preparação substituirão os da origem no AWS Glue.

  • stage_dynamic_frame: o DynamicFrame de preparação para mesclar.

  • primary_keys a lista de campos de chave primária para corresponder aos registros da fonte e quadros dinâmicos de preparação.

  • transformation_ctx: uma string exclusiva usada para recuperar os metadados sobre a transformação atual (opcional).

  • options: uma string de pares nome-valor JSON que fornecem informações adicionais para essa transformação. Esse argumento não é usado no momento.

  • info: uma String. Qualquer string a ser associada a erros nessa transformação.

  • stageThreshold: uma Long. O número de erros na transformação para a qual o processamento precisa apresentar falhas.

  • totalThreshold: uma Long. O número total de erros até esta transformação (inclusive) para os quais o processamento precisa apresentar falhas.

Este método retorna um novo DynamicFrame obtido ao mesclar este DynamicFrame com a preparação DynamicFrame.

O DynamicFrame retornado contém registro A nestes casos:

  • Se A existir no quadro de origem e no quadro de preparação, o A do quadro de preparação será retornado.

  • Se A estiver na tabela de origem e A.primaryKeys não estiver no stagingDynamicFrame, A não será atualizado na tabela de preparação.

O quadro de origem e o quadro de preparação não precisam ter o mesmo esquema.

Exemplo: use mergeDynamicFrame para mesclar dois DynamicFrames com base em uma chave primária

O exemplo de código a seguir mostra como usar o método mergeDynamicFrame para mesclar um DynamicFrame com uma “preparação” DynamicFrame com base na chave primária id.

Exemplo de conjunto de dados

O exemplo usa dois DynamicFrames de um DynamicFrameCollection chamado split_rows_collection. Está é uma lista de limites no split_rows_collection.

dict_keys(['high', 'low'])

Código de exemplo

# Example: Use mergeDynamicFrame to merge DynamicFrames # based on a set of specified primary keys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.transforms import SelectFromCollection # Inspect the original DynamicFrames frame_low = SelectFromCollection.apply(dfc=split_rows_collection, key="low") print("Inspect the DynamicFrame that contains rows where ID < 10") frame_low.toDF().show() frame_high = SelectFromCollection.apply(dfc=split_rows_collection, key="high") print("Inspect the DynamicFrame that contains rows where ID > 10") frame_high.toDF().show() # Merge the DynamicFrames based on the "id" primary key merged_high_low = frame_high.mergeDynamicFrame( stage_dynamic_frame=frame_low, primary_keys=["id"] ) # View the results where the ID is 1 or 20 print("Inspect the merged DynamicFrame that contains the combined rows") merged_high_low.toDF().where("id = 1 or id= 20").orderBy("id").show()
Inspect the DynamicFrame that contains rows where ID < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 2| 0| fax| 202-225-3307| | 2| 1| phone| 202-225-5731| | 3| 0| fax| 202-225-3307| | 3| 1| phone| 202-225-5731| | 4| 0| fax| 202-225-3307| | 4| 1| phone| 202-225-5731| | 5| 0| fax| 202-225-3307| | 5| 1| phone| 202-225-5731| | 6| 0| fax| 202-225-3307| | 6| 1| phone| 202-225-5731| | 7| 0| fax| 202-225-3307| | 7| 1| phone| 202-225-5731| | 8| 0| fax| 202-225-3307| | 8| 1| phone| 202-225-5731| | 9| 0| fax| 202-225-3307| | 9| 1| phone| 202-225-5731| | 10| 0| fax| 202-225-6328| | 10| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains rows where ID > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| fax| 202-225-6328| | 11| 1| phone| 202-225-4576| | 11| 2| twitter| RepTrentFranks| | 12| 0| fax| 202-225-6328| | 12| 1| phone| 202-225-4576| | 12| 2| twitter| RepTrentFranks| | 13| 0| fax| 202-225-6328| | 13| 1| phone| 202-225-4576| | 13| 2| twitter| RepTrentFranks| | 14| 0| fax| 202-225-6328| | 14| 1| phone| 202-225-4576| | 14| 2| twitter| RepTrentFranks| | 15| 0| fax| 202-225-6328| | 15| 1| phone| 202-225-4576| | 15| 2| twitter| RepTrentFranks| | 16| 0| fax| 202-225-6328| | 16| 1| phone| 202-225-4576| | 16| 2| twitter| RepTrentFranks| | 17| 0| fax| 202-225-6328| | 17| 1| phone| 202-225-4576| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the merged DynamicFrame that contains the combined rows +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| fax| 202-225-3307| | 1| 1| phone| 202-225-5731| | 20| 0| fax| 202-225-5604| | 20| 1| phone| 202-225-6536| | 20| 2| twitter| USRepLong| +---+-----+------------------------+-------------------------+

relationalize

relationalize(root_table_name, staging_path, options, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Converte um DynamicFrame em um formulário que se encaixa em um banco de dados relacional. Relacionar um DynamicFrame é especialmente útil quando você deseja mover dados de um ambiente NoSQL como o DynamoDB para um banco de dados relacional como o MySQL.

A transformação gera uma lista de quadros separando colunas aninhadas e colunas de matriz dinâmica. A coluna de matriz dinâmica pode ser adicionada à tabela raiz usando a chave de união gerada durante a fase de desaninhamento.

  • root_table_name – O nome da a tabela raiz.

  • staging_path: o caminho onde o método pode armazenar partições de tabelas dinâmicas no formato CSV (opcional). As tabelas dinâmicas são lidas novamente nesse caminho.

  • options – Um dicionário de parâmetros opcionais.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

Exemplo: usar relationalize para nivelar um esquema aninhado em um DynamicFrame

Este exemplo de código usa o método relationalize para nivelar um esquema aninhado em um formulário que se encaixa em um banco de dados relacional.

Exemplo de conjunto de dados

O exemplo usa um DynamicFrame chamado legislators_combined com o esquema a seguir. legislators_combined tem vários campos aninhados, como links, images e contact_details que serão nivelados pela transformação relationalize.

root |-- role: string |-- seats: int |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- type: string |-- sort_name: string |-- area_id: string |-- images: array | |-- element: struct | | |-- url: string |-- on_behalf_of_id: string |-- other_names: array | |-- element: struct | | |-- note: string | | |-- name: string | | |-- lang: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- name: string |-- birth_date: string |-- organization_id: string |-- gender: string |-- classification: string |-- legislative_period_id: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- image: string |-- given_name: string |-- start_date: string |-- family_name: string |-- id: string |-- death_date: string |-- end_date: string

Código de exemplo

# Example: Use relationalize to flatten # a nested schema into a format that fits # into a relational database. # Replace DOC-EXAMPLE-S3-BUCKET/tmpDir with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Apply relationalize and inspect new tables legislators_relationalized = legislators_combined.relationalize( "l_root", "s3://DOC-EXAMPLE-BUCKET/tmpDir" ) legislators_relationalized.keys() # Compare the schema of the contact_details # nested field to the new relationalized table that # represents it legislators_combined.select_fields("contact_details").printSchema() legislators_relationalized.select("l_root_contact_details").toDF().where( "id = 10 or id = 75" ).orderBy(["id", "index"]).show()

A saída a seguir permite comparar o esquema do campo aninhado chamado contact_details com a tabela criada pela transformação relationalize. Observe que os registros da tabela apontam para a tabela principal usando uma chave estrangeira chamada id e uma coluna index que representa as posições da matriz.

dict_keys(['l_root', 'l_root_images', 'l_root_links', 'l_root_other_names', 'l_root_contact_details', 'l_root_identifiers']) root |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 10| 0| fax| 202-225-4160| | 10| 1| phone| 202-225-3436| | 75| 0| fax| 202-225-6791| | 75| 1| phone| 202-225-2861| | 75| 2| twitter| RepSamFarr| +---+-----+------------------------+-------------------------+

rename_field

rename_field(oldName, newName, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Renomeia um campo neste DynamicFrame e retorna um novo DynamicFrame com o campo renomeado.

  • oldName – O caminho completo para o nó que você quer renomear.

    Se o nome antigo contiver pontos, RenameField não funcionará a menos que você coloque acentos graves em torno dele (`). Por exemplo, para substituir this.old.name por thisNewName, você chamaria rename_field da seguinte maneira.

    newDyF = oldDyF.rename_field("`this.old.name`", "thisNewName")
  • newName – O novo nome, como um caminho completo.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

Exemplo: usar rename_field para renomear campos em um DynamicFrame

Este exemplo de código usa o método rename_field para renomear campos em um DynamicFrame. Observe que o exemplo usa o encadeamento de métodos para renomear vários campos ao mesmo tempo.

nota

Para acessar o conjunto de dados usado neste exemplo, consulte Exemplo de código: juntar e relacionar dados e siga as instruções em Etapa 1: crawling de dados no bucket do Amazon S3.

Código de exemplo

# Example: Use rename_field to rename fields # in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Inspect the original orgs schema orgs = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="organizations_json" ) print("Original orgs schema: ") orgs.printSchema() # Rename fields and view the new schema orgs = orgs.rename_field("id", "org_id").rename_field("name", "org_name") print("New orgs schema with renamed fields: ") orgs.printSchema()
Original orgs schema: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- id: string |-- classification: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string New orgs schema with renamed fields: root |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- classification: string |-- org_id: string |-- org_name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- image: string |-- seats: int |-- type: string

resolveChoice

resolveChoice(specs = None, choice = "" , database = None , table_name = None , transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, catalog_id = None)

Resolve um tipo de escolha neste DynamicFrame e retorna o novo DynamicFrame.

  • specs: uma lista de ambiguidades específicas para resolver, cada uma na forma de uma tupla: (field_path, action).

    Há duas maneiras de usar resolveChoice. A primeira é usar o argumento specs para indicar uma sequência de colunas específicas e como resolvê-las. O outro modo para resolveChoice é usar o argumento choice para especificar uma única resolução para todos os ChoiceTypes.

    Valores para specs são especificados como tuplas compostas de pares (field_path, action). O valor field_path identifica um elemento ambíguo específico, e o valor action identifica a resolução correspondente. A seguir estão as ações possíveis:

    • cast:type: tenta converter todos os valores para o tipo especificado. Por exemplo: cast:int.

    • make_cols: converte cada tipo distinto em uma coluna com o nome columnName_type. Ele resolve uma possível ambiguidade ao nivelar os dados. Por exemplo, se columnA puder ser int ou string, a resolução seria produzir duas colunas chamadas columnA_int e columnA_string no DynamicFrame resultante.

    • make_struct: resolve uma possível ambiguidade usando um struct para representar os dados. Por exemplo, se os dados em uma coluna pudessem ser um int ou string, usar a ação make_struct produziria uma coluna de estruturas no DynamicFrame. Cada estrutura contém um int e um string.

    • project:type: resolve uma possível ambiguidade projetando todos os dados para um dos possíveis tipos de dados. Por exemplo, se os dados em uma coluna pudessem ser um int ou string, usar a ação project:string produzirá uma coluna de estruturas no DynamicFrame resultante, onde todos os valores int foram convertidos em strings.

    Se o field_path identifica um array, insira colchetes vazios após o nome do array para evitar ambiguidades. Por exemplo, vamos supor que você esteja trabalhando com dados estruturados da seguinte maneira:

    "myList": [ { "price": 100.00 }, { "price": "$100.00" } ]

    Você pode selecionar a versão numérica em vez da versão string do preço definindo field_path como "myList[].price" e action como "cast:double".

    nota

    É possível usar somente um dos parâmetros specs e choice. Se o parâmetro specs não for None, o parâmetro choice precisará ser uma string vazia. Por outro lado, se o choice não for uma string vazia, o parâmetro specs precisará ser None.

  • choice: especifica uma única resolução para todos os ChoiceTypes. É possível usar essa ação em casos em que a lista completa de ChoiceTypes for desconhecida antes do runtime. Além das ações listadas anteriormente para specs, esse modo também aceita a seguinte ação:

    • match_catalogChoiceType: tenta converter cada para o tipo correspondente na tabela do Data Catalog especificada.

  • database: banco de dados do Data Catalog a ser usado com a ação match_catalog.

  • table_name: a tabela do Data Catalog a ser usada com a ação match_catalog.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados até esta transformação (inclusive) em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • catalog_id: o ID do catálogo do Data Catalog sendo acessado (o ID da conta do Data Catalog). Quando definido como None (valor padrão), ele usa o ID do catálogo da conta de chamada.

Exemplo: usar resolveChoice para lidar com uma coluna que contém vários tipos

Este exemplo de código usa o método resolveChoice para especificar como lidar com uma coluna DynamicFrame que contém valores de vários tipos. O exemplo demonstra duas maneiras comuns de lidar com uma coluna com tipos diferentes:

  • Converter a coluna em um único tipo de dados.

  • Reter todos os tipos em colunas separadas.

Exemplo de conjunto de dados

nota

Para acessar o conjunto de dados usado neste exemplo, consulte Exemplo de código: preparo de dados usando ResolveChoice, Lambda e ApplyMapping e siga as instruções em Etapa 1: crawling de dados no bucket do Amazon S3.

O exemplo usa um DynamicFrame chamado medicare com o seguinte esquema:

root |-- drg definition: string |-- provider id: choice | |-- long | |-- string |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string

Código de exemplo

# Example: Use resolveChoice to handle # a column that contains multiple types from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input data and inspect the "provider id" column medicare = glueContext.create_dynamic_frame.from_catalog( database="payments", table_name="medicare_hospital_provider_csv" ) print("Inspect the provider id column:") medicare.toDF().select("provider id").show() # Cast provider id to type long medicare_resolved_long = medicare.resolveChoice(specs=[("provider id", "cast:long")]) print("Schema after casting provider id to type long:") medicare_resolved_long.printSchema() medicare_resolved_long.toDF().select("provider id").show() # Create separate columns # for each provider id type medicare_resolved_cols = medicare.resolveChoice(choice="make_cols") print("Schema after creating separate columns for each type:") medicare_resolved_cols.printSchema() medicare_resolved_cols.toDF().select("provider id_long", "provider id_string").show()
Inspect the 'provider id' column: +-----------+ |provider id| +-----------+ | [10001,]| | [10005,]| | [10006,]| | [10011,]| | [10016,]| | [10023,]| | [10029,]| | [10033,]| | [10039,]| | [10040,]| | [10046,]| | [10055,]| | [10056,]| | [10078,]| | [10083,]| | [10085,]| | [10090,]| | [10092,]| | [10100,]| | [10103,]| +-----------+ only showing top 20 rows Schema after casting 'provider id' to type long: root |-- drg definition: string |-- provider id: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +-----------+ |provider id| +-----------+ | 10001| | 10005| | 10006| | 10011| | 10016| | 10023| | 10029| | 10033| | 10039| | 10040| | 10046| | 10055| | 10056| | 10078| | 10083| | 10085| | 10090| | 10092| | 10100| | 10103| +-----------+ only showing top 20 rows Schema after creating separate columns for each type: root |-- drg definition: string |-- provider id_string: string |-- provider id_long: long |-- provider name: string |-- provider street address: string |-- provider city: string |-- provider state: string |-- provider zip code: long |-- hospital referral region description: string |-- total discharges: long |-- average covered charges: string |-- average total payments: string |-- average medicare payments: string +----------------+------------------+ |provider id_long|provider id_string| +----------------+------------------+ | 10001| null| | 10005| null| | 10006| null| | 10011| null| | 10016| null| | 10023| null| | 10029| null| | 10033| null| | 10039| null| | 10040| null| | 10046| null| | 10055| null| | 10056| null| | 10078| null| | 10083| null| | 10085| null| | 10090| null| | 10092| null| | 10100| null| | 10103| null| +----------------+------------------+ only showing top 20 rows

select_fields

select_fields(paths, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Retorna um novo DynamicFrame que contém os campos selecionados.

  • paths: uma lista de cadeias de caracteres. Cada cadeia de caracteres é um caminho para um nó de nível superior que você deseja selecionar.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

Exemplo: usar select_fields para criar um novo DynamicFrame com os campos escolhidos

O exemplo de código a seguir mostra como usar o método select_fields para criar um novo DynamicFrame com uma lista escolhida de campos de um DynamicFrame existente.

nota

Para acessar o conjunto de dados usado neste exemplo, consulte Exemplo de código: juntar e relacionar dados e siga as instruções em Etapa 1: crawling de dados no bucket do Amazon S3.

# Example: Use select_fields to select specific fields from a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create a DynamicFrame and view its schema persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) print("Schema for the persons DynamicFrame:") persons.printSchema() # Create a new DynamicFrame with chosen fields names = persons.select_fields(paths=["family_name", "given_name"]) print("Schema for the names DynamicFrame, created with select_fields:") names.printSchema() names.toDF().show()
Schema for the persons DynamicFrame: root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string Schema for the names DynamicFrame: root |-- family_name: string |-- given_name: string +-----------+----------+ |family_name|given_name| +-----------+----------+ | Collins| Michael| | Huizenga| Bill| | Clawson| Curtis| | Solomon| Gerald| | Rigell| Edward| | Crapo| Michael| | Hutto| Earl| | Ertel| Allen| | Minish| Joseph| | Andrews| Robert| | Walden| Greg| | Kazen| Abraham| | Turner| Michael| | Kolbe| James| | Lowenthal| Alan| | Capuano| Michael| | Schrader| Kurt| | Nadler| Jerrold| | Graves| Tom| | McMillan| John| +-----------+----------+ only showing top 20 rows

simplify_ddb_json

simplify_ddb_json(): DynamicFrame

Simplifica colunas aninhadas em um DynamicFrame que estão especificamente na estrutura JSON do DynamoDB e retorna um novo DynamicFrame simplificado. Se houver vários tipos ou um tipo de mapa em um tipo de lista, os elementos na lista não serão simplificados. Observe que esse é um tipo específico de transformação que se comporta de forma diferente da transformação unnest comum e requer que os dados já estejam na estrutura JSON do DynamoDB. Para mais informações, consulte JSON do DynamoDB.

Por exemplo, o esquema de uma leitura de uma exportação com a estrutura JSON do DynamoDB pode se parecer com o seguinte:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

A transformação simplify_ddb_json() converteria isso em:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

Exemplo: use simplify_ddb_json para invocar uma simplificação JSON do DynamoDB

Esse exemplo de código usa o método simplify_ddb_json para utilizar o conector de exportação para DynamoDB do AWS Glue, invocar uma simplificação JSON do DynamoDB e imprimir o número de partições.

Código de exemplo

from pyspark.context import SparkContext from awsglue.context import GlueContext sc = SparkContext() glueContext = GlueContext(sc) dynamicFrame = glueContext.create_dynamic_frame.from_options( connection_type = "dynamodb", connection_options = { 'dynamodb.export': 'ddb', 'dynamodb.tableArn': '<table arn>', 'dynamodb.s3.bucket': '<bucket name>', 'dynamodb.s3.prefix': '<bucket prefix>', 'dynamodb.s3.bucketOwner': '<account_id of bucket>' } ) simplified = dynamicFrame.simplify_ddb_json() print(simplified.getNumPartitions())

spigot

spigot(path, options={})

Grava registros de exemplo em um destino específico para ajudar você a verificar as transformações realizadas pelo seu trabalho.

  • path: o caminho para o destino no qual a gravação será feita (obrigatório).

  • options: pares de chave-valor que especificam opções (opcional). A opção "topk" especifica que os primeiros registros k devem ser gravados. A opção "prob" especifica a probabilidade (como um número decimal) de escolher um determinado registro. Ele pode ser usado na seleção de registros para gravar.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

Exemplo: usar spigot para gravar campos de exemplo de um DynamicFrame no Amazon S3

Este exemplo de código usa o método spigot para gravar registros de amostra em um bucket do Amazon S3 depois de aplicar a transformação select_fields.

Exemplo de conjunto de dados

nota

Para acessar o conjunto de dados usado neste exemplo, consulte Exemplo de código: juntar e relacionar dados e siga as instruções em Etapa 1: crawling de dados no bucket do Amazon S3.

O exemplo usa um DynamicFrame chamado persons com o seguinte esquema:

root |-- family_name: string |-- name: string |-- links: array | |-- element: struct | | |-- note: string | | |-- url: string |-- gender: string |-- image: string |-- identifiers: array | |-- element: struct | | |-- scheme: string | | |-- identifier: string |-- other_names: array | |-- element: struct | | |-- lang: string | | |-- note: string | | |-- name: string |-- sort_name: string |-- images: array | |-- element: struct | | |-- url: string |-- given_name: string |-- birth_date: string |-- id: string |-- contact_details: array | |-- element: struct | | |-- type: string | | |-- value: string |-- death_date: string

Código de exemplo

# Example: Use spigot to write sample records # to a destination during a transformation # from pyspark.context import SparkContext. # Replace DOC-EXAMPLE-BUCKET with your own location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load table data into a DynamicFrame persons = glueContext.create_dynamic_frame.from_catalog( database="legislators", table_name="persons_json" ) # Perform the select_fields on the DynamicFrame persons = persons.select_fields(paths=["family_name", "given_name", "birth_date"]) # Use spigot to write a sample of the transformed data # (the first 10 records) spigot_output = persons.spigot( path="s3://DOC-EXAMPLE-BUCKET", options={"topk": 10} )

Veja a seguir um exemplo dos dados gravados por spigot no Amazon S3. Como o código de exemplo especificou options={"topk": 10}, os dados de exemplo contêm os primeiros 10 registros.

{"family_name":"Collins","given_name":"Michael","birth_date":"1944-10-15"} {"family_name":"Huizenga","given_name":"Bill","birth_date":"1969-01-31"} {"family_name":"Clawson","given_name":"Curtis","birth_date":"1959-09-28"} {"family_name":"Solomon","given_name":"Gerald","birth_date":"1930-08-14"} {"family_name":"Rigell","given_name":"Edward","birth_date":"1960-05-28"} {"family_name":"Crapo","given_name":"Michael","birth_date":"1951-05-20"} {"family_name":"Hutto","given_name":"Earl","birth_date":"1926-05-12"} {"family_name":"Ertel","given_name":"Allen","birth_date":"1937-11-07"} {"family_name":"Minish","given_name":"Joseph","birth_date":"1916-09-01"} {"family_name":"Andrews","given_name":"Robert","birth_date":"1957-08-04"}

split_fields

split_fields(paths, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Retorna um novo DynamicFrameCollection que contém dois DynamicFrames. O primeiro DynamicFrame contém todos os nós que foram separados, e o segundo contém os nós restantes.

  • paths – Uma lista de strings, cada uma é um caminho completo para um nó que você quer separar em um novo DynamicFrame.

  • name1 – Uma string de nome para DynamicFrame a ser separado.

  • name2 – Uma string de nome para DynamicFrame que permanece após os nós especificados terem sido separados.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

Exemplo: usar split_fields para dividir os campos selecionados em um DynamicFrame separado

Este exemplo de código usa o método split_fields para dividir uma lista de campos especificados em uma lista separada DynamicFrame.

Exemplo de conjunto de dados

O exemplo usa um DynamicFrame chamado l_root_contact_details que é de uma coleção chamada legislators_relationalized.

l_root_contact_details tem o seguinte esquema e entradas:

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| ...

Código de exemplo

# Example: Use split_fields to split selected # fields into a separate DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Load the input DynamicFrame and inspect its schema frame_to_split = legislators_relationalized.select("l_root_contact_details") print("Inspect the input DynamicFrame schema:") frame_to_split.printSchema() # Split id and index fields into a separate DynamicFrame split_fields_collection = frame_to_split.split_fields(["id", "index"], "left", "right") # Inspect the resulting DynamicFrames print("Inspect the schemas of the DynamicFrames created with split_fields:") split_fields_collection.select("left").printSchema() split_fields_collection.select("right").printSchema()
Inspect the input DynamicFrame's schema: root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string Inspect the schemas of the DynamicFrames created with split_fields: root |-- id: long |-- index: int root |-- contact_details.val.type: string |-- contact_details.val.value: string

split_rows

split_rows(comparison_dict, name1, name2, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Separa uma ou mais linhas em um DynamicFrame em um novo DynamicFrame.

O método retorna um novo DynamicFrameCollection que contém dois DynamicFrames. O primeiro DynamicFrame contém todas as linhas que foram separadas, e o segundo contém as linhas restantes.

  • comparison_dict: um dicionário em que a chave é o caminho para uma coluna e o valor é outro dicionário para mapear comparadores a valores aos quais os valores das colunas são comparados. Por exemplo, {"age": {">": 10, "<": 20}} separa todas as linhas cujo valor na coluna de idade é maior do que 10 e menor do que 20.

  • name1 – Uma string de nome para DynamicFrame a ser separado.

  • name2 – Uma string de nome para DynamicFrame que permanece após os nós especificados terem sido separados.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

Exemplo: usar split_rows para dividir linhas em um DynamicFrame

Este exemplo de código usa o método split_rows para dividir linhas em um DynamicFrame com base no valor do campo id.

Exemplo de conjunto de dados

O exemplo usa um DynamicFrame chamado l_root_contact_details que é selecionado em uma coleção chamada legislators_relationalized.

l_root_contact_details tem o seguinte esquema e entradas:

root |-- id: long |-- index: int |-- contact_details.val.type: string |-- contact_details.val.value: string +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+

Código de exemplo

# Example: Use split_rows to split up # rows in a DynamicFrame based on value from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Retrieve the DynamicFrame to split frame_to_split = legislators_relationalized.select("l_root_contact_details") # Split up rows by ID split_rows_collection = frame_to_split.split_rows({"id": {">": 10}}, "high", "low") # Inspect the resulting DynamicFrames print("Inspect the DynamicFrame that contains IDs < 10") split_rows_collection.select("low").toDF().show() print("Inspect the DynamicFrame that contains IDs > 10") split_rows_collection.select("high").toDF().show()
Inspect the DynamicFrame that contains IDs < 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 1| 0| phone| 202-225-5265| | 1| 1| twitter| kathyhochul| | 2| 0| phone| 202-225-3252| | 2| 1| twitter| repjackyrosen| | 3| 0| fax| 202-225-1314| | 3| 1| phone| 202-225-3772| | 3| 2| twitter| MikeRossUpdates| | 4| 0| fax| 202-225-1314| | 4| 1| phone| 202-225-3772| | 4| 2| twitter| MikeRossUpdates| | 5| 0| fax| 202-225-1314| | 5| 1| phone| 202-225-3772| | 5| 2| twitter| MikeRossUpdates| | 6| 0| fax| 202-225-1314| | 6| 1| phone| 202-225-3772| | 6| 2| twitter| MikeRossUpdates| | 7| 0| fax| 202-225-1314| | 7| 1| phone| 202-225-3772| | 7| 2| twitter| MikeRossUpdates| | 8| 0| fax| 202-225-1314| +---+-----+------------------------+-------------------------+ only showing top 20 rows Inspect the DynamicFrame that contains IDs > 10 +---+-----+------------------------+-------------------------+ | id|index|contact_details.val.type|contact_details.val.value| +---+-----+------------------------+-------------------------+ | 11| 0| phone| 202-225-5476| | 11| 1| twitter| RepDavidYoung| | 12| 0| phone| 202-225-4035| | 12| 1| twitter| RepStephMurphy| | 13| 0| fax| 202-226-0774| | 13| 1| phone| 202-225-6335| | 14| 0| fax| 202-226-0774| | 14| 1| phone| 202-225-6335| | 15| 0| fax| 202-226-0774| | 15| 1| phone| 202-225-6335| | 16| 0| fax| 202-226-0774| | 16| 1| phone| 202-225-6335| | 17| 0| fax| 202-226-0774| | 17| 1| phone| 202-225-6335| | 18| 0| fax| 202-226-0774| | 18| 1| phone| 202-225-6335| | 19| 0| fax| 202-226-0774| | 19| 1| phone| 202-225-6335| | 20| 0| fax| 202-226-0774| | 20| 1| phone| 202-225-6335| +---+-----+------------------------+-------------------------+ only showing top 20 rows

unbox

unbox(path, format, transformation_ctx="", info="", stageThreshold=0, totalThreshold=0, **options)

Descompacta (reformata) um campo de string em um DynamicFrame e retorna um novo DynamicFrame que contém os DynamicRecords descompactados.

Um DynamicRecord representa um registro lógico em um DynamicFrame. É semelhante a uma linha em um DataFrame do Apache Spark, exceto pelo fato de que pode se autodescrever e ser usado para dados que não estejam em conformidade com um esquema fixo.

  • path – Um caminho completo para o nó de string que você quer descompactar.

  • format: uma especificação de formato (opcional). Usado para uma conexão do Amazon S3 ou do AWS Glue com suporte a vários formatos. Consulte Opções de formato de dados para entradas e saídas no AWS Glue para Spark para conhecer os formatos compatíveis.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • options: um ou mais dos seguintes itens:

    • separator: uma string que contém o caractere de separação.

    • escaper: uma string que contém o caractere de escape.

    • skipFirst: um valor booleano que indica se a primeira instância deve ser ignorada.

    • withSchema: uma string contendo uma representação JSON do esquema do nó. O formato da representação JSON de um esquema é definido pela saída de StructType.json().

    • withHeader: um valor booleano que indica se há um cabeçalho incluído.

Exemplo: usar unbox para descompactar um campo de string em um struct

Este exemplo de código usa o método unbox para desempacotar ou reformatar um campo de string em um DynamicFrame em um campo do tipo struct.

Exemplo de conjunto de dados

O exemplo usa um DynamicFrame chamado mapped_with_string com os seguintes esquema e entradas:

Observe o campo chamado AddressString. Esse é o campo que o exemplo descompacta em um struct.

root |-- Average Total Payments: string |-- AddressString: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|{"Street": "1108 ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|{"Street": "2505 ...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|{"Street": "205 M...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|{"Street": "50 ME...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| ...

Código de exemplo

# Example: Use unbox to unbox a string field # into a struct in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) unboxed = mapped_with_string.unbox("AddressString", "json") unboxed.printSchema() unboxed.toDF().show()
root |-- Average Total Payments: string |-- AddressString: struct | |-- Street: string | |-- City: string | |-- State: string | |-- Zip.Code: string | |-- Array: array | | |-- element: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ |Average Total Payments| AddressString|Average Covered Charges| DRG Definition|Average Medicare Payments|Hospital Referral Region Description| Address|Provider Id|Total Discharges| Provider Name| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ | $5777.24|[1108 ROSS CLARK ...| $32963.07|039 - EXTRACRANIA...| $4763.73| AL - Dothan|[36301, DOTHAN, [...| 10001| 91|SOUTHEAST ALABAMA...| | $5787.57|[2505 U S HIGHWAY...| $15131.85|039 - EXTRACRANIA...| $4976.71| AL - Birmingham|[35957, BOAZ, [25...| 10005| 14|MARSHALL MEDICAL ...| | $5434.95|[205 MARENGO STRE...| $37560.37|039 - EXTRACRANIA...| $4453.79| AL - Birmingham|[35631, FLORENCE,...| 10006| 24|ELIZA COFFEE MEMO...| | $5417.56|[50 MEDICAL PARK ...| $13998.28|039 - EXTRACRANIA...| $4129.16| AL - Birmingham|[35235, BIRMINGHA...| 10011| 25| ST VINCENT'S EAST| | $5658.33|[1000 FIRST STREE...| $31633.27|039 - EXTRACRANIA...| $4851.44| AL - Birmingham|[35007, ALABASTER...| 10016| 18|SHELBY BAPTIST ME...| | $6653.80|[2105 EAST SOUTH ...| $16920.79|039 - EXTRACRANIA...| $5374.14| AL - Montgomery|[36116, MONTGOMER...| 10023| 67|BAPTIST MEDICAL C...| | $5834.74|[2000 PEPPERELL P...| $11977.13|039 - EXTRACRANIA...| $4761.41| AL - Birmingham|[36801, OPELIKA, ...| 10029| 51|EAST ALABAMA MEDI...| | $8031.12|[619 SOUTH 19TH S...| $35841.09|039 - EXTRACRANIA...| $5858.50| AL - Birmingham|[35233, BIRMINGHA...| 10033| 32|UNIVERSITY OF ALA...| | $6113.38|[101 SIVLEY RD, H...| $28523.39|039 - EXTRACRANIA...| $5228.40| AL - Huntsville|[35801, HUNTSVILL...| 10039| 135| HUNTSVILLE HOSPITAL| | $5541.05|[1007 GOODYEAR AV...| $75233.38|039 - EXTRACRANIA...| $4386.94| AL - Birmingham|[35903, GADSDEN, ...| 10040| 34|GADSDEN REGIONAL ...| | $5461.57|[600 SOUTH THIRD ...| $67327.92|039 - EXTRACRANIA...| $4493.57| AL - Birmingham|[35901, GADSDEN, ...| 10046| 14|RIVERVIEW REGIONA...| | $5356.28|[4370 WEST MAIN S...| $39607.28|039 - EXTRACRANIA...| $4408.20| AL - Dothan|[36305, DOTHAN, [...| 10055| 45| FLOWERS HOSPITAL| | $5374.65|[810 ST VINCENT'S...| $22862.23|039 - EXTRACRANIA...| $4186.02| AL - Birmingham|[35205, BIRMINGHA...| 10056| 43|ST VINCENT'S BIRM...| | $5366.23|[400 EAST 10TH ST...| $31110.85|039 - EXTRACRANIA...| $4376.23| AL - Birmingham|[36207, ANNISTON,...| 10078| 21|NORTHEAST ALABAMA...| | $5282.93|[1613 NORTH MCKEN...| $25411.33|039 - EXTRACRANIA...| $4383.73| AL - Mobile|[36535, FOLEY, [1...| 10083| 15|SOUTH BALDWIN REG...| | $5676.55|[1201 7TH STREET ...| $9234.51|039 - EXTRACRANIA...| $4509.11| AL - Huntsville|[35609, DECATUR, ...| 10085| 27|DECATUR GENERAL H...| | $5930.11|[6801 AIRPORT BOU...| $15895.85|039 - EXTRACRANIA...| $3972.85| AL - Mobile|[36608, MOBILE, [...| 10090| 27| PROVIDENCE HOSPITAL| | $6192.54|[809 UNIVERSITY B...| $19721.16|039 - EXTRACRANIA...| $5179.38| AL - Tuscaloosa|[35401, TUSCALOOS...| 10092| 31|D C H REGIONAL ME...| | $4968.00|[750 MORPHY AVENU...| $10710.88|039 - EXTRACRANIA...| $3898.88| AL - Mobile|[36532, FAIRHOPE,...| 10100| 18| THOMAS HOSPITAL| | $5996.00|[701 PRINCETON AV...| $51343.75|039 - EXTRACRANIA...| $4962.45| AL - Birmingham|[35211, BIRMINGHA...| 10103| 33|BAPTIST MEDICAL C...| +----------------------+--------------------+-----------------------+--------------------+-------------------------+------------------------------------+--------------------+-----------+----------------+--------------------+ only showing top 20 rows

união

union(frame1, frame2, transformation_ctx = "", info = "", stageThreshold = 0, totalThreshold = 0)

União de dois DynamicFrames. Retorna DynamicFrame contendo todos os registros dos dois DynamicFrames de entrada. Essa transformação pode retornar resultados diferentes da união de dois DataFrames com dados equivalentes. Se você precisar do comportamento de união do Spark DataFrame, considere usar toDF.

  • frame1 - Primeiro DynamicFrame para a união.

  • frame2 - Segundo DynamicFrame para a união.

  • transformation_ctx - (opcional) Uma string exclusiva usada para identificar estatísticas/informações de estado

  • info - (opcional) Qualquer string a ser associada a erros na transformação

  • stageThreshold - (opcional) Número máximo de erros na transformação até que o processamento ocorra um erro

  • totalThreshold - (opcional) Número máximo de erros totais até que o processamento apresente erros.

unnest

unnest(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)

Desfaz o aninhamento de objetos em um DynamicFrame, transformando-os em objetos de nível superior, e retorna um novo DynamicFrame não aninhado.

  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold: o número de erros encontrados durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

  • totalThreshold: o número de erros encontrados antes e durante essa transformação em que o processo deve falhar (opcional). O padrão é zero, o que indica que o processo não deve falhar.

Exemplo: usar unnest para transformar campos aninhados em campos de nível superior

Este exemplo de código usa o método unnest para nivelar todos os campos aninhados em um DynamicFrame em campos de nível superior.

Exemplo de conjunto de dados

O exemplo usa um DynamicFrame chamado mapped_medicare com o esquema a seguir. Observe que o campo Address é o único campo que contém dados aninhados.

root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address: struct | |-- Zip.Code: string | |-- City: string | |-- Array: array | | |-- element: string | |-- State: string | |-- Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

Código de exemplo

# Example: Use unnest to unnest nested # objects in a DynamicFrame from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Unnest all nested fields unnested = mapped_medicare.unnest() unnested.printSchema()
root |-- Average Total Payments: string |-- Average Covered Charges: string |-- DRG Definition: string |-- Average Medicare Payments: string |-- Hospital Referral Region Description: string |-- Address.Zip.Code: string |-- Address.City: string |-- Address.Array: array | |-- element: string |-- Address.State: string |-- Address.Street: string |-- Provider Id: string |-- Total Discharges: string |-- Provider Name: string

unnest_ddb_json

Desaninha colunas aninhadas em um DynamicFrame que estão especificamente na estrutura JSON do DynamoDB e retorna um novo DynamicFrame não aninhado. Colunas que pertençam a uma matriz de tipos de estrutura não serão desaninhadas. Observe que esse é um tipo específico de transformação de desaninhamento que se comporta diferentemente da transformação unnest comum e requer que os dados já estejam na estrutura JSON do DynamoDB. Para mais informações, consulte JSON do DynamoDB.

unnest_ddb_json(transformation_ctx="", info="", stageThreshold=0, totalThreshold=0)
  • transformation_ctx – Uma string única que é usada para identificar informações de estado (opcional).

  • info – Uma string a ser associada com o erro na geração de relatórios desta transformação (opcional).

  • stageThreshold – O número de erros encontrados durante esta transformação em que o processo deve falhar (opcional: zero por padrão, indicando que o processo não deve apresentar falha).

  • totalThreshold – O número de erros encontrados incluindo esta transformação em que o processo deve falhar (opcional: zero por padrão, indicando que o processo não deve apresentar falha).

Por exemplo, o esquema de uma leitura de uma exportação com a estrutura JSON do DynamoDB pode ter a seguinte aparência:

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

A transformação unnest_ddb_json() converteria isso em:

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

O exemplo de código a seguir mostra como usar o conector de exportação para DynamoDB do AWS Glue, invocar um desaninhamento de JSON do DynamoDB e imprimir o número de partições:

import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": "<test_source>", "dynamodb.s3.bucket": "<bucket name>", "dynamodb.s3.prefix": "<bucket prefix>", "dynamodb.s3.bucketOwner": "<account_id>", } ) unnested = dynamicFrame.unnest_ddb_json() print(unnested.getNumPartitions()) job.commit()

write

write(connection_type, connection_options, format, format_options, accumulator_size)

Obtém um DataSink(object) do tipo de conexão especificado em GlueContext classe deste DynamicFrame, e o usa para formatar e gravar o conteúdo desse DynamicFrame. Retorna o novo DynamicFrame formatado e gravado conforme especificado.

  • connection_type: o tipo de conexão a ser usado. Os valores válidos incluem s3, mysql, postgresql, redshift, sqlserver e oracle.

  • connection_options: a opção de conexão a ser usada (opcional). Para um connection_type do s3, um caminho do Amazon S3 é definido.

    connection_options = {"path": "s3://aws-glue-target/temp"}

    Para conexões JDBC, várias propriedades devem ser definidas. Observe que o nome do banco de dados deve fazer parte do URL. Ele também pode ser incluído nas opções de conexão.

    Atenção

    Não é recomendável armazenar senhas no script. Considere usar boto3 para recuperá-los do AWS Secrets Manager ou do catálogo de dados do AWS Glue.

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}
  • format: uma especificação de formato (opcional). Essa ação é usada para um Amazon Simple Storage Service (Amazon S3) ou uma conexão do AWS Glue que ofereça suporte a vários formatos. Consulte Opções de formato de dados para entradas e saídas no AWS Glue para Spark para obter os formatos compatíveis.

  • format_options: as opções de formato para o formato especificado. Consulte Opções de formato de dados para entradas e saídas no AWS Glue para Spark para obter os formatos compatíveis.

  • accumulator_size: o tamanho cumulativo a ser usado, em bytes (opcional).

 — erros —

assertErrorThreshold

assertErrorThreshold( ): uma afirmação para erros nas transformações que criaram este DynamicFrame. Retorna um Exception do DataFrame subjacente.

errorsAsDynamicFrame

errorsAsDynamicFrame( ) – Retorna um DynamicFrame com registros de erro aninhados.

Exemplo: usar errorsAsDynamicFrame para visualizar registros de erros

O código de exemplo a seguir mostra como usar o método errorsAsDynamicFrame para visualizar um registro de erro para um DynamicFrame.

Exemplo de conjunto de dados

O exemplo usa o conjunto de dados a seguir, que você pode carregar para o Amazon S3 como JSON. O segundo registro está mal formado. Dados malformados geralmente interrompem a análise de arquivos quando usamos o SparkSQL. No entanto, o DynamicFrame reconhece problemas de malformação e transforma linhas malformadas em registros de erros que você pode solucionar individualmente.

{"id": 1, "name": "george", "surname": "washington", "height": 178} {"id": 2, "name": "benjamin", "surname": "franklin", {"id": 3, "name": "alexander", "surname": "hamilton", "height": 171} {"id": 4, "name": "john", "surname": "jay", "height": 190}

Código de exemplo

# Example: Use errorsAsDynamicFrame to view error records. # Replace s3://DOC-EXAMPLE-S3-BUCKET/error_data.json with your location. from pyspark.context import SparkContext from awsglue.context import GlueContext # Create GlueContext sc = SparkContext.getOrCreate() glueContext = GlueContext(sc) # Create errors DynamicFrame, view schema errors = glueContext.create_dynamic_frame.from_options( "s3", {"paths": ["s3://DOC-EXAMPLE-S3-BUCKET/error_data.json"]}, "json" ) print("Schema of errors DynamicFrame:") errors.printSchema() # Show that errors only contains valid entries from the dataset print("errors contains only valid records from the input dataset (2 of 4 records)") errors.toDF().show() # View errors print("Errors count:", str(errors.errorsCount())) print("Errors:") errors.errorsAsDynamicFrame().toDF().show() # View error fields and error data error_record = errors.errorsAsDynamicFrame().toDF().head() error_fields = error_record["error"] print("Error fields: ") print(error_fields.asDict().keys()) print("\nError record data:") for key in error_fields.asDict().keys(): print("\n", key, ": ", str(error_fields[key]))
Schema of errors DynamicFrame: root |-- id: int |-- name: string |-- surname: string |-- height: int errors contains only valid records from the input dataset (2 of 4 records) +---+------+----------+------+ | id| name| surname|height| +---+------+----------+------+ | 1|george|washington| 178| | 4| john| jay| 190| +---+------+----------+------+ Errors count: 1 Errors: +--------------------+ | error| +--------------------+ |[[ File "/tmp/20...| +--------------------+ Error fields: dict_keys(['callsite', 'msg', 'stackTrace', 'input', 'bytesread', 'source', 'dynamicRecord']) Error record data: callsite : Row(site=' File "/tmp/2060612586885849088", line 549, in <module>\n sys.exit(main())\n File "/tmp/2060612586885849088", line 523, in main\n response = handler(content)\n File "/tmp/2060612586885849088", line 197, in execute_request\n result = node.execute()\n File "/tmp/2060612586885849088", line 103, in execute\n exec(code, global_dict)\n File "<stdin>", line 10, in <module>\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/dynamicframe.py", line 625, in from_options\n format_options, transformation_ctx, push_down_predicate, **kwargs)\n File "/opt/amazon/lib/python3.6/site-packages/awsglue/context.py", line 233, in create_dynamic_frame_from_options\n source.setFormat(format, **format_options)\n', info='') msg : error in jackson reader stackTrace : com.fasterxml.jackson.core.JsonParseException: Unexpected character ('{' (code 123)): was expecting either valid name character (for unquoted name) or double-quote (for quoted) to start field name at [Source: com.amazonaws.services.glue.readers.BufferedStream@73492578; line: 3, column: 2] at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1581) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:533) at com.fasterxml.jackson.core.base.ParserMinimalBase._reportUnexpectedChar(ParserMinimalBase.java:462) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleOddName(UTF8StreamJsonParser.java:2012) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._parseName(UTF8StreamJsonParser.java:1650) at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:740) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$hasNextGoodToken$1.apply(JacksonReader.scala:57) at scala.collection.Iterator$$anon$9.next(Iterator.scala:162) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:599) at scala.collection.Iterator$$anon$16.hasNext(Iterator.scala:598) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:120) at com.amazonaws.services.glue.readers.JacksonReader$$anonfun$1.apply(JacksonReader.scala:116) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErr(DynamicRecordBuilder.scala:209) at com.amazonaws.services.glue.DynamicRecordBuilder.handleErrorWithException(DynamicRecordBuilder.scala:202) at com.amazonaws.services.glue.readers.JacksonReader.nextFailSafe(JacksonReader.scala:116) at com.amazonaws.services.glue.readers.JacksonReader.next(JacksonReader.scala:109) at com.amazonaws.services.glue.readers.JSONReader.next(JSONReader.scala:247) at com.amazonaws.services.glue.hadoop.TapeHadoopRecordReaderSplittable.nextKeyValue(TapeHadoopRecordReaderSplittable.scala:103) at org.apache.spark.rdd.NewHadoopRDD$$anon$1.hasNext(NewHadoopRDD.scala:230) at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:462) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255) at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324) at org.apache.spark.rdd.RDD.iterator(RDD.scala:288) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:750) input : bytesread : 252 source : dynamicRecord : Row(id=2, name='benjamin', surname='franklin')

errorsCount

errorsCount( ) – Retorna o número total de erros em um DynamicFrame.

stageErrorsCount

stageErrorsCount – Retorna o número de erros que ocorreram no processo de criação deste DynamicFrame.