Exportación de tablas a un archivo CSV - Amazon Textract

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Exportación de tablas a un archivo CSV

En estos ejemplos de Python se muestran cómo exportar tablas de una imagen de un documento a un archivo de valores separados por comas (CSV).

El ejemplo de análisis de documentos síncrono recopila información de tabla de una llamada aAnalyzeDocument. El ejemplo de análisis de documentos asíncrono realiza una llamada aStartDocumentAnalysisy, a continuación, recupera los resultados deGetDocumentAnalysiscomoBlockobjetos.

La información de la tabla se devuelve comoBlockobjetos de una llamada aAnalyzeDocument. Para obtener más información, consulte Tablas. LaBlocklos objetos se almacenan en una estructura de mapa que se utiliza para exportar los datos de la tabla a un archivo CSV.

Synchronous

En este ejemplo, utilizará las funciones:

  • get_table_csv_results— LlamadasAnalyzeDocumenty crea un mapa de tablas que se detectan en el documento. Crea una representación CSV de todas las tablas detectadas.

  • generate_table_csv: genera el archivo CSV de una tabla individual.

  • get_rows_columns_map— Obtiene las filas y columnas del mapa.

  • get_text— Obtiene el texto de una celda.

Para exportar tablas a un archivo CSV
  1. Configure el entorno. Para obtener más información, consulte Requisitos previos.

  2. Copie el siguiente código de ejemplo en un archivo llamadotextract_python_table_parser.py que es.

    import webbrowser, os import json import boto3 import io from io import BytesIO import sys from pprint import pprint def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] =='SELECTED': text += 'X ' return text def get_table_csv_results(file_name): with open(file_name, 'rb') as file: img_test = file.read() bytes_test = bytearray(img_test) print('Image loaded', file_name) # process using image bytes # get the results client = boto3.client('textract') response = client.analyze_document(Document={'Bytes': bytes_test}, FeatureTypes=['TABLES']) # Get the text blocks blocks=response['Blocks'] pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index +1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv def main(file_name): table_csv = get_table_csv_results(file_name) output_file = 'output.csv' # replace content with open(output_file, "wt") as fout: fout.write(table_csv) # show the results print('CSV OUTPUT FILE: ', output_file) if __name__ == "__main__": file_name = sys.argv[1] main(file_name)
  3. En el símbolo del sistema, escriba el siguiente comando. Reemplazarfilecon el nombre del archivo de imagen de documento que desee analizar.

    python textract_python_table_parser.py file

Al ejecutar el ejemplo, la salida CSV se guarda en un archivo denominadooutput.csv.

Asynchronous

En este ejemplo, utilizará hacer uso de dos scripts diferentes. El primer guión inicia el proceso de análisis asincrónico de documentos conStartDocumentAnalysisy obtiene elBlockinformación devuelta porGetDocumentAnalysis. El segundo guión toma el devueltoBlockinformación de cada página, da formato a los datos como tabla y guarda las tablas en un archivo CSV.

