本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
具有組態供應商的 Debezium 來源連接器
這個範例說明如何使用我的SQL相容 Amazon Aurora
重要
開始之前
您的連接器必須能夠存取網際網路,以便它可以與您以 AWS Secrets Manager 外的服務互動 Amazon Virtual Private Cloud。本節中的步驟有助您完成以下任務,以啟用網際網路存取。
設定託管NAT閘道的公用子網路,並將流量路由到您的VPC.
建立預設路由,將您的私有子網路流量導向至NAT閘道。
如需詳細資訊,請參閱 啟用 Amazon MSK Connect 的網際網路存取。
先決條件
您需先準備好以下事項,才可啟用網際網路存取:
與叢集相關聯的 Amazon Virtual Private Cloud (VPC) 識別碼。例如,vpc-123456ab。
您IDs的私人子網路中的VPC. 例如,subnet-a1b2c3de、subnet-f4g5h6ij 等。您必須使用私有子網路來設定連接器。
啟用連接器的網際網路存取
-
在開啟 Amazon Virtual Private Cloud 主控台https://console.aws.amazon.com/vpc/
。 -
使用描述性名稱為NAT閘道建立公用子網路,並記下子網路 ID。如需詳細指示,請參閱在 VPC.
-
建立網際網路閘道,以便您VPC可以與網際網路通訊,並記下閘道 ID。將網際網路閘道連接至您的VPC. 如需說明,請參閱建立並連接網際網路閘道。
-
佈建公用NAT閘道,讓您的私有子網路中的主機可以連線到您的公用子網路。建立NAT閘道時,請選取您先前建立的公用子網路。如需指示,請參閱建立NAT閘道。
-
設定路由表。您總共須有兩個路由表才能完成此設定。您應該已經有一個主路由表,該主路由表是在同一時間自動創建的VPC. 在此步驟中,您可為公有子網路建立額外的路由表。
-
使用下列設定來修改您VPC的主路由表,讓您的私人子網路將流量路由到NAT閘道。如需說明,請參閱《Amazon Virtual Private Cloud使用者指南》中的使用路由表。
私人MSKC路由表屬性 Value Name tag (名稱標籤) 我們建議您為此路由表提供描述性名稱標籤,有助於您識別。例如,私人MSKC。 相關聯的子網路 您的私有子網路 為 Connect 啟用網際網路存取的MSK路由 -
目的地:0.0.0.0/0
-
目標:您的NAT閘道 ID。例如,nat-12a345bc6789efg1h。
適用於內部流量的本機路由 -
目的地:10.0.0.0/16。此值可能會因您VPC的CIDR區塊而有所不同。
-
目標:本機
-
-
依照建立自訂路由表中的說明,建立公有子網路的路由表。在建立該表時,請在名稱標籤欄位中輸入描述性名稱,以協助您識別與該表相關聯的子網路。例如,「公開 MSKC」。
-
使用下列設定來設定您的公用MSKC路由表。
屬性 Value Name tag (名稱標籤) 公開MSKC或您選擇的不同描述性名稱 相關聯的子網路 您的公有子網路與NAT閘道 為 Connect 啟用網際網路存取的MSK路由 -
目的地:0.0.0.0/0
-
目標:您的網際網路閘道 ID。例如,igw-1a234bc5。
適用於內部流量的本機路由 -
目的地:10.0.0.0/16。此值可能會因您VPC的CIDR區塊而有所不同。
-
目標:本機
-
-
現在您已經為 Amazon MSK Connect 啟用網際網路存取,您就可以建立連接器了。
建立 Debezium 來源連接器
建立自訂外掛程式
從 Debezi
um 網站下載最新穩定版本的「我的SQL連接器」外掛程式。請記下您下載的 Debezium 發行版本 (版本 2.x 或較舊的 1.x 系列)。稍後在此程序中,您將根據您的 Debezium 版本建立連接器。 -
將以下封存放入相同的目錄中:
-
debezium-connector-mysql
資料夾 -
jcusten-border-kafka-config-provider-aws-0.1.1
資料夾
-
-
將您在上一步中建立的目錄壓縮到ZIP檔案中,然後將ZIP檔案上傳到 S3 儲存貯體。如需說明,請參閱《Amazon S3 使用者指南》中的上傳物件。
-
複製以下內容JSON並將其粘貼到文件中。例如
debezium-source-custom-plugin.json
。Replace (取代)<example-custom-plugin-name>
使用您希望插件具有的名稱,<arn-of-your-s3-bucket>
與您上傳ZIP文件ARN的 S3 存儲桶,以及
您上傳到 S3 的ZIP對象的文件密鑰。<file-key-of-ZIP-object>
{ "name": "
<example-custom-plugin-name>
", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<arn-of-your-s3-bucket>
", "fileKey": "<file-key-of-ZIP-object>
" } } } -
從儲存JSON檔案的資料夾執行下列 AWS CLI 命令,以建立外掛程式。
aws kafkaconnect create-custom-plugin --cli-input-json file://
<debezium-source-custom-plugin.json>
您應該會看到類似以下範例的輸出。
{ "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
-
執行以下命令來檢查外掛程式狀態。狀態應從
CREATING
變更為ACTIVE
。將ARN佔位符替換為您ARN在上一個命令輸出中獲得的佔位符。aws kafkaconnect describe-custom-plugin --custom-plugin-arn "
<arn-of-your-custom-plugin>
"
設定 AWS Secrets Manager 和建立資料庫認證的密碼
-
開啟 Secrets Manager 主控台,位於https://console.aws.amazon.com/secretsmanager/
。 -
建立新秘密以存放您的資料庫登入憑證。如需說明,請參閱《AWS Secrets Manager使用者指南》中的建立秘密。
-
複製你的秘密ARN。
-
將以下範例政策中的 Secrets Manager 許可新增至您的 服務執行角色。Replace (取代)
<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>
與你ARN的秘密。{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "
<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>
" ] } ] }如需有關如何新增IAM權限的指示,請參閱IAM使用指南中的新增和移除IAM身分權限。
-
使用您組態供應商的資訊來建立自訂工作程序組態
-
將以下工作程序組態屬性複製到檔案中,並使用對應至您案例的值來取代預留位置字串。若要進一步了解 AWS Secrets Manager 組態提供者的組態屬性,請參閱外掛程SecretsManagerConfigProvider
式的說明文件中的。 key.converter=
<org.apache.kafka.connect.storage.StringConverter>
value.converter=<org.apache.kafka.connect.storage.StringConverter>
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
-
執行下列 AWS CLI 命令以建立您的自訂 Worker 組態。
取代以下的值:
-
<my-worker-config-name>
-自訂 Worker 組態的描述性名稱 -
<encoded-properties-file-content-string>
-您在上一個步驟中複製的純文字內容的 base64 編碼版本
aws kafkaconnect create-worker-configuration --name
<my-worker-config-name>
--properties-file-content<encoded-properties-file-content-string>
-
-
建立連接器
-
複製與您JSON的 Debezium 版本(2.x 或 1.x)相對應的以下內容,然後將其粘貼到新文件中。使用對應至您案例的值來取代
字串。如需有關如何設定服務執行角色的詳細資訊,請參閱 MSK Connect 的 IAM 角色和政策。<placeholder>
請注意,組態會使用類似
${secretManager:MySecret-1234:dbusername}
的變數 (而非純文字) 來指定資料庫憑證。使用您秘密的名稱來取代
,然後加入您要擷取的索引鍵名稱。您也必須取MySecret-1234
代為ARN自訂 Worker 組態。<arn-of-config-provider-worker-configuration>
-
在上一個步 AWS CLI 驟中儲存JSON檔案的資料夾中執行下列命令。
aws kafkaconnect create-connector --cli-input-json file://connector-info.json
以下是成功執行命令時所得到的輸出範例。
{ "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }
-
如需具有詳細步驟的 Debezium 連接器範例,請參閱簡介 Amazon MSK Connect-使用受管連接器在 Apache Kafka 叢集之間進行串流資料