Utilizzo dei processi Flink di Zeppelin in Amazon EMR - Amazon EMR

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo dei processi Flink di Zeppelin in Amazon EMR

I rilasci 6.10.0 e successivi di Amazon EMR supportano l'integrazione Apache Zeppelin con Apache Flink. Puoi inviare processi Flink in modo interattivo tramite i notebook Zeppelin. Con l'interprete Flink, puoi eseguire query Flink, definire lo streaming Flink e i processi in batch e visualizzare l'output all'interno dei notebook Zeppelin. L'interprete Flink è costruito sulla base della REST API di Flink. Ciò consente di accedere e manipolare i processi Flink dall'ambiente Zeppelin per eseguire l'elaborazione e l'analisi dei dati in tempo reale.

Ci sono quattro sottointerpreti nell'interprete Flink. Servono a scopi diversi, ma si trovano tutti nella JVM e condividono gli stessi punti di ingresso preconfigurati per Flink (ExecutionEnviroment, StreamExecutionEnvironment, BatchTableEnvironment, StreamTableEnvironment). Gli interpreti sono i seguenti:

  • %flink – Crea ExecutionEnvironment, StreamExecutionEnvironment, BatchTableEnvironment StreamTableEnvironment e fornisce un ambiente Scala

  • %flink.pyflink – Fornisce un ambiente Python

  • %flink.ssql – Fornisce un ambiente SQL in streaming

  • %flink.bsql – Fornisce un ambiente SQL in batch

Utilizza i seguenti passaggi per configurare Apache Flink su Apache Zeppelin per l'esecuzione su un cluster EMR:

  1. Crea un nuovo cluster dalla console Amazon EMR. Seleziona emr-6.10.0 o versioni successive per il rilascio di Amazon EMR. Quindi, scegli di personalizzare il tuo bundle di applicazioni con l'opzione Personalizzato. Includi almeno Flink, Hadoop e Zeppelin nel tuo bundle.

    Nella console Amazon EMR, personalizza il tuo bundle di applicazioni con l'opzione Personalizzato. Includi almeno Flink, Hadoop e Zeppelin nel tuo bundle
  2. Crea il resto del cluster con le impostazioni che preferisci.

  3. Una volta che il cluster è in esecuzione, selezionalo nella console per visualizzarne i dettagli e apri la scheda Applicazioni. Seleziona Zeppelin dalla sezione Interfacce utente dell'applicazione per aprire l'interfaccia Web di Zeppelin. Assicurati di aver configurato l'accesso all'interfaccia Web di Zeppelin con un tunnel SSH verso il nodo primario e una connessione proxy, come descritto in Prerequisiti.

    Nell'interfaccia Web di Zeppelin, puoi importare e creare nuovi notebook.
  4. Ora puoi creare una nuova nota in un notebook Zeppelin con Flink come interprete predefinito.

    Puoi creare una nuova nota in un notebook Zeppelin con Flink come interprete predefinito.
  5. Fai riferimento ai seguenti esempi di codice che dimostrano come eseguire i processi Flink da un notebook Zeppelin.

  • Esempio 1, Scala in Flink

    a) WordCount Esempio di Batch (SCALA)

    %flink val data = benv.fromElements("hello world", "hello flink", "hello hadoop") data.flatMap(line => line.split("\\s")) .map(w => (w, 1)) .groupBy(0) .sum(1) .print()

    b) WordCount Esempio di streaming (SCALA)

    %flink val data = senv.fromElements("hello world", "hello flink", "hello hadoop") data.flatMap(line => line.split("\\s")) .map(w => (w, 1)) .keyBy(0) .sum(1) .print senv.execute()
    Ad esempio, è possibile eseguire WordCount lavori in batch WordCount e in streaming da un notebook Zeppelin.
  • Esempio 2, Streaming SQL in Flink

    %flink.ssql SET 'sql-client.execution.result-mode' = 'tableau'; SET 'table.dml-sync' = 'true'; SET 'execution.runtime-mode' = 'streaming'; create table dummy_table ( id int, data string ) with ( 'connector' = 'filesystem', 'path' = 's3://s3-bucket/dummy_table', 'format' = 'csv' ); INSERT INTO dummy_table SELECT * FROM (VALUES (1, 'Hello World'), (2, 'Hi'), (2, 'Hi'), (3, 'Hello'), (3, 'World'), (4, 'ADD'), (5, 'LINE')); SELECT * FROM dummy_table;
    Questo esempio mostra come eseguire un processo Streaming SQL in Flink.
  • Esempio 3, Pyflink. Tieni presente che devi caricare il tuo file di testo di esempio denominato nel tuo word.txt bucket S3.

    %flink.pyflink import argparse import logging import sys from pyflink.common import Row from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema, DataTypes, FormatDescriptor) from pyflink.table.expressions import lit, col from pyflink.table.udf import udtf def word_count(input_path, output_path): t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) # write all the data to one file t_env.get_config().set("parallelism.default", "1") # define the source if input_path is not None: t_env.create_temporary_table( 'source', TableDescriptor.for_connector('filesystem') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .build()) .option('path', input_path) .format('csv') .build()) tab = t_env.from_path('source') else: print("Executing word_count example with default input data set.") print("Use --input to specify file input.") tab = t_env.from_elements(map(lambda i: (i,), word_count_data), DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())])) # define the sink if output_path is not None: t_env.create_temporary_table( 'sink', TableDescriptor.for_connector('filesystem') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .column('count', DataTypes.BIGINT()) .build()) .option('path', output_path) .format(FormatDescriptor.for_format('canal-json') .build()) .build()) else: print("Printing result to stdout. Use --output to specify output path.") t_env.create_temporary_table( 'sink', TableDescriptor.for_connector('print') .schema(Schema.new_builder() .column('word', DataTypes.STRING()) .column('count', DataTypes.BIGINT()) .build()) .build()) @udtf(result_types=[DataTypes.STRING()]) def split(line: Row): for s in line[0].split(): yield Row(s) # compute word count tab.flat_map(split).alias('word') \ .group_by(col('word')) \ .select(col('word'), lit(1).count) \ .execute_insert('sink') \ .wait() logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") word_count("s3://s3_bucket/word.txt", "s3://s3_bucket/demo_output.txt")
  1. Scegli FLINK JOB nell'interfaccia utente di Zeppelin per accedere e visualizzare l'interfaccia utente Web di Flink.

    Flink code snippet for word count with output showing counts for "hello", "flink", "hadoop", and "world".
  2. Scegliendo FLINK JOB si accede alla console Web di Flink in un'altra scheda del browser.

    Scegliendo FLINK JOB si apre la console Web di Flink in un'altra scheda del browser.