Exportieren von Tabellen in eine CSV-Datei - Amazon Textract

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Exportieren von Tabellen in eine CSV-Datei

Diese Python-Beispiele zeigen, wie Tabellen aus einem Bild eines Dokuments in eine Komma-getrennte Datei (Comma-getrennte Datei, CSV-Datei) exportiert werden.

Das Beispiel für die synchrone Dokumentenanalyse sammelt Tabelleninformationen aus einem Aufruf anAnalyzeDocumentaus. Das Beispiel für die asynchrone Dokumentenanalyse ruftStartDocumentAnalysisund ruft dann die Ergebnisse abGetDocumentAnalysisalsBlockObjekte.

Tabelleninformationen werden zurückgegeben alsBlockObjekte von einem Anruf anAnalyzeDocumentaus. Weitere Informationen finden Sie unter Tabellen . DieBlock-Objekte werden in einer Kartenstruktur gespeichert, die zum Exportieren der Tabellendaten in eine CSV-Datei verwendet wird.

Synchronous

In diesem Beispiel verwenden Sie die Funktionen:

  • get_table_csv_results— RuftAnalyzeDocumentund erstellt eine Map von Tabellen, die im Dokument erkannt werden. Erstellt eine CSV-Darstellung aller erkannten Tabellen.

  • generate_table_csv— Erzeugt die CSV-Datei für eine einzelne Tabelle.

  • get_rows_columns_map— Ruft die Zeilen und Spalten aus der Map ab.

  • get_text— Ruft den Text aus einer Zelle ab.

So exportieren Sie Tabellen in eine CSV-Datei
  1. Konfigurieren Sie Ihre Umgebung. Weitere Informationen finden Sie unter Voraussetzungen .

  2. Speichern Sie den folgenden Beispiel-Code in einer Datei mit dem Namentextract_python_table_parser.pyaus.

    import webbrowser, os import json import boto3 import io from io import BytesIO import sys from pprint import pprint def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] =='SELECTED': text += 'X ' return text def get_table_csv_results(file_name): with open(file_name, 'rb') as file: img_test = file.read() bytes_test = bytearray(img_test) print('Image loaded', file_name) # process using image bytes # get the results client = boto3.client('textract') response = client.analyze_document(Document={'Bytes': bytes_test}, FeatureTypes=['TABLES']) # Get the text blocks blocks=response['Blocks'] pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index +1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv def main(file_name): table_csv = get_table_csv_results(file_name) output_file = 'output.csv' # replace content with open(output_file, "wt") as fout: fout.write(table_csv) # show the results print('CSV OUTPUT FILE: ', output_file) if __name__ == "__main__": file_name = sys.argv[1] main(file_name)
  3. Geben Sie in der Eingabeaufforderung den folgenden Befehl ein. Ersetzenfilemit dem Namen der Dokumentbilddatei, die Sie analysieren möchten.

    python textract_python_table_parser.py file

Wenn Sie das Beispiel ausführen, wird die CSV-Ausgabe in einer Datei mit dem Namen gespeichertoutput.csvaus.

Asynchronous

In diesem Beispiel verwenden Sie zwei verschiedene Skripte. Das erste Skript startet die asynchrone Analyse von Dokumenten mitStartDocumentAnalysisund bekommt dasBlockvon zurückgegebene InformationenGetDocumentAnalysisaus. Das zweite Skript nimmt das zurückgegebeneBlockInformationen für jede Seite, formatiert die Daten als Tabelle und speichert die Tabellen in einer CSV-Datei.

