Connectors - Amazon Managed Streaming for Apache Kafka

Connectors

A connector integrates external systems and Amazon services with Apache Kafka by continuously copying streaming data from a data source into your Apache Kafka cluster, or by continuously copying data from your cluster into a data sink. A connector can also perform lightweight logic such as transformation, format conversion, or filtering before delivering the data to a destination. Source connectors pull data from a data source and push it into the cluster, while sink connectors pull data from the cluster and push it into a data sink.

The following diagram shows the architecture of a connector. A worker is a Java virtual machine (JVM) process that runs the connector logic. Each worker creates a set of tasks that run in parallel threads and do the work of copying the data. Tasks don't store state, and can therefore be started, stopped, or restarted at any time in order to provide a resilient and scalable data pipeline.

Diagram showing the architecture of a connector cluster.

Connector capacity

The total capacity of a connector depends on the number of workers that the connector has, as well as on the number of MSK Connect Units (MCUs) per worker. Each MCU represents 1 vCPU of compute and 4 GiB of memory. The MCU memory pertains to the total memory of a worker instance and not the heap memory in use.
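
To make the arithmetic concrete, the following sketch (not an AWS API; the helper name is illustrative) computes the total compute and memory implied by a worker count and an MCU-per-worker setting, using the 1 vCPU / 4 GiB definition of an MCU given above.

```python
# Each MSK Connect Unit (MCU) represents 1 vCPU and 4 GiB of memory.
MCU_VCPUS = 1
MCU_MEMORY_GIB = 4

def connector_capacity(worker_count: int, mcu_per_worker: int) -> dict:
    """Return the total vCPUs and memory that a connector's workers provide."""
    mcus = worker_count * mcu_per_worker
    return {
        "vcpus": mcus * MCU_VCPUS,
        "memory_gib": mcus * MCU_MEMORY_GIB,
    }

# Example: 3 workers with 2 MCUs each -> 6 vCPUs and 24 GiB in total.
print(connector_capacity(worker_count=3, mcu_per_worker=2))
```

Note that, as stated above, the memory figure is the total memory of the worker instance, not the JVM heap available to the connector.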

MSK Connect workers consume IP addresses in the customer-provided subnets. Each worker uses one IP address from one of the customer-provided subnets. Ensure that the subnets you provide in a CreateConnector request have enough available IP addresses to accommodate the connector's specified capacity, especially for autoscaled connectors, where the number of workers can fluctuate.
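
A hypothetical pre-flight check (not part of the MSK Connect API) for this sizing rule might look as follows, assuming one IP address per worker and planning for the largest worker count the connector can reach:

```python
# Each MSK Connect worker consumes one IP address from one of the
# customer-provided subnets, so plan for the maximum worker count.
def has_enough_ips(free_ips_per_subnet: list, max_workers: int) -> bool:
    """True if the subnets can collectively host max_workers workers."""
    return sum(free_ips_per_subnet) >= max_workers

# An autoscaled connector that can scale out to 10 workers needs at
# least 10 free addresses across its subnets.
print(has_enough_ips([4, 3, 2], max_workers=10))  # False: only 9 free
```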

To create a connector, you must choose one of the following two capacity modes.

  • Provisioned - Choose this mode if you know the capacity requirements for your connector. You specify two values:

    • The number of workers.

    • The number of MCUs per worker.

  • Autoscaled - Choose this mode if the capacity requirements for your connector are variable or if you don't know them in advance. When you use autoscaled mode, Amazon MSK Connect overrides your connector's tasks.max property with a value that is proportional to the number of workers running in the connector and the number of MCUs per worker.

    You specify three sets of values:

    • The minimum and maximum number of workers.

    • The scale-in and scale-out percentages for CPU utilization, as determined by the CpuUtilization metric. When the CpuUtilization metric for the connector exceeds the scale-out percentage, MSK Connect increases the number of workers that are running in the connector. When the CpuUtilization metric falls below the scale-in percentage, MSK Connect decreases the number of workers. The number of workers always remains within the minimum and maximum that you specify when you create the connector.

    • The number of MCUs per worker.

For more information about workers, see Workers. To learn about MSK Connect metrics, see Monitoring MSK Connect.
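
The scaling behavior described above can be sketched as follows. This is an illustrative model only: MSK Connect performs the scaling internally, and the actual step sizes and cooldown behavior are not specified here, so this sketch assumes one worker is added or removed per threshold crossing.

```python
# Illustrative model of autoscaled mode: compare CpuUtilization against
# the scale-out and scale-in thresholds, then clamp the worker count to
# the configured minimum and maximum.
def next_worker_count(current, cpu_utilization,
                      scale_in_pct, scale_out_pct,
                      min_workers, max_workers):
    if cpu_utilization > scale_out_pct:
        current += 1   # scale out: utilization exceeds the scale-out percentage
    elif cpu_utilization < scale_in_pct:
        current -= 1   # scale in: utilization fell below the scale-in percentage
    # The worker count always stays within the configured bounds.
    return max(min_workers, min(current, max_workers))

print(next_worker_count(2, cpu_utilization=85,
                        scale_in_pct=20, scale_out_pct=80,
                        min_workers=1, max_workers=4))  # scales out to 3
```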

Creating a connector

Creating a connector using the AWS Management Console
  1. Open the Amazon MSK console at https://console.aws.amazon.com/msk/.

  2. In the left pane, under MSK Connect, choose Connectors.

  3. Choose Create connector.

  4. You can either use an existing custom plugin to create the connector or create a new custom plugin first. For information about custom plugins and how to create them, see Plugins. This procedure assumes that you have a custom plugin that you want to use. In the list of custom plugins, find the one that you want to use, select the box to its left, then choose Next.

  5. Enter a name and, optionally, a description.

  6. Choose the cluster that you want to connect to.

  7. Specify the connector configuration. The configuration parameters that you need to specify depend on the type of connector that you want to create. However, some parameters are common to all connectors, for example, the connector.class and tasks.max parameters. The following is an example configuration for the Confluent Amazon S3 Sink Connector.

    connector.class=io.confluent.connect.s3.S3SinkConnector
    tasks.max=2
    topics=my-example-topic
    s3.region=us-east-1
    s3.bucket.name=my-destination-bucket
    flush.size=1
    storage.class=io.confluent.connect.s3.storage.S3Storage
    format.class=io.confluent.connect.s3.format.json.JsonFormat
    partitioner.class=io.confluent.connect.storage.partitioner.DefaultPartitioner
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.storage.StringConverter
    schema.compatibility=NONE
  8. Next, configure your connector capacity. You can choose between two capacity modes: provisioned and autoscaled. For information about these two options, see Connector capacity.

  9. Choose either the default worker configuration or a custom worker configuration. For information about creating custom worker configurations, see Workers.

  10. Next, you specify the service execution role. This must be an IAM role that MSK Connect can assume, and that grants the connector all the permissions that it needs to access the necessary AWS resources. Those permissions depend on the logic of the connector. For information about how to create this role, see Service execution role.

  11. Choose Next, review the security information, then choose Next again.

  12. Specify the logging options that you want, then choose Next. For information about logging, see Logging for MSK Connect.

  13. Choose Create connector.

To use the MSK Connect API to create a connector, see CreateConnector.
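
When calling the API, the connector configuration is passed as a key/value map rather than a properties file. The following sketch shows one way to turn a properties-style configuration (like the example in the console procedure) into part of a CreateConnector request body; the connector name and role ARN are placeholders, not real resources, and a complete request also needs fields such as the cluster, plugin, and capacity details, which are omitted here.

```python
# Parse 'key=value' lines into the map that CreateConnector expects
# for its connector configuration.
def properties_to_map(text: str) -> dict:
    pairs = {}
    for line in text.strip().splitlines():
        key, _, value = line.partition("=")
        pairs[key.strip()] = value.strip()
    return pairs

config = properties_to_map("""
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=2
topics=my-example-topic
""")

# Partial request body; placeholder name and ARN for illustration only.
request = {
    "connectorName": "my-s3-sink",
    "connectorConfiguration": config,
    "capacity": {"provisionedCapacity": {"workerCount": 2, "mcuCount": 1}},
    "serviceExecutionRoleArn": "arn:aws:iam::111122223333:role/my-connect-role",
}
print(request["connectorConfiguration"]["tasks.max"])  # 2
```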