
Amazon Managed Service for Apache Flink was previously known as Amazon Kinesis Data Analytics for Apache Flink.

Create and run a Managed Service for Apache Flink for Python application

In this exercise, you create a Managed Service for Apache Flink for Python application with Kinesis data streams as its source and sink.

Create dependent resources

Before you create a Managed Service for Apache Flink application for this exercise, you create the following dependent resources:

  • Two Kinesis streams for input and output.

  • An Amazon S3 bucket to store the application's code (ka-app-code-<username>).

Create two Kinesis streams

Before you create a Managed Service for Apache Flink application for this exercise, create two Kinesis data streams (ExampleInputStream and ExampleOutputStream). Your application uses these streams as its source and destination.

You can create these streams using either the Amazon Kinesis console or the following AWS CLI command. For console instructions, see Creating and Updating Data Streams in the Amazon Kinesis Data Streams Developer Guide.

To create the data streams (AWS CLI)
  1. To create the first stream (ExampleInputStream), use the following Amazon Kinesis create-stream AWS CLI command.

    $ aws kinesis create-stream \
    --stream-name ExampleInputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
  2. To create the second stream that the application uses to write output, run the same command, changing the stream name to ExampleOutputStream.

    $ aws kinesis create-stream \
    --stream-name ExampleOutputStream \
    --shard-count 1 \
    --region us-west-2 \
    --profile adminuser
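Stream creation takes a few moments to complete. If you want to confirm that both streams are ACTIVE before you continue, the following minimal sketch uses boto3 (which later steps in this tutorial require anyway) and assumes the us-west-2 Region used above:

    import boto3

    # Confirm that both tutorial streams exist and report an ACTIVE status.
    kinesis = boto3.client("kinesis", region_name="us-west-2")
    for name in ("ExampleInputStream", "ExampleOutputStream"):
        summary = kinesis.describe_stream_summary(StreamName=name)
        print(name, summary["StreamDescriptionSummary"]["StreamStatus"])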

Create an Amazon S3 bucket

You can create the Amazon S3 bucket using the console. For instructions on creating this resource, see the following topic:

  • How Do I Create an S3 Bucket? in the Amazon Simple Storage Service User Guide. Give the Amazon S3 bucket a globally unique name by appending your login name, such as ka-app-code-<username>.

Other resources

When you create your application, Managed Service for Apache Flink creates the following Amazon CloudWatch resources if they don't already exist:

  • A log group called /aws/kinesis-analytics/MyApplication.

  • A log stream called kinesis-analytics-log-stream.

Write sample records to the input stream

In this section, you use a Python script to write sample records to the stream for the application to process.

Note

This section requires the AWS SDK for Python (Boto).

Note

The Python script in this section uses credentials from your AWS CLI configuration. You must configure your AWS CLI with your account credentials and default Region. To configure your AWS CLI, enter the following:

aws configure
  1. Create a file named stock.py with the following contents:

    import datetime
    import json
    import random
    import boto3

    STREAM_NAME = "ExampleInputStream"


    def get_data():
        return {
            'event_time': datetime.datetime.now().isoformat(),
            'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']),
            'price': round(random.random() * 100, 2)}


    def generate(stream_name, kinesis_client):
        while True:
            data = get_data()
            print(data)
            kinesis_client.put_record(
                StreamName=stream_name,
                Data=json.dumps(data),
                PartitionKey="partitionkey")


    if __name__ == '__main__':
        generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. Run the stock.py script:

    $ python stock.py

    Keep the script running while completing the rest of the tutorial.
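The script prints each record as it sends it, so the terminal output looks similar to the following (the timestamps, tickers, and prices are random, so your values will differ):

    {'event_time': '2024-01-01T12:00:00.123456', 'ticker': 'AMZN', 'price': 34.78}
    {'event_time': '2024-01-01T12:00:00.456789', 'ticker': 'TBV', 'price': 12.05}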

Create and examine the Apache Flink streaming Python code

The Python application code for this example is available from GitHub. To download the application code, do the following:

  1. Install the Git client if you haven't already. For more information, see Installing Git.

  2. Clone the remote repository with the following command:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Navigate to the amazon-kinesis-data-analytics-examples/python/GettingStarted directory.

The application code is located in the getting_started.py file. Note the following about the application code:

  • The application uses a Kinesis table source to read from the source stream. The following snippet calls the create_table function to create the Kinesis table source:

    table_env.execute_sql(
        create_table(input_table_name, input_stream, input_region)
    )

    The create_table function uses a SQL command to create a table that is backed by the streaming source:

    def create_table(table_name, stream_name, region, stream_initpos=None):
        init_pos = "\n'scan.stream.initpos' = '{0}',".format(stream_initpos) if stream_initpos is not None else ''

        return """ CREATE TABLE {0} (
                    ticker VARCHAR(6),
                    price DOUBLE,
                    event_time TIMESTAMP(3),
                    WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND
                  )
                  PARTITIONED BY (ticker)
                  WITH (
                    'connector' = 'kinesis',
                    'stream' = '{1}',
                    'aws.region' = '{2}',{3}
                    'format' = 'json',
                    'json.timestamp-format.standard' = 'ISO-8601'
                  ) """.format(table_name, stream_name, region, init_pos)
  • The application creates two tables, then writes the contents of one table to the other.

    # 2. Creates a source table from a Kinesis Data Stream
    table_env.execute_sql(
        create_table(input_table_name, input_stream, input_region)
    )

    # 3. Creates a sink table writing to a Kinesis Data Stream
    table_env.execute_sql(
        create_table(output_table_name, output_stream, output_region, stream_initpos)
    )

    # 4. Inserts the source table data into the sink table
    table_result = table_env.execute_sql(
        "INSERT INTO {0} SELECT * FROM {1}".format(output_table_name, input_table_name)
    )
  • The application uses the Flink connector from the flink-sql-connector-kinesis_2.12/1.15.2 file.

Add third-party dependencies to Python applications

When you use third-party Python packages (such as boto3), you need to add their transitive dependencies and the properties required to target those dependencies. At a high level, for PyPI dependencies, you copy the files and folders located in your Python environment's site-packages folder to create a directory structure like the following:

PythonPackages
│   README.md
│   python-packages.py
│
└───my_deps
    └───boto3
    │   │   session.py
    │   │   utils.py
    │   │   ...
    │
    └───botocore
    │   │   args.py
    │   │   auth.py
    │   │   ...
    │
    └───mynonpypimodule
    │   │   mymodulefile1.py
    │   │   mymodulefile2.py
    │   │   ...
└───lib
│   │   flink-sql-connector-kinesis-4.2.0-1.18.jar
│   │   ...
...

To add boto3 as a third-party dependency:

  1. Create a standalone Python environment (conda or similar) on your local machine.

  2. Note the initial list of packages in that environment's site-packages folder.

  3. pip-install all required dependencies for your app.

  4. Note the packages that were added to the site-packages folder after step 3. These are the folders you need to include in your package (under the my_deps folder), organized as shown above. In effect, you are capturing a diff of the packages between steps 2 and 3 to identify the package dependencies your application needs.

  5. Supply my_deps/ as an argument for the pyFiles property in the kinesis.analytics.flink.run.options property group, as described below for the jarfile property. Flink also allows you to specify Python dependencies using the add_python_file function, but keep in mind that you only need to specify one or the other, not both.

Note

You don't have to name the folder my_deps. The important part is registering the dependencies using either pyFiles or add_python_file. An example can be found at How to use boto3 within pyFlink.
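For example, if you register the dependencies from your application code with add_python_file instead of the pyFiles property, a minimal sketch looks like the following. This is an illustration only, not part of the sample project; the my_deps path assumes the layout shown above.

    from pyflink.datastream import StreamExecutionEnvironment
    from pyflink.table import StreamTableEnvironment

    env = StreamExecutionEnvironment.get_execution_environment()
    # Ship the bundled third-party packages with the job. Use either this call
    # or the pyFiles runtime property, not both.
    env.add_python_file("my_deps")
    table_env = StreamTableEnvironment.create(stream_execution_environment=env)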

Upload the Apache Flink streaming Python code

In this section, you create an Amazon S3 bucket and upload your application code.

To upload the application code using the console:
  1. Use your preferred compression application to compress the getting-started.py and Flink SQL connector files. Name the archive myapp.zip. If you include the outer folder in your archive, you must include this in the path with the code in your configuration file(s): GettingStarted/getting-started.py.

  2. Open the Amazon S3 console at https://console.aws.amazon.com/s3/.

  3. Choose Create bucket.

  4. Enter ka-app-code-<username> in the Bucket name field. Add a suffix to the bucket name, such as your user name, to make it globally unique. Choose Next.

  5. In the Configure options step, keep the settings as they are, and choose Next.

  6. In the Set permissions step, keep the settings as they are, and choose Next.

  7. Choose Create bucket.

  8. In the Amazon S3 console, choose the ka-app-code-<username> bucket, and choose Upload.

  9. In the Select files step, choose Add files. Navigate to the myapp.zip file that you created in the previous step. Choose Next.

  10. You don't need to change any of the settings for the object, so choose Upload.

To upload the application code using the AWS CLI:
Note

Do not use the compress features in Finder (macOS) or Windows Explorer (Windows) to create the myapp.zip archive. This may result in invalid application code.

  1. Use your preferred compression application to compress the getting-started.py and Flink SQL connector files. Name the archive myapp.zip. If you include the outer folder in your archive, you must include this in the path with the code in your configuration file(s): GettingStarted/getting-started.py. (For a scripted way to build the archive, see the sketch at the end of this section.)

  2. Run the following command:

    $ aws s3 --region <aws-region> cp myapp.zip s3://ka-app-code-<username>

Your application code is now stored in an Amazon S3 bucket where your application can access it.
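If you prefer to script the archive step instead of using a GUI compression tool (and avoid the Finder and Windows Explorer issue noted above), the following minimal sketch uses Python's standard zipfile module. The file names match the ones used in this tutorial; adjust them if your files are laid out differently.

    import zipfile

    # Build myapp.zip with the application code and the Flink SQL connector JAR
    # at the archive root, matching the code-path properties configured later.
    with zipfile.ZipFile("myapp.zip", "w", zipfile.ZIP_DEFLATED) as archive:
        archive.write("getting-started.py")
        archive.write("flink-sql-connector-kinesis-4.2.0-1.18.jar")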

Create and run the Managed Service for Apache Flink application

Follow these steps to create, configure, update, and run the application using the console.

Create the application

  1. Open the Managed Service for Apache Flink console at https://console.aws.amazon.com/flink

  2. On the Managed Service for Apache Flink dashboard, choose Create analytics application.

  3. On the Managed Service for Apache Flink - Create application page, provide the application details as follows:

    • For Application name, enter MyApplication.

    • For Description, enter My python test app.

    • For Runtime, choose Apache Flink.

    • Leave the version as Apache Flink version 1.15.2 (Recommended version).

  4. For Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.

  5. Choose Create application.

Note

When you create a Managed Service for Apache Flink application using the console, you have the option of having an IAM role and policy created for your application. Your application uses this role and policy to access its dependent resources. These IAM resources are named using your application name and Region as follows:

  • Policy: kinesis-analytics-service-MyApplication-us-west-2

  • Role: kinesisanalytics-MyApplication-us-west-2

Configure the application

Use the following procedure to configure the application.

To configure the application
  1. On the MyApplication page, choose Configure.

  2. On the Configure application page, provide the Code location:

    • For Amazon S3 bucket, enter ka-app-code-<username>.

    • For Path to Amazon S3 object, enter myapp.zip.

  3. Under Access to application resources, for Access permissions, choose Create / update IAM role kinesis-analytics-MyApplication-us-west-2.

  4. Under Properties, choose Add group.

  5. Enter the following:

    Group ID Key Value
    consumer.config.0 input.stream.name ExampleInputStream
    consumer.config.0 aws.region us-west-2
    consumer.config.0 scan.stream.initpos LATEST

    Choose Save.

  6. Under Properties, choose Add group again.

  7. Enter the following:

    Group ID Key Value
    producer.config.0 output.stream.name ExampleOutputStream
    producer.config.0 aws.region us-west-2
    producer.config.0 shard.count 1
  8. Under Properties, choose Add group again. For Group ID, enter kinesis.analytics.flink.run.options. This special property group tells your application where to find its code resources. For more information, see Specifying your code files.

  9. Enter the following:

    Group ID Key Value
    kinesis.analytics.flink.run.options python getting-started.py
    kinesis.analytics.flink.run.options jarfile flink-sql-connector-kinesis-4.2.0-1.18.jar
  10. Under Monitoring, ensure that the Monitoring metrics level is set to Application.

  11. For CloudWatch logging, choose the Enable check box.

  12. Choose Update.

Note

When you choose to enable Amazon CloudWatch logging, Managed Service for Apache Flink creates a log group and log stream for you. The names of these resources are as follows:

  • Log group: /aws/kinesis-analytics/MyApplication

  • Log stream: kinesis-analytics-log-stream

Edit the IAM policy

Edit the IAM policy to add permissions to access the Amazon S3 bucket.

To edit the IAM policy to add S3 bucket permissions
  1. Open the IAM console at https://console.aws.amazon.com/iam/.

  2. Choose Policies. Choose the kinesis-analytics-service-MyApplication-us-west-2 policy that the console created for you in the previous section.

  3. On the Summary page, choose Edit policy. Choose the JSON tab.

  4. Update the policy to match the following example. Replace the sample account IDs (012345678901) with your account ID, and replace ka-app-code-username with your S3 bucket name.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/myapp.zip" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Run the application

To run the application, on the MyApplication page, choose Run, and then confirm the action. When the application is running, you can view the Flink job graph by opening the Apache Flink dashboard and choosing the desired Flink job.
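To confirm that records are flowing end to end, you can tail ExampleOutputStream while the application and the stock.py script are running. The following minimal sketch uses boto3; it reads the single shard created earlier and assumes the us-west-2 Region.

    import time
    import boto3

    kinesis = boto3.client("kinesis", region_name="us-west-2")

    # The tutorial streams have one shard, so read the first (and only) one.
    shard_id = kinesis.list_shards(StreamName="ExampleOutputStream")["Shards"][0]["ShardId"]
    iterator = kinesis.get_shard_iterator(
        StreamName="ExampleOutputStream",
        ShardId=shard_id,
        ShardIteratorType="LATEST")["ShardIterator"]

    # Poll for new records; press Ctrl+C to stop.
    while True:
        response = kinesis.get_records(ShardIterator=iterator, Limit=25)
        for record in response["Records"]:
            print(record["Data"])
        iterator = response["NextShardIterator"]
        time.sleep(1)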

Stop the application

To stop the application, on the MyApplication page, choose Stop. Confirm the action.

Next step

Clean up AWS resources