Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
AWS Glue mit Flink verwenden
Amazon EMR on EKS mit Apache Flink, Versionen 6.15.0 und höher, unterstützt die Verwendung des AWS Glue-Datenkatalogs als Metadatenspeicher für Streaming- und Batch-SQL-Workflows.
Sie müssen zuerst eine AWS Glue-Datenbank mit dem Namen erstellendefault
, die als Ihr Flink-SQL-Katalog dient. Dieser Flink-Katalog speichert Metadaten wie Datenbanken, Tabellen, Partitionen, Ansichten, Funktionen und andere Informationen, die für den Zugriff auf Daten in anderen externen Systemen erforderlich sind.
aws glue create-database \ --database-input "{\"Name\":\"default\"}"
Verwenden Sie eine FlinkDeployment
Spezifikation, um die AWS Glue-Unterstützung zu aktivieren. Diese Beispielspezifikation verwendet ein Python-Skript, um schnell einige Flink-SQL-Anweisungen für die Interaktion mit dem AWS Glue-Katalog auszugeben.
apiVersion: flink.apache.org/v1beta1 kind: FlinkDeployment metadata: name: python-example spec: flinkVersion: v1_17 flinkConfiguration: taskmanager.numberOfTaskSlots: "1" aws.glue.enabled: "true" executionRoleArn:
job-execution-role-arn
; emrReleaseLabel: "emr-6.15.0-flink-latest" jobManager: highAvailabilityEnabled: false replicas: 1 resource: memory: "2048m" cpu: 1 taskManager: resource: memory: "2048m" cpu: 1 job: jarURI: s3://<S3_bucket_with_your_script
/pyflink-glue-script.py
entryClass: "org.apache.flink.client.python.PythonDriver" args: ["-py", "/opt/flink/usrlib/pyflink-glue-script.py
"] parallelism: 1 upgradeMode: stateless
Das Folgende ist ein Beispiel dafür, wie Ihr PyFlink Skript aussehen könnte.
import logging import sys from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import StreamTableEnvironment def glue_demo(): env = StreamExecutionEnvironment.get_execution_environment() t_env = StreamTableEnvironment.create(stream_execution_environment=env) t_env.execute_sql(""" CREATE CATALOG glue_catalog WITH ( 'type' = 'hive', 'default-database' = 'default', 'hive-conf-dir' = '/glue/confs/hive/conf', 'hadoop-conf-dir' = '/glue/confs/hadoop/conf' ) """) t_env.execute_sql(""" USE CATALOG glue_catalog; """) t_env.execute_sql(""" DROP DATABASE IF EXISTS eks_flink_db CASCADE; """) t_env.execute_sql(""" CREATE DATABASE IF NOT EXISTS eks_flink_db WITH ('hive.database.location-uri'= 's3a://
S3-bucket-to-store-metadata
/flink/flink-glue-for-hive/warehouse/'); """) t_env.execute_sql(""" USE eks_flink_db; """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS eksglueorders ( order_number BIGINT, price DECIMAL(32,2), buyer ROfirst_name STRING, last_name STRING
, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'datagen' ); """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS eksdestglueorders ( order_number BIGINT, price DECIMAL(32,2), buyer ROWfirst_name STRING, last_name STRING
, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'filesystem', 'path' = 's3://S3-bucket-to-store-metadata
/flink/flink-glue-for-hive/warehouse/eksdestglueorders', 'format' = 'json' ); """) t_env.execute_sql(""" CREATE TABLE IF NOT EXISTS print_table ( order_number BIGINT, price DECIMAL(32,2), buyer ROWfirst_name STRING, last_name STRING
, order_time TIMESTAMP(3) ) WITH ( 'connector' = 'print' ); """) t_env.execute_sql(""" EXECUTE STATEMENT SET BEGIN INSERT INTO eksdestglueorders SELECT * FROM eksglueorders LIMIT 10; INSERT INTO print_table SELECT * FROM eksdestglueorders; END; """) if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") glue_demo()