Exportation de tables dans un fichier CSV - Amazon Textract

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exportation de tables dans un fichier CSV

Ces exemples Python montrent comment exporter des tables à partir d'une image d'un document dans un fichier CSV (valeurs séparées par une virgule).

L'exemple d'analyse synchrone de documents recueille des informations de table à partir d'un appel àAnalyzeDocument. L'exemple d'analyse de documents asynchrone fait un appel àStartDocumentAnalysispuis récupère les résultats deGetDocumentAnalysiscommeBlockobjets.

Les informations de table sont renvoyées sous formeBlockobjets d'un appel àAnalyzeDocument. Pour plus d'informations, consultez Tables. LeBlocksont stockés dans une structure cartographique utilisée pour exporter les données de la table dans un fichier CSV.

Synchronous

Dans cet exemple, vous allez utiliser les fonctions suivantes :

  • get_table_csv_results— AppelsAnalyzeDocument, et crée une carte des tables détectées dans le document. Crée une représentation CSV de toutes les tables détectées.

  • generate_table_csv: génère le fichier CSV pour une table individuelle.

  • get_rows_columns_map: récupère les lignes et les colonnes de la carte.

  • get_text: récupère le texte d'une cellule.

Pour exporter des tables dans un fichier CSV
  1. Configurez votre environnement. Pour plus d'informations, consultez Prérequis.

  2. Enregistrez l'exemple de code suivant dans un fichier nommétextract_python_table_parser.py.

    import webbrowser, os import json import boto3 import io from io import BytesIO import sys from pprint import pprint def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] =='SELECTED': text += 'X ' return text def get_table_csv_results(file_name): with open(file_name, 'rb') as file: img_test = file.read() bytes_test = bytearray(img_test) print('Image loaded', file_name) # process using image bytes # get the results client = boto3.client('textract') response = client.analyze_document(Document={'Bytes': bytes_test}, FeatureTypes=['TABLES']) # Get the text blocks blocks=response['Blocks'] pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index +1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv def main(file_name): table_csv = get_table_csv_results(file_name) output_file = 'output.csv' # replace content with open(output_file, "wt") as fout: fout.write(table_csv) # show the results print('CSV OUTPUT FILE: ', output_file) if __name__ == "__main__": file_name = sys.argv[1] main(file_name)
  3. A partir d'une invite de commande, entrez la commande suivante. Remplacezfileavec le nom du fichier image que vous souhaitez analyser.

    python textract_python_table_parser.py file

Lorsque vous exécutez cet exemple, la sortie CSV est enregistrée dans un fichier nomméoutput.csv.

Asynchronous

Dans cet exemple, vous allez utiliser deux scripts différents. Le premier script démarre le processus d'analyse asynchrone des documents avecStartDocumentAnalysiset obtient leBlockinformations renvoyées parGetDocumentAnalysis. Le second script prend le texte renvoyéBlockinformations pour chaque page, formate les données sous forme de tableau et enregistre les tables dans un fichier CSV.

