使用雪花雪花管、Amazon S3、Amazon SNS 和 Amazon Kinesis 資料火軟管,將資料流擷取到雪花資料庫中 - AWS Prescriptive Guidance

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用雪花雪花管、Amazon S3、Amazon SNS 和 Amazon Kinesis 資料火軟管,將資料流擷取到雪花資料庫中

由比卡什錢卓朗特 (AWS) 創建

建立者:AWS

環境:PoC 或試驗

技術:儲存與備份

Summary

此模式描述了如何使用 Amazon Web Services (AWS) 雲端上的服務來處理連續的資料流,並將其載入 Snowflake 資料庫。該模式使用 Amazon Kinesis Data Firehose 將資料交付到 Amazon Simple Storage Service (Amazon S3)、Amazon Simple Notification Service (Amazon SNS) 在收到新資料時傳送通知,Snowflake Simple Storage Service (S3)。

透過遵循此模式,您可以在幾秒鐘內持續產生可供分析的資料,避免多個手動 COPY 指令,並在負載時完全支援半結構化資料。

先決條件和限制

先決條件

  • 作用中的 AWS 帳戶

  • 持續將資料傳送至 Kinesis Data Firehose 交付串流的資料來源。

  • 正在接收來自 Kinesis Data Firehose 交付串流的資料 S3 儲存貯體。

  • 作用中的 Snowflake 帳戶

限制

  • 雪花雪花管不會直接連接至 Kinesis Data Firehose。

Architecture

技術堆疊

  • Amazon Kinesis Data Firehose

  • Amazon SNS

  • Amazon S3

  • Snowflake Snowflake

  • Snowflake 資料庫

Tools

  • Kinesis Data Firehose— Amazon Kinesis Data Firehose 是一種全受管服務,可將即時串流資料交付到目的地,例如 Amazon S3、Amazon Redshift、Amazon OpenSearch Service、Splunk,以及受支援的第三方服務供應商擁有的任何自訂 HTTP 端點或 HTTP 端點。

  • Amazon S3— Amazon Simple Storage Service (Amazon S3) 是網際網路儲存服務。

  • Amazon SNS— Amazon Simple Notification Service (Amazon SNS) 協調和管理消息傳遞或發送到訂閱端點或客戶端。

  • Snowflake— Snowflake 是一種分析資料倉儲,以軟體即服務 (SaaS) 形式提供。

  • Snowflake Snowflake— Snowpipe 只要在雪花階段中可用,就會從檔案載入資料。

Epics

任務描述所需技能
在雪花中創建 CSV 文件。

登入雪花並執行「建立檔案格式」命令,以指定的欄位分隔符號建立 CSV 檔案。如需有關這個和其他 Snowflake 命令的詳細資訊,請參閱 < 其他資訊 > 一節。

開發人員
建立外部雪花階段。

執行「建立階段」命令,建立參照您先前建立的 CSV 檔案的外部雪花階段。重要:您需要 S3 儲存貯體的 URL、AWS 存取金鑰和 AWS 秘密存取金鑰。執行「顯示階段」命令,以確認已建立雪花階段。

開發人員
建立 Snowflake 目標資料表。

執行「建立表格」命令來建立雪花表格。

開發人員
建立管道員。

運行「創建管」命令; 確保命令中的「auto_ingest= 真」。執行「顯示管」指令,以確認管已建立。複製並儲存「通知」欄位值。這個值將用於設定 Amazon S3 事件通知。

開發人員
任務描述所需技能
為 S3 儲存貯體建立 30 天生命週期原則。

登入 AWS 管理員主控台,然後開啟 Amazon S3 主控台。選擇包含來自 Kinesis 資料防火軟管的資料的 S3 儲存貯體。然後在 S3 存儲桶中選擇「管理」選項卡並選擇「添加生命週期規則」。在「生命週期規則」對話方塊中輸入規則的名稱,並為儲存貯體設定 30 天的生命週期規則。如需這個和其他故事的協助,請參閱 < 相關資源 > 一節。

系統管理員員
建立 S3 儲存貯體的 IAM 政策。

開啟 AWS Identity and Access Management 員 (IAM) 主控台,然後選擇「政策」。選擇「建立原則」,然後選擇「JSON」標籤。從「其他資訊」區段複製原則並貼到 JSON 欄位。此原則會授與「PutObject」和「DeleteObject」權限,以及「物件」、「GetObject 件」、「物件版本」和「ListBucket」權限。選擇「檢閱政策」,輸入政策名稱,然後選擇「建立政策」。

系統管理員員
將原則指派給 IAM 角色。

開啟 IAM 主控台選擇「角色」,然後選擇「建立角色」。選擇「另一個 AWS 帳戶」做為信任的實體。輸入您的 AWS 帳戶 ID,然後選擇「需要外部 ID」。輸入預留位置 ID,以便稍後變更。選擇「下一步」並指派您先前建立的 IAM 政策。然後建立 IAM 角色。

系統管理員員
複製 IAM 角色的 Amazon Resource Name (ARN)。

開啟 IAM 主控台,然後選擇「角色」。選擇您先前建立的 IAM 角色,然後複製並儲存「角色 ARN」。

系統管理員員
任務描述所需技能
在雪花中建立儲存整合。

登入雪花並執行「建立儲存整合」命令。這會修改受信任的關係、授予對雪花的存取權,以及為雪花階段提供外部 ID。

系統管理員員
擷取雪花帳戶的 IAM 角色。

執行「描述整合」命令以擷取 IAM 角色的 ARN。重要事 <integration_ name> 項:是您之前建立的 Snowflake 儲存整合的名稱。

系統管理員員
記錄兩個欄值。

複製並儲存「儲存空間」和「儲存空間」資料行的值。

