
Step 6: Create the Lambda Function That Imports Records Into Amazon Pinpoint

The final Lambda function that you create for this tutorial creates an import job in Amazon Pinpoint. The import job imports the files in the processed folder of your Amazon S3 bucket. Although the processed folder might contain several files, the import job creates a single segment that contains all of the endpoints within those files.

This function uses a test to ensure that all of the files that were generated by the previous Lambda functions are present before starting the import job. It also checks to see if you've created any import jobs that refer to the same folder, in order to prevent the creation of duplicate segments.
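
For example, the completeness test parses the "__ofN" part of each processed file's key to determine how many parts to expect. The following sketch is a hypothetical example that assumes part files follow the naming pattern that the earlier functions in this tutorial produce; it isolates the same parsing logic that appears in the full function later in this step:

    # A minimal sketch of the completeness test, assuming (hypothetically) that
    # processed parts are named like "<name>__part<i>__of<N>_processed.csv".
    def expected_part_count(key):
        # "testfile__part2__of5_processed.csv" -> 5
        return int(key.split("__of")[1].split("_processed")[0])

    print(expected_part_count("processed/testfile/testfile__part2__of5_processed.csv"))  # prints 5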

Step 6.1: Create the Function

The process of creating the final Lambda function is much simpler than for the previous two functions, because you don't need to import any external libraries.

To create the function

  1. Open the AWS Lambda console at https://console.aws.amazon.com/lambda/.

  2. Choose Create function.

  3. Choose Author from scratch. Under Basic information, do the following:

    • For Function name, enter CustomerImport_CreateJob.

    • For Runtime, choose Python 3.7.

    • For Execution role, choose Use an existing role.

    • For Existing role, choose ImporterPinpointRole.

    • Choose Create function.

  4. Erase the example in the code editor, and then paste the following code into the editor:

    import os
    import time
    import boto3
    from botocore.exceptions import ClientError

    AWS_REGION = os.environ['region']
    projectId = os.environ['projectId']
    importRoleArn = os.environ['importRoleArn']

    def lambda_handler(event, context):
        print("Received event: " + str(event))

        for record in event['Records']:
            # Assign some variables to make it easier to work with the data in
            # the event record.
            bucket = record['s3']['bucket']['name']
            key = record['s3']['object']['key']
            folder = os.path.split(key)[0]
            folder_path = os.path.join(bucket, folder)
            full_path = os.path.join(bucket, key)
            s3_url = "s3://" + folder_path

            # Check to see if all file parts have been processed.
            if all_files_processed(bucket, folder, full_path):
                # If you haven't recently run an import job that uses a file stored in
                # the specified S3 bucket, then create a new import job. This prevents
                # the creation of duplicate segments.
                if not (check_import_jobs(bucket, folder, s3_url)):
                    create_import_job(s3_url)
                else:
                    print("Import job found with URL s3://"
                          + os.path.join(bucket, folder) + ". Aborting.")
            else:
                print("Parts haven't finished processing yet.")

    # Determine if all of the file parts have been processed.
    def all_files_processed(bucket, folder, full_path):
        # Use the "__ofN" part of the file name to determine how many files there
        # should be.
        number_of_parts = int((full_path.split("__of")[1]).split("_processed")[0])

        # Figure out how many keys contain the prefix for the current batch of
        # folders (basically, how many files are in the appropriate "folder").
        client = boto3.client('s3')
        objs = client.list_objects_v2(Bucket=bucket, Prefix=folder)
        file_count = objs['KeyCount']

        ready_for_import = False
        if file_count == number_of_parts:
            ready_for_import = True

        return ready_for_import

    # Check Amazon Pinpoint to see if any import jobs have been created by using
    # the same S3 folder.
    def check_import_jobs(bucket, folder, s3_url):
        url_list = []

        # Retrieve a list of import jobs for the current project ID.
        client = boto3.client('pinpoint')

        try:
            client_response = client.get_import_jobs(
                ApplicationId=projectId
            )
        except ClientError as e:
            print(e.response['Error']['Message'])
        else:
            segment_response = client_response['ImportJobsResponse']['Item']

        # Parse responses. Add all S3Url values to a list.
        for item in segment_response:
            s3_url_existing = item['Definition']['S3Url']
            url_list.append(s3_url_existing)

        # Search for the current S3 URL in the list.
        if s3_url in url_list:
            found = True
        else:
            found = False

        return found

    # Create the import job in Amazon Pinpoint.
    def create_import_job(s3_url):
        client = boto3.client('pinpoint')

        # Use the name of the subfolder in the processed folder as the segment name.
        segment_name = s3_url.split('/')[4]

        try:
            response = client.create_import_job(
                ApplicationId=projectId,
                ImportJobRequest={
                    'DefineSegment': True,
                    'Format': 'CSV',
                    'RegisterEndpoints': True,
                    'RoleArn': importRoleArn,
                    'S3Url': s3_url,
                    'SegmentName': segment_name
                }
            )
        except ClientError as e:
            print(e.response['Error']['Message'])
        else:
            print("Import job " + response['ImportJobResponse']['Id'] + " "
                  + response['ImportJobResponse']['JobStatus'] + ".")
            print("Segment ID: "
                  + response['ImportJobResponse']['Definition']['SegmentId'])
            print("Application ID: " + projectId)
  5. Under Environment variables, do the following:

    • In the first row, create a variable with a key of projectId. Next, set the value to the unique ID of the project that you specified in the IAM policy in Step 2.2.

    • In the second row, create a variable with a key of region. Next, set the value to the Region that you specified in the IAM policy in Step 2.2.

    • In the third row, create a variable with a key of importRoleArn. Next, set the value to the Amazon Resource Name (ARN) of the IAM role that you created in Step 2.2.

  6. Under Basic settings, set the Timeout value to 7 seconds.

    Note

    You might be able to use the default timeout value of 3 seconds. However, larger import jobs might take slightly longer to create.

  7. At the top of the page, choose Save.
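
If you prefer to apply the settings from steps 5 and 6 of this procedure programmatically, the following sketch shows one way to do so with the AWS SDK for Python (Boto3). It's a non-authoritative example: the function name matches the one that you created above, and the environment variable values are placeholders that you replace with the values from Step 2.2.

    import boto3

    lambda_client = boto3.client('lambda')

    # Apply the same environment variables and timeout that the console
    # procedure describes. The values below are placeholders.
    lambda_client.update_function_configuration(
        FunctionName='CustomerImport_CreateJob',
        Timeout=7,
        Environment={
            'Variables': {
                'projectId': 'your-pinpoint-project-id',                            # placeholder
                'region': 'us-east-1',                                              # placeholder
                'importRoleArn': 'arn:aws:iam::123456789012:role/your-import-role'  # placeholder
            }
        }
    )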

Step 6.2: Test the Function

After you create the function, you should test it to ensure that it's set up correctly.

To test the Lambda function

  1. Choose Test. In the Configure test event window, for Event name, enter TestEvent. Then, in the code editor, paste the following code:

    { "Records": [ { "s3": { "bucket": { "name": "bucket-name", "arn": "arn:aws:s3:::bucket-name" }, "object": { "key": "processed/testfile/testfile__part1_processed.csv" } } } ] }

    In the preceding example, replace bucket-name with the name of the Amazon S3 bucket that you created in Step 1. When you finish, choose Create.

  2. Choose Test again. The function will execute with the test event that you provided.

  3. Open the Amazon S3 console at https://console.aws.amazon.com/s3/.

    Choose the bucket that you created in Step 1: Create an Amazon S3 Bucket.

    Open the folders in the bucket and take note of the contents of each one. If all of the following statements are true, then the Lambda function worked as expected:

    • The to_process folder doesn't contain any files.

    • The processed folder contains a subfolder named testfile. Within the testfile folder, there is a file named testfile__part1__of1_processed.csv.

    Don't delete any of the newly generated files. The import job that this Lambda function creates uses the files in the processed folder.
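
You can also confirm that the import job was created by listing the recent import jobs for your project. The following sketch uses the same get_import_jobs operation that the Lambda function itself calls; the project ID shown is a placeholder that you replace with the value that you used for the projectId environment variable.

    import boto3

    client = boto3.client('pinpoint')

    # List recent import jobs for the project and print their statuses.
    response = client.get_import_jobs(ApplicationId='your-pinpoint-project-id')

    for job in response['ImportJobsResponse']['Item']:
        print(job['Id'], job['JobStatus'], job['Definition']['S3Url'])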

Next: Set Up Amazon S3 Events