入门 (Scala) - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前称为 Amazon Kinesis Data Analytics for Apache Flink。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

入门 (Scala)

注意

从 1.15 版本开始,Flink 是免费的 Scala。应用程序现在可以使用任何 Scala 版本API中的 Java。Flink 仍然在内部的一些关键组件中使用 Scala,但没有将 Scala 暴露到用户代码类加载器中。因此,您必须将 Scala 依赖项添加到您的 JAR-存档中。

有关 Flink 1.15 中 Scala 变更的更多信息,请参阅 Sc ala Free in One Fi fteen。

在本练习中,您将创建面向 Scala 的 Managed Service for Apache Flink 应用程序,并将 Kinesis 流作为源和接收器。

创建依赖资源

在本练习中,创建Managed Service for Apache Flink的应用程序之前,您需要创建以下从属资源:

  • 两个 Kinesis 流用于输入和输出。

  • 存储应用程序代码 (ka-app-code-<username>) 的 Amazon S3 存储桶

您可以使用控制台创建 Kinesis 流和 Amazon S3 存储桶。有关创建这些资源的说明,请参阅以下主题:

  • Amazon Kinesis Data Streams 开发人员指南中的创建和更新数据流。将数据流命名为 ExampleInputStreamExampleOutputStream

    创建数据流 (AWS CLI)

    • 要创建第一个直播 (ExampleInputStream),请使用以下 Amazon Kinesis create-stre AWS CLI am 命令。

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • 要创建应用程序用来写入输出的第二个流,请运行同一命令(将流名称更改为 ExampleOutputStream)。

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • 《Amazon Simple Storage Service 用户指南》中的如何创建 S3 存储桶?。附加您的登录名,以便为 Amazon S3 存储桶指定全局唯一的名称,例如 ka-app-code-<username>

其他资源

在您创建应用程序时,适用于 Apache Flink 的托管服务会创建以下 Amazon CloudWatch 资源(如果这些资源尚不存在):

  • 名为 /AWS/KinesisAnalytics-java/MyApplication 的日志组

  • 名为 kinesis-analytics-log-stream 的日志流

将样本记录写入输入流

在本节中,您使用 Python 脚本将示例记录写入流,以供应用程序处理。

注意

此部分需要 AWS SDK for Python (Boto)

注意

本节中的 Python 脚本使用 AWS CLI。您必须将您的配置 AWS CLI 为使用您的账户凭证和默认区域。要配置您的 AWS CLI,请输入以下内容:

aws configure
  1. 使用以下内容创建名为 stock.py 的文件:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. 运行 stock.py 脚本:

    $ python stock.py

    在完成本教程的其余部分时,请将脚本保持运行状态。

下载并检查应用程序代码

此示例的 Python 应用程序代码可从中获得 GitHub。要下载应用程序代码,请执行以下操作:

  1. 如果尚未安装 Git 客户端,请安装它。有关更多信息,请参阅安装 Git

  2. 使用以下命令克隆远程存储库:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. 导航到 amazon-kinesis-data-analytics-java-examples/scala/GettingStarted 目录。

请注意有关应用程序代码的以下信息:

  • build.sbt 文件包含有关应用程序的配置和从属项的信息,包括Managed Service for Apache Flink的库。

  • BasicStreamingJob.scala 文件包含定义应用程序功能的主要方法。

  • 应用程序使用 Kinesis 源从源流中进行读取。以下代码段创建 Kinesis 源:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    该应用程序还使用 Kinesis 接收器写入结果流。以下代码段创建 Kinesis 接收器:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • 应用程序创建源连接器和接收器连接器,以使用 StreamExecutionEnvironment 对象访问外部资源。

  • 该应用程序将使用动态应用程序属性创建源和接收连接器。读取应用程序的运行时系统属性来配置连接器。有关运行时系统属性的更多信息,请参阅运行时系统属性

编译并上传应用程序代码

在本节中,您将编译应用程序代码并将其上传到您在 创建依赖资源 节中创建的 Amazon S3 存储桶。

编译应用程序代码

在本节中,您将使用SBT编译工具为应用程序构建 Scala 代码。要进行安装SBT,请参阅使用 cs 安装程序安装 sbt。您还需要安装 Java 开发套件 (JDK)。参阅完成练习的先决条件

  1. 要使用您的应用程序代码,您需要将其编译并打包成一个JAR文件。你可以用以下方法编译和打包你的代码SBT:

    sbt assembly
  2. 如果应用程序成功编译,则创建以下文件:

    target/scala-3.2.0/getting-started-scala-1.0.jar
上传 Apache Flink 流式处理 Scala 代码

在本节中,创建 Amazon S3 存储桶并上传应用程序代码。

  1. 打开 Amazon S3 控制台,网址为https://console.aws.amazon.com/s3/

  2. 选择创建存储桶

  3. 存储桶名称 字段中输入 ka-app-code-<username>。将后缀(如您的用户名)添加到存储桶名称,以使其具有全局唯一性。选择 下一步

  4. 配置选项中,让设置保持原样,然后选择下一步

  5. 设置权限中,让设置保持原样,然后选择下一步

  6. 选择 创建存储桶

  7. 选择 ka-app-code-<username> 存储桶,然后选择上传

  8. 选择文件步骤中,选择添加文件。导航到您在上一步中创建的 getting-started-scala-1.0.jar 文件。

  9. 您无需更改该对象的任何设置,因此,请选择上传

您的应用程序代码现在存储在 Amazon S3 存储桶中,应用程序可以在其中访问代码。