So exportieren Sie Tabellen in eine CSV-Datei
  1. Konfigurieren Sie Ihre Umgebung. Weitere Informationen finden Sie unter Voraussetzungen .

  2. Stellen Sie sicher, dass Sie die Anweisungen unter sehen befolgt habenKonfigurieren von Amazon Textract für asynchrone Vorgängeaus. Der auf dieser Seite dokumentierte Prozess ermöglicht es Ihnen, Nachrichten über den Abschlussstatus asynchroner Jobs zu senden und zu empfangen.

  3. Ersetzen Sie im folgenden Codebeispiel den Wert vonroleArnwobei der Arn der Rolle zugewiesen wurde, die Sie in Schritt 2 erstellt haben. Ersetzen Sie den Wert vonbucketmit dem Namen des S3-Buckets, der Ihr Dokument enthält. Ersetzen Sie den Wert vondocumentdurch den Namen des Dokuments in Ihrem S3-Bucket. Ersetzen Sie den Wert vonregion_namedurch den Namen der -Region Ihres Buckets.

    Speichern Sie den folgenden Beispiel-Code in einer Datei mit dem Namenstart_doc_analysis_for_table_extraction.py.aus.

    import boto3 import time class DocumentProcessor: jobId = '' region_name = '' roleArn = '' bucket = '' document = '' sqsQueueUrl = '' snsTopicArn = '' processType = '' def __init__(self, role, bucket, document, region): self.roleArn = role self.bucket = bucket self.document = document self.region_name = region self.textract = boto3.client('textract', region_name=self.region_name) self.sqs = boto3.client('sqs') self.sns = boto3.client('sns') def ProcessDocument(self): jobFound = False response = self.textract.start_document_analysis(DocumentLocation={'S3Object': {'Bucket': self.bucket, 'Name': self.document}}, FeatureTypes=["TABLES", "FORMS"], NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn}) print('Processing type: Analysis') print('Start Job Id: ' + response['JobId']) print('Done!') def CreateTopicandQueue(self): millis = str(int(round(time.time() * 1000))) # Create SNS topic snsTopicName = "AmazonTextractTopic" + millis topicResponse = self.sns.create_topic(Name=snsTopicName) self.snsTopicArn = topicResponse['TopicArn'] # create SQS queue sqsQueueName = "AmazonTextractQueue" + millis self.sqs.create_queue(QueueName=sqsQueueName) self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl'] attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl, AttributeNames=['QueueArn'])['Attributes'] sqsQueueArn = attribs['QueueArn'] # Subscribe SQS queue to SNS topic self.sns.subscribe(TopicArn=self.snsTopicArn, Protocol='sqs', Endpoint=sqsQueueArn) # Authorize SNS to write SQS queue policy = """{{ "Version":"2012-10-17", "Statement":[ {{ "Sid":"MyPolicy", "Effect":"Allow", "Principal" : {{"AWS" : "*"}}, "Action":"SQS:SendMessage", "Resource": "{}", "Condition":{{ "ArnEquals":{{ "aws:SourceArn": "{}" }} }} }} ] }}""".format(sqsQueueArn, self.snsTopicArn) response = self.sqs.set_queue_attributes( QueueUrl=self.sqsQueueUrl, Attributes={ 'Policy': policy }) def main(): roleArn = 'role-arn' bucket = 'bucket-name' document = 'document-name' region_name = 'region-name' analyzer = DocumentProcessor(roleArn, bucket, document, region_name) analyzer.CreateTopicandQueue() analyzer.ProcessDocument() if __name__ == "__main__": main()
  4. Führen Sie den Code aus. Der Code druckt ein JobId. Kopiere dieses JobId nach unten.

  5. Warten Sie, bis Ihr Job die Verarbeitung abgeschlossen hat, und kopieren Sie nach Abschluss des Auftrags den folgenden Code in eine Datei namensget_doc_analysis_for_table_extraction.pyaus. Ersetzen Sie den Wert vonjobIdmit der Job-ID, die Sie zuvor kopiert haben. Ersetzen Sie den Wert vonregion_namemit dem Namen der Region, die mit Ihrer Textrakt-Rolle verknüpft ist. Ersetzen Sie den Wert vonfile_namedurch den Namen, den Sie dem Ausgang zuweisen möchten.

    import boto3 from pprint import pprint jobId = 'job-id' region_name = 'region-name' file_name = "output-file-name.csv" textract = boto3.client('textract', region_name=region_name) # Display information about a block def DisplayBlockInfo(block): print("Block Id: " + block['Id']) print("Type: " + block['BlockType']) if 'EntityTypes' in block: print('EntityTypes: {}'.format(block['EntityTypes'])) if 'Text' in block: print("Text: " + block['Text']) if block['BlockType'] != 'PAGE': print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%") def GetResults(jobId, file_name): maxResults = 1000 paginationToken = None finished = False while finished == False: response = None if paginationToken == None: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults) else: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults, NextToken=paginationToken) blocks = response['Blocks'] table_csv = get_table_csv_results(blocks) output_file = file_name # replace content with open(output_file, "at") as fout: fout.write(table_csv) # show the results print('Detected Document Text') print('Pages: {}'.format(response['DocumentMetadata']['Pages'])) print('OUTPUT TO CSV FILE: ', output_file) # Display block information for block in blocks: DisplayBlockInfo(block) print() print() if 'NextToken' in response: paginationToken = response['NextToken'] else: finished = True def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) except KeyError: print("Error extracting Table data - {}:".format(KeyError)) pass return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] == 'SELECTED': text += 'X ' except KeyError: print("Error extracting Table data - {}:".format(KeyError)) return text def get_table_csv_results(blocks): pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index + 1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv response_blocks = GetResults(jobId, file_name)
  6. Führen Sie den Code aus.

    Nachdem Sie Ihre Ergebnisse erhalten haben, löschen Sie unbedingt die zugehörigen SNS- und SQS-Ressourcen, sonst können Sie Gebühren für diese anfallen.