系統管理員員
任務描述所需技能
修改 IAM 角色政策。

開啟 IAM 主控台,然後選擇「角色」。選擇您先前建立的 IAM 角色,然後選擇「信任關係」標籤。選擇「編輯信任關係」。將「雪花」取代為您先前複製的「儲存空間」值。將「雪花」取代為您先前複製的儲存空間值。然後選擇「更新信任政策」。

系統管理員員
任務描述所需技能
開啟 S3 儲存貯體的事件通知。

開啟 Amazon S3 主控台,然後選擇儲存貯體。選擇「屬性」,然後在「進階設定」下選擇「事件」。選擇「新增通知」,然後輸入此事件的名稱。如果您未輸入名稱,則會使用全域唯一識別碼 (GUID)。

系統管理員員
設定 S3 儲存貯體的 Amazon SNS 通知。

在「事件」下,選擇「物件建立 (全部)」,然後在「傳送至」下拉式清單中選擇「SQS 佇列」。在「SNS」清單中選擇「新增 SQS 佇列 ARN」,然後貼上您先前複製的「notification_channel」值。然後選擇「儲存」。

系統管理員員
訂閱 Snowflake SQS 佇列至 SNS 主題。

訂閱 Snowflake SQS 佇列至您建立的 SNS 主題。如需此步驟的說明,請參閱 < 相關資源 > 一節。

系統管理員員
任務描述所需技能
檢查並測試雪球。

登入雪花並開啟雪花階段。將文件放入 S3 存儲桶中,並檢查雪花表是否加載它們。當新物件出現在 S3 儲存貯體中時,Amazon S3 會將 SNS 通知傳送到 Snowpipe。

系統管理員員

相關資源

其他資訊

建立檔案格式:

CREATE FILE FORMAT <name> TYPE = 'CSV' FIELD_DELIMITER = '|' SKIP_HEADER = 1;

建立外部階段:

externalStageParams (for Amazon S3) ::= URL = 's3://[//]' [ { STORAGE_INTEGRATION = } | { CREDENTIALS = ( { { AWS_KEY_ID = `` AWS_SECRET_KEY = `` [ AWS_TOKEN = `` ] } | AWS_ROLE = `` } ) ) }` ] [ ENCRYPTION = ( [ TYPE = 'AWS_CSE' ] [ MASTER_KEY = '' ] | [ TYPE = 'AWS_SSE_S3' ] | [ TYPE = 'AWS_SSE_KMS' [ KMS_KEY_ID = '' ] | [ TYPE = NONE ] )

建立資料表:

CREATE [ OR REPLACE ] [ { [ LOCAL | GLOBAL ] TEMP[ORARY] | VOLATILE } | TRANSIENT ] TABLE [ IF NOT EXISTS ] <table_name> ( <col_name> <col_type> [ { DEFAULT <expr> | { AUTOINCREMENT | IDENTITY } [ ( <start_num> , <step_num> ) | START <num> INCREMENT <num> ] } ] /* AUTOINCREMENT / IDENTITY supported only for numeric data types (NUMBER, INT, etc.) */ [ inlineConstraint ] [ , <col_name> <col_type> ... ] [ , outoflineConstraint ] [ , ... ] ) [ CLUSTER BY ( <expr> [ , <expr> , ... ] ) ] [ STAGE_FILE_FORMAT = ( { FORMAT_NAME = '<file_format_name>' | TYPE = { CSV | JSON | AVRO | ORC | PARQUET | XML } [ formatTypeOptions ] } ) ] [ STAGE_COPY_OPTIONS = ( copyOptions ) ] [ DATA_RETENTION_TIME_IN_DAYS = <num> ] [ COPY GRANTS ] [ COMMENT = '<string_literal>' ]

顯示階段:

SHOW STAGES;

建立管:

CREATE [ OR REPLACE ] PIPE [ IF NOT EXISTS ] [ AUTO_INGEST = [ TRUE | FALSE ] ] [ AWS_SNS_TOPIC = ] [ INTEGRATION = '' ] [ COMMENT = '' ] AS

展示管:

SHOW PIPES [ LIKE '<pattern>' ] [ IN { ACCOUNT | [ DATABASE ] <db_name> | [ SCHEMA ] <schema_name> } ]

建立儲存整合:

CREATE STORAGE INTEGRATION <integration_name>   TYPE = EXTERNAL_STAGE   STORAGE_PROVIDER = S3   ENABLED = TRUE   STORAGE_AWS_ROLE_ARN = '<iam_role>'   STORAGE_ALLOWED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/')   [ STORAGE_BLOCKED_LOCATIONS = ('s3://<bucket>/<path>/', 's3://<bucket>/<path>/') ]

範例:

create storage integration s3_int   type = external_stage   storage_provider = s3   enabled = true   storage_aws_role_arn = 'arn:aws:iam::001234567890:role/myrole'   storage_allowed_locations = ('s3://mybucket1/mypath1/', 's3://mybucket2/mypath2/')   storage_blocked_locations = ('s3://mybucket1/mypath1/sensitivedata/', 's3://mybucket2/mypath2/sensitivedata/');

如需有關此步驟的詳細資訊,請參閱配置雪花儲存整合以存取 Amazon S3從雪花文檔。

說明整合:

DESC INTEGRATION <integration_name>;

S3 儲存貯體政策:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:PutObject", "s3:GetObject", "s3:GetObjectVersion", "s3:DeleteObject", "s3:DeleteObjectVersion" ], "Resource": "arn:aws:s3::://*" }, { "Effect": "Allow", "Action": "s3:ListBucket", "Resource": "arn:aws:s3:::", "Condition": { "StringLike": { "s3:prefix": [ "/*" ] } } } ] }