Trabalhando com operações de streaming em sessões AWS Glue interativas - AWS Glue

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Trabalhando com operações de streaming em sessões AWS Glue interativas

Alterar o tipo de sessão de transmissão

Use a mágica de configuração das sessões interativas do AWS Glue, %streaming, para definir o trabalho que você está executando e inicializar uma sessão interativa de transmissão.

Amostrar o fluxo de entrada para desenvolvimento interativo

Uma ferramenta que desenvolvemos para ajudar a aprimorar a experiência AWS Glue interativa em sessões interativas é a adição de um novo método GlueContext para obter um instantâneo de um stream em uma estática DynamicFrame. GlueContextpermite que você inspecione, interaja e implemente seu fluxo de trabalho.

Com a instância de classe GlueContext, você poderá localizar o método getSampleStreamingDynamicFrame. Os argumentos necessários para esse método são:

  • dataFrame: O streaming do Spark DataFrame

  • options: veja as opções disponíveis abaixo

As opções disponíveis incluem:

  • windowSize: Isso também é chamado de Duração do Microbatch. Esse parâmetro determinará quanto tempo uma consulta de transmissão aguardará após o acionamento do lote anterior. Esse valor de parâmetro deve ser inferior a pollingTimeInMs.

  • pollingTimeInMs: O período total de tempo em que o método será executado. Ele acionará pelo menos um microlote para obter registros de amostra do fluxo de entrada.

  • recordPollingLimit: esse parâmetro ajuda a limitar o número total de registros que você pesquisará no stream.

  • (Opcional) Você também pode usar writeStreamFunction para aplicar essa função personalizada a cada função de amostragem de registro. Veja abaixo exemplos em Scala e Python.

Scala
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
Python
def sample_batch_function(batch_df, batch_id): //Optional but you can replace your own forEachBatch function here options = { "pollingTimeInMs": "10000", "windowSize": "5 seconds", } glue_context.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, options, sample_batch_function)
nota

Quando o DynFrame amostrado estiver vazio, isso pode acontecer por alguns motivos:

  • A fonte de transmissão está definida como “Latest” (Mais recente), e nenhum novo dado foi ingerido durante o período de amostragem.

  • O tempo de sondagem não é suficiente para processar os registros ingeridos. Os dados não serão exibidos, a menos que o lote inteiro tenha sido processado.

Executar aplicativos de transmissão em sessões interativas

Em sessões interativas do AWS Glue, você pode executar o aplicativo de transmissão do AWS Glue assim como criaria um aplicativo de transmissão no console do AWS Glue. Como as sessões interativas são baseadas em sessão, encontrar exceções no runtime não faz com que a sessão seja interrompida. Agora temos o benefício adicional de desenvolver sua função em lote iterativamente. Por exemplo:

def batch_function(data_frame, batch_id): log.info(data_frame.count()) invalid_method_call() glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})

No exemplo acima, incluímos um uso inválido de um método e, ao contrário dos trabalhos normais do AWS Glue que sairão de toda a aplicação, o contexto de codificação e as definições do usuário são totalmente preservados e a sessão permanece operacional. Não há necessidade de fazer o bootstrap de um novo cluster e executar novamente toda a transformação anterior. Isso permite que você mantenha o foco em iterar rapidamente suas implementações de função em lote para obter resultados desejáveis.

É importante observar que o Interactive Session avalia cada instrução de maneira bloqueadora para que a sessão execute apenas uma instrução por vez. Como as consultas de transmissão são contínuas e infinitas, as sessões com consultas de transmissão ativa não poderão processar instruções de acompanhamento a menos que sejam interrompidas. Você pode emitir o comando de interrupção diretamente do Jupyter Notebook e nosso kernel processará o cancelamento para você.

Considere como exemplo a seguinte sequência de instruções que estão aguardando a execução:

Statement 1: val number = df.count() #Spark Action with deterministic result Result: 5 Statement 2: streamingQuery.start().awaitTermination() #Spark Streaming Query that will be executing continously Result: Constantly updated with each microbatch Statement 3: val number2 = df.count() #This will not be executed as previous statement will be running indefinitely