Run an Amazon Comprehend topic modeling job on sample data using an AWS SDK - Amazon Comprehend

Run an Amazon Comprehend topic modeling job on sample data using an AWS SDK

The following code example shows how to:

  • Run an Amazon Comprehend topic modeling job on sample data.

  • Get information about the job.

  • Extract job output data from Amazon S3.

Python
SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Create a wrapper class to call Amazon Comprehend topic modeling actions.

class ComprehendTopicModeler: """Encapsulates a Comprehend topic modeler.""" def __init__(self, comprehend_client): """ :param comprehend_client: A Boto3 Comprehend client. """ self.comprehend_client = comprehend_client def start_job( self, job_name, input_bucket, input_key, input_format, output_bucket, output_key, data_access_role_arn, ): """ Starts a topic modeling job. Input is read from the specified Amazon S3 input bucket and written to the specified output bucket. Output data is stored in a tar archive compressed in gzip format. The job runs asynchronously, so you can call `describe_topics_detection_job` to get job status until it returns a status of SUCCEEDED. :param job_name: The name of the job. :param input_bucket: An Amazon S3 bucket that contains job input. :param input_key: The prefix used to find input data in the input bucket. If multiple objects have the same prefix, all of them are used. :param input_format: The format of the input data, either one document per file or one document per line. :param output_bucket: The Amazon S3 bucket where output data is written. :param output_key: The prefix prepended to the output data. :param data_access_role_arn: The Amazon Resource Name (ARN) of a role that grants Comprehend permission to read from the input bucket and write to the output bucket. :return: Information about the job, including the job ID. """ try: response = self.comprehend_client.start_topics_detection_job( JobName=job_name, DataAccessRoleArn=data_access_role_arn, InputDataConfig={ "S3Uri": f"s3://{input_bucket}/{input_key}", "InputFormat": input_format.value, }, OutputDataConfig={"S3Uri": f"s3://{output_bucket}/{output_key}"}, ) logger.info("Started topic modeling job %s.", response["JobId"]) except ClientError: logger.exception("Couldn't start topic modeling job.") raise else: return response def describe_job(self, job_id): """ Gets metadata about a topic modeling job. :param job_id: The ID of the job to look up. :return: Metadata about the job. """ try: response = self.comprehend_client.describe_topics_detection_job( JobId=job_id ) job = response["TopicsDetectionJobProperties"] logger.info("Got topic detection job %s.", job_id) except ClientError: logger.exception("Couldn't get topic detection job %s.", job_id) raise else: return job def list_jobs(self): """ Lists topic modeling jobs for the current account. :return: The list of jobs. """ try: response = self.comprehend_client.list_topics_detection_jobs() jobs = response["TopicsDetectionJobPropertiesList"] logger.info("Got %s topic detection jobs.", len(jobs)) except ClientError: logger.exception("Couldn't get topic detection jobs.") raise else: return jobs

Use the wrapper class to run a topic modeling job and get job data.

def usage_demo(): print("-" * 88) print("Welcome to the Amazon Comprehend topic modeling demo!") print("-" * 88) logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") input_prefix = "input/" output_prefix = "output/" demo_resources = ComprehendDemoResources( boto3.resource("s3"), boto3.resource("iam") ) topic_modeler = ComprehendTopicModeler(boto3.client("comprehend")) print("Setting up storage and security resources needed for the demo.") demo_resources.setup("comprehend-topic-modeler-demo") print("Copying sample data from public bucket into input bucket.") demo_resources.bucket.copy( {"Bucket": "public-sample-us-west-2", "Key": "TopicModeling/Sample.txt"}, f"{input_prefix}sample.txt", ) print("Starting topic modeling job on sample data.") job_info = topic_modeler.start_job( "demo-topic-modeling-job", demo_resources.bucket.name, input_prefix, JobInputFormat.per_line, demo_resources.bucket.name, output_prefix, demo_resources.data_access_role.arn, ) print( f"Waiting for job {job_info['JobId']} to complete. This typically takes " f"20 - 30 minutes." ) job_waiter = JobCompleteWaiter(topic_modeler.comprehend_client) job_waiter.wait(job_info["JobId"]) job = topic_modeler.describe_job(job_info["JobId"]) print(f"Job {job['JobId']} complete:") pprint(job) print( f"Getting job output data from the output Amazon S3 bucket: " f"{job['OutputDataConfig']['S3Uri']}." ) job_output = demo_resources.extract_job_output(job) lines = 10 print(f"First {lines} lines of document topics output:") pprint(job_output["doc-topics.csv"]["data"][:lines]) print(f"First {lines} lines of terms output:") pprint(job_output["topic-terms.csv"]["data"][:lines]) print("Cleaning up resources created for the demo.") demo_resources.cleanup() print("Thanks for watching!") print("-" * 88)

For a complete list of AWS SDK developer guides and code examples, see Using Amazon Comprehend with an AWS SDK. This topic also includes information about getting started and details about previous SDK versions.