Pour exporter des tables dans un fichier CSV
  1. Configurez votre environnement. Pour plus d'informations, consultez Prérequis.

  2. Assurez-vous d'avoir suivi les instructions données à l'adresseConfiguration d'Amazon Textract pour les opérations asynchrones. Le processus documenté sur cette page vous permet d'envoyer et de recevoir des messages sur l'état d'achèvement des tâches asynchrones.

  3. Dans l'exemple de code suivant, remplacez la valeur deroleArnavec l'Arn attribué au rôle que vous avez créé à l'étape 2. Remplacez la valeur debucketavec le nom du compartiment S3 contenant votre document. Remplacez la valeur dedocumentpar le nom du document dans votre compartiment S3. Remplacez la valeur deregion_namepar le nom de la région de votre compartiment.

    Enregistrez l'exemple de code suivant dans un fichier nomméstart_doc_analysis_for_table_extraction.py..

    import boto3 import time class DocumentProcessor: jobId = '' region_name = '' roleArn = '' bucket = '' document = '' sqsQueueUrl = '' snsTopicArn = '' processType = '' def __init__(self, role, bucket, document, region): self.roleArn = role self.bucket = bucket self.document = document self.region_name = region self.textract = boto3.client('textract', region_name=self.region_name) self.sqs = boto3.client('sqs') self.sns = boto3.client('sns') def ProcessDocument(self): jobFound = False response = self.textract.start_document_analysis(DocumentLocation={'S3Object': {'Bucket': self.bucket, 'Name': self.document}}, FeatureTypes=["TABLES", "FORMS"], NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn}) print('Processing type: Analysis') print('Start Job Id: ' + response['JobId']) print('Done!') def CreateTopicandQueue(self): millis = str(int(round(time.time() * 1000))) # Create SNS topic snsTopicName = "AmazonTextractTopic" + millis topicResponse = self.sns.create_topic(Name=snsTopicName) self.snsTopicArn = topicResponse['TopicArn'] # create SQS queue sqsQueueName = "AmazonTextractQueue" + millis self.sqs.create_queue(QueueName=sqsQueueName) self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl'] attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl, AttributeNames=['QueueArn'])['Attributes'] sqsQueueArn = attribs['QueueArn'] # Subscribe SQS queue to SNS topic self.sns.subscribe(TopicArn=self.snsTopicArn, Protocol='sqs', Endpoint=sqsQueueArn) # Authorize SNS to write SQS queue policy = """{{ "Version":"2012-10-17", "Statement":[ {{ "Sid":"MyPolicy", "Effect":"Allow", "Principal" : {{"AWS" : "*"}}, "Action":"SQS:SendMessage", "Resource": "{}", "Condition":{{ "ArnEquals":{{ "aws:SourceArn": "{}" }} }} }} ] }}""".format(sqsQueueArn, self.snsTopicArn) response = self.sqs.set_queue_attributes( QueueUrl=self.sqsQueueUrl, Attributes={ 'Policy': policy }) def main(): roleArn = 'role-arn' bucket = 'bucket-name' document = 'document-name' region_name = 'region-name' analyzer = DocumentProcessor(roleArn, bucket, document, region_name) analyzer.CreateTopicandQueue() analyzer.ProcessDocument() if __name__ == "__main__": main()
  4. Exécutez le code. Le code va imprimer un JobId. Copiez ce JobId vers le bas.

  5. Attendez la fin du traitement de votre tâche et, une fois qu'il est terminé, copiez le code suivant dans un fichier nomméget_doc_analysis_for_table_extraction.py. Remplacez la valeur dejobIdavec l'ID de Job que vous avez copié précédemment. Remplacez la valeur deregion_nameavec le nom de la région associée à votre rôle Textract. Remplacez la valeur defile_namepar le nom que vous souhaitez donner au CSV de sortie.

    import boto3 from pprint import pprint jobId = 'job-id' region_name = 'region-name' file_name = "output-file-name.csv" textract = boto3.client('textract', region_name=region_name) # Display information about a block def DisplayBlockInfo(block): print("Block Id: " + block['Id']) print("Type: " + block['BlockType']) if 'EntityTypes' in block: print('EntityTypes: {}'.format(block['EntityTypes'])) if 'Text' in block: print("Text: " + block['Text']) if block['BlockType'] != 'PAGE': print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%") def GetResults(jobId, file_name): maxResults = 1000 paginationToken = None finished = False while finished == False: response = None if paginationToken == None: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults) else: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults, NextToken=paginationToken) blocks = response['Blocks'] table_csv = get_table_csv_results(blocks) output_file = file_name # replace content with open(output_file, "at") as fout: fout.write(table_csv) # show the results print('Detected Document Text') print('Pages: {}'.format(response['DocumentMetadata']['Pages'])) print('OUTPUT TO CSV FILE: ', output_file) # Display block information for block in blocks: DisplayBlockInfo(block) print() print() if 'NextToken' in response: paginationToken = response['NextToken'] else: finished = True def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) except KeyError: print("Error extracting Table data - {}:".format(KeyError)) pass return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] == 'SELECTED': text += 'X ' except KeyError: print("Error extracting Table data - {}:".format(KeyError)) return text def get_table_csv_results(blocks): pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index + 1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv response_blocks = GetResults(jobId, file_name)
  6. Exécutez le code.

    Une fois que vous avez obtenu vos résultats, veillez à supprimer les ressources SNS et SQS associées, sinon vous risquez d'accumuler des frais pour elles.