Para exportar tablas a un archivo CSV
  1. Configure el entorno. Para obtener más información, consulte Requisitos previos.

  2. Asegúrese de haber seguido las instrucciones que se dan en verConfiguración de Amazon Textract Texact para operaciones asíncronas. El proceso documentado en esa página le permite enviar y recibir mensajes sobre el estado de finalización de los trabajos asíncronos.

  3. En el siguiente ejemplo de código, sustituya el valor deroleArncon el Arn asignado al rol que creó en el paso 2. Sustituir el valor debucketcon el nombre del bucket de S3 que contenga el documento. Sustituir el valor dedocumentcon el nombre del documento de su bucket de S3. Sustituir el valor deregion_namecon el nombre de la región de su bucket.

    Copie el siguiente código de ejemplo en un archivo llamadostart_doc_analysis_for_table_extraction.py que es..

    import boto3 import time class DocumentProcessor: jobId = '' region_name = '' roleArn = '' bucket = '' document = '' sqsQueueUrl = '' snsTopicArn = '' processType = '' def __init__(self, role, bucket, document, region): self.roleArn = role self.bucket = bucket self.document = document self.region_name = region self.textract = boto3.client('textract', region_name=self.region_name) self.sqs = boto3.client('sqs') self.sns = boto3.client('sns') def ProcessDocument(self): jobFound = False response = self.textract.start_document_analysis(DocumentLocation={'S3Object': {'Bucket': self.bucket, 'Name': self.document}}, FeatureTypes=["TABLES", "FORMS"], NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn}) print('Processing type: Analysis') print('Start Job Id: ' + response['JobId']) print('Done!') def CreateTopicandQueue(self): millis = str(int(round(time.time() * 1000))) # Create SNS topic snsTopicName = "AmazonTextractTopic" + millis topicResponse = self.sns.create_topic(Name=snsTopicName) self.snsTopicArn = topicResponse['TopicArn'] # create SQS queue sqsQueueName = "AmazonTextractQueue" + millis self.sqs.create_queue(QueueName=sqsQueueName) self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl'] attribs = self.sqs.get_queue_attributes(QueueUrl=self.sqsQueueUrl, AttributeNames=['QueueArn'])['Attributes'] sqsQueueArn = attribs['QueueArn'] # Subscribe SQS queue to SNS topic self.sns.subscribe(TopicArn=self.snsTopicArn, Protocol='sqs', Endpoint=sqsQueueArn) # Authorize SNS to write SQS queue policy = """{{ "Version":"2012-10-17", "Statement":[ {{ "Sid":"MyPolicy", "Effect":"Allow", "Principal" : {{"AWS" : "*"}}, "Action":"SQS:SendMessage", "Resource": "{}", "Condition":{{ "ArnEquals":{{ "aws:SourceArn": "{}" }} }} }} ] }}""".format(sqsQueueArn, self.snsTopicArn) response = self.sqs.set_queue_attributes( QueueUrl=self.sqsQueueUrl, Attributes={ 'Policy': policy }) def main(): roleArn = 'role-arn' bucket = 'bucket-name' document = 'document-name' region_name = 'region-name' analyzer = DocumentProcessor(roleArn, bucket, document, region_name) analyzer.CreateTopicandQueue() analyzer.ProcessDocument() if __name__ == "__main__": main()
  4. Ejecute el código. El código imprimirá un JobId. Copia este JobId hacia abajo.

  5. Espere a que el trabajo finalice el procesamiento y, una vez finalizado, copie el siguiente código en un archivo denominadoget_doc_analysis_for_table_extraction.py que es. Sustituir el valor dejobIdcon el ID de Job que copiaste anteriormente. Sustituir el valor deregion_namecon el nombre de la región asociada a su rol Textract. Sustituir el valor defile_namecon el nombre que desea asignar al archivo CSV de salida.

    import boto3 from pprint import pprint jobId = 'job-id' region_name = 'region-name' file_name = "output-file-name.csv" textract = boto3.client('textract', region_name=region_name) # Display information about a block def DisplayBlockInfo(block): print("Block Id: " + block['Id']) print("Type: " + block['BlockType']) if 'EntityTypes' in block: print('EntityTypes: {}'.format(block['EntityTypes'])) if 'Text' in block: print("Text: " + block['Text']) if block['BlockType'] != 'PAGE': print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%") def GetResults(jobId, file_name): maxResults = 1000 paginationToken = None finished = False while finished == False: response = None if paginationToken == None: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults) else: response = textract.get_document_analysis(JobId=jobId, MaxResults=maxResults, NextToken=paginationToken) blocks = response['Blocks'] table_csv = get_table_csv_results(blocks) output_file = file_name # replace content with open(output_file, "at") as fout: fout.write(table_csv) # show the results print('Detected Document Text') print('Pages: {}'.format(response['DocumentMetadata']['Pages'])) print('OUTPUT TO CSV FILE: ', output_file) # Display block information for block in blocks: DisplayBlockInfo(block) print() print() if 'NextToken' in response: paginationToken = response['NextToken'] else: finished = True def get_rows_columns_map(table_result, blocks_map): rows = {} for relationship in table_result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: cell = blocks_map[child_id] if cell['BlockType'] == 'CELL': row_index = cell['RowIndex'] col_index = cell['ColumnIndex'] if row_index not in rows: # create new row rows[row_index] = {} # get the text value rows[row_index][col_index] = get_text(cell, blocks_map) except KeyError: print("Error extracting Table data - {}:".format(KeyError)) pass return rows def get_text(result, blocks_map): text = '' if 'Relationships' in result: for relationship in result['Relationships']: if relationship['Type'] == 'CHILD': for child_id in relationship['Ids']: try: word = blocks_map[child_id] if word['BlockType'] == 'WORD': text += word['Text'] + ' ' if word['BlockType'] == 'SELECTION_ELEMENT': if word['SelectionStatus'] == 'SELECTED': text += 'X ' except KeyError: print("Error extracting Table data - {}:".format(KeyError)) return text def get_table_csv_results(blocks): pprint(blocks) blocks_map = {} table_blocks = [] for block in blocks: blocks_map[block['Id']] = block if block['BlockType'] == "TABLE": table_blocks.append(block) if len(table_blocks) <= 0: return "<b> NO Table FOUND </b>" csv = '' for index, table in enumerate(table_blocks): csv += generate_table_csv(table, blocks_map, index + 1) csv += '\n\n' return csv def generate_table_csv(table_result, blocks_map, table_index): rows = get_rows_columns_map(table_result, blocks_map) table_id = 'Table_' + str(table_index) # get cells. csv = 'Table: {0}\n\n'.format(table_id) for row_index, cols in rows.items(): for col_index, text in cols.items(): csv += '{}'.format(text) + "," csv += '\n' csv += '\n\n\n' return csv response_blocks = GetResults(jobId, file_name)
  6. Ejecute el código.

    Una vez que haya obtenido los resultados, asegúrese de eliminar los recursos SNS y SQS asociados o, de lo contrario, puede acumular cargos por ellos.