CSV ファイルへのテーブルのエクスポート - Amazon Textract

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

CSV ファイルへのテーブルのエクスポート

次の Python の例は、ドキュメントのイメージからテーブルをカンマ区切り値 (CSV) ファイルにエクスポートする方法を示しています。

同期ドキュメント分析の例では、の呼び出しからテーブル情報を収集します。AnalyzeDocument。非同期ドキュメント分析の例では、StartDocumentAnalysis次に、結果を取得します。GetDocumentAnalysisなのでBlockオブジェクト。

テーブル情報は、次のように返されます。Blockへの呼び出しからのオブジェクトAnalyzeDocument。詳細については、「テーブル」を参照してください。-Blockオブジェクトは、テーブルデータを CSV ファイルにエクスポートするために使用されるマップ構造に格納されます。

Synchronous

この例では、次の関数を使用します。

  • get_table_csv_results— 呼び出しAnalyzeDocumentをクリックし、ドキュメント内で検出されたテーブルのマップを作成します。検出されたすべてのテーブルの CSV 表現を作成します。

  • generate_table_csv— 個々のテーブルの CSV ファイルを生成します。

  • get_rows_columns_map— マップから行と列を取得します。

  • get_text-セルからテキストを取得します。

テーブルを CSV ファイルにエクスポートするには
  1. 環境を設定します。詳細については、「前提条件」を参照してください。

  2. 次のコード例を、という名前のファイルに保存します。textract_python_table_parser.py

    import webbrowser, os import json import boto3 import io from io import BytesIO import sys from pprint import pprint def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] =='SELECTED': text += 'X ' return text def get_table_csv_results(file_name): with open(file_name, 'rb') as file: img_test = file.read() bytes_test = bytearray(img_test) print('Image loaded', file_name) # process using image bytes # get the results client = boto3.client('textract') response = client.analyze_document(Document={'Bytes': bytes_test}, FeatureTypes=['TABLES']) # Get the text blocks blocks=response['Blocks'] pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index +1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv def main(file_name): table_csv = get_table_csv_results(file_name) output_file = 'output.csv' # replace content with open(output_file, "wt") as fout: fout.write(table_csv) # show the results print('CSV OUTPUT FILE: ', output_file) if __name__ == "__main__": file_name = sys.argv[1] main(file_name)
  3. コマンドプロンプトで次のコマンドを入力します。置換fileに、分析するドキュメントイメージファイルの名前を指定します。

    python textract_python_table_parser.py file

この例を実行すると、CSV 出力はという名前のファイルに保存されます。output.csv

Asynchronous

この例では、2 つの異なるスクリプトを使用して make を使用します。最初のスクリプトは、ドキュメントを非同期的に分析するプロセスを開始します。StartDocumentAnalysisを取得し、Blockによって返された情報GetDocumentAnalysis。2 番目のスクリプトは、返されたBlock各ページの情報、データをテーブルとして書式設定し、テーブルを CSV ファイルに保存します。

