Usando AWS Glue com Flink - Amazon EMR

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usando AWS Glue com Flink

O Amazon EMR no EKS com Apache Flink, versões 6.15.0 e superiores, oferece suporte ao uso do AWS Glue Data Catalog como um armazenamento de metadados para fluxos de trabalho SQL em lote e streaming.

Primeiro, você deve criar um banco de dados AWS Glue chamado default que sirva como seu catálogo SQL do Flink. Esse catálogo do Flink armazena metadados como bancos de dados, tabelas, partições, visualizações, funções e outras informações necessárias para acessar dados em outros sistemas externos.

aws glue create-database \ --database-input "{\"Name\":\"default\"}"

Para ativar o suporte ao AWS Glue, use uma FlinkDeployment especificação. Esse exemplo de especificação usa um script Python para emitir rapidamente algumas instruções SQL do Flink para interagir com o AWS catálogo Glue.

apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: flinkVersion: v1_17 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" aws.glue.enabled: "true" executionRoleArn: job-execution-role-arn; emrReleaseLabel: "emr-6.15.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: s3://<S3_bucket_with_your_script/pyflink-glue-script.py entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-glue-script.py"] parallelism: 1 upgradeMode: stateless

Veja a seguir um exemplo da aparência do seu PyFlink script.

import logging import sys from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment def glue_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) t_env.execute_sql(""" CREATE CATALOG glue_catalog WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/glue/confs/hive/conf', 'hadoop-conf-dir' = '/glue/confs/hadoop/conf' ) """) t_env.execute_sql(""" USE CATALOG glue_catalog; """) t_env.execute_sql(""" DROP DATABASE IF EXISTS eks_flink_db CASCADE; """) t_env.execute_sql(""" CREATE DATABASE IF NOT EXISTS eks_flink_db WITH ('hive.database.location-uri'= 's3a://S3-bucket-to-store-metadata/flink/flink-glue-for-hive/warehouse/'); """) t_env.execute_sql(""" USE eks_flink_db; """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS eksglueorders ( order_number BIGINT, price DECIMAL(32,2), buyer RO first_name STRING, last_name STRING, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'datagen' ); """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS eksdestglueorders ( order_number BIGINT, price DECIMAL(32,2), buyer ROW first_name STRING, last_name STRING, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'filesystem', 'path' = 's3://S3-bucket-to-store-metadata/flink/flink-glue-for-hive/warehouse/eksdestglueorders', 'format' = 'json' ); """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS print_table ( order_number BIGINT, price DECIMAL(32,2), buyer ROW first_name STRING, last_name STRING, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'print' ); """) t_env.execute_sql(""" EXECUTE STATEMENT SET BEGIN INSERT INTO eksdestglueorders SELECT * FROM eksglueorders LIMIT 10; INSERT INTO print_table SELECT * FROM eksdestglueorders; END; """) if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") glue_demo()