テーブルを CSV ファイルにエクスポートするには
  1. 環境を設定します。詳細については、「前提条件」を参照してください。

  2. 「参照」に記載されている指示に従っていることを確認してください。非同期オペレーション用の Amazon Textract の設定。このページに記載されているプロセスでは、非同期ジョブの完了ステータスに関するメッセージを送受信できます。

  3. 次のコード例では、の値を置き換えます。roleArnArn をステップ 2 で作成したロールに割り当てます。の値を置き換えるbucketの部分は、お客様のドキュメントを含む S3 バケットの名前を使用します。の値を置き換えるdocumentS3 バケット内のドキュメントの名前を使用した。の値を置き換えるregion_nameバケットのリージョンの名前を指定します。

    次のコード例を、という名前のファイルに保存します。start_doc_analysis_for_table_extraction.py。

    import boto3 import time class DocumentProcessor: jobId = '' region_name = '' roleArn = '' bucket = '' document = '' sqsQueueUrl = '' snsTopicArn = '' processType = '' def __init__(self, role, bucket, document, region): self.roleArn = role self.bucket = bucket self.document = document self.region_name = region self.textract = boto3.client('textract', region_name=self.region_name) self.sqs = boto3.client('sqs') self.sns = boto3.client('sns') def ProcessDocument(self): jobFound = False response = self.textract.start_document_analysis(DocumentLocation={'S3Object': {'Bucket': self.bucket, 'Name': self.document}}, FeatureTypes=["TABLES", "FORMS"], NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn}) print('Processing type: Analysis') print('Start Job Id: ' + response['JobId']) print('Done!') def CreateTopicandQueue(self): millis = str(int(round(time.time() * 1000))) # Create SNS topic snsTopicName = "AmazonTextractTopic" + millis topicResponse = self.sns.create_topic(Name=snsTopicName) self.snsTopicArn = topicResponse['TopicArn'] # create SQS queue sqsQueueName = "AmazonTextractQueue" + millis self.sqs.create_queue(QueueName=sqsQueueName) self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl'] attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl, AttributeNames=['QueueArn'])['Attributes'] sqsQueueArn = attribs['QueueArn'] # Subscribe SQS queue to SNS topic self.sns.subscribe(TopicArn=self.snsTopicArn, Protocol='sqs', Endpoint=sqsQueueArn) # Authorize SNS to write SQS queue policy = """{{ "Version":"2012-10-17", "Statement":[ {{ "Sid":"MyPolicy", "Effect":"Allow", "Principal" : {{"AWS" : "*"}}, "Action":"SQS:SendMessage", "Resource": "{}", "Condition":{{ "ArnEquals":{{ "aws:SourceArn": "{}" }} }} }} ] }}""".format(sqsQueueArn, self.snsTopicArn) response = self.sqs.set_queue_attributes( QueueUrl=self.sqsQueueUrl, Attributes={ 'Policy': policy }) def main(): roleArn = 'role-arn' bucket = 'bucket-name' document = 'document-name' region_name = 'region-name' analyzer = DocumentProcessor(roleArn, bucket, document, region_name) analyzer.CreateTopicandQueue() analyzer.ProcessDocument() if __name__ == "__main__": main()
  4. コードを実行します。コードによって JobId が出力されます。この jobId をコピーします。

  5. ジョブの処理が完了するのを待ち、ジョブが終了したら、次のコードをという名前のファイルにコピーします。get_doc_analysis_for_table_extraction.py。の値を置き換えるjobId先ほどコピーしたJob ID を使用します。の値を置き換えるregion_nameTextract ロールに関連付けられたリージョンの名前を指定します。の値を置き換えるfile_name出力の CSV を指定した名前を使用します。

    import boto3 from pprint import pprint jobId = 'job-id' region_name = 'region-name' file_name = "output-file-name.csv" textract = boto3.client('textract', region_name=region_name) # Display information about a block def DisplayBlockInfo(block): print("Block Id: " + block['Id']) print("Type: " + block['BlockType']) if 'EntityTypes' in block: print('EntityTypes: {}'.format(block['EntityTypes'])) if 'Text' in block: print("Text: " + block['Text']) if block['BlockType'] != 'PAGE': print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%") def GetResults(jobId, file_name): maxResults = 1000 paginationToken = None finished = False while finished == False: response = None if paginationToken == None: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults) else: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults, NextToken=paginationToken) blocks = response['Blocks'] table_csv = get_table_csv_results(blocks) output_file = file_name # replace content with open(output_file, "at") as fout: fout.write(table_csv) # show the results print('Detected Document Text') print('Pages: {}'.format(response['DocumentMetadata']['Pages'])) print('OUTPUT TO CSV FILE: ', output_file) # Display block information for block in blocks: DisplayBlockInfo(block) print() print() if 'NextToken' in response: paginationToken = response['NextToken'] else: finished = True def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) except KeyError: print("Error extracting Table data - {}:".format(KeyError)) pass return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] == 'SELECTED': text += 'X ' except KeyError: print("Error extracting Table data - {}:".format(KeyError)) return text def get_table_csv_results(blocks): pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index + 1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv response_blocks = GetResults(jobId, file_name)
  6. コードを実行します。

    結果を取得したら、関連する SNS および SQS リソースを必ず削除してください。そうしないと、それらのリソースに対して料金が発生する可能性があります。