Exporting Tables into a CSV File - Amazon Textract


Exporting Tables into a CSV File

This Python example shows how to export tables from an image of a document into a comma-separated values (CSV) file.

The example for synchronous document analysis collects table information from a call to AnalyzeDocument. The example for asynchronous document analysis makes a call to StartDocumentAnalysis and then retrieves the results from GetDocumentAnalysis as Block objects.

Table information is returned as Block objects from a call to AnalyzeDocument. For more information, see Tables. The Block objects are stored in a map structure that's used to export the table data into a CSV file.
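The map structure can be illustrated in isolation before the full examples below. This minimal sketch uses hand-made Block dicts (the Ids and text are invented for illustration) to show how a TABLE block's CHILD relationships resolve, through a map keyed by Block Id, to CELL blocks and then to WORD text:

```python
def resolve_table_cells(blocks):
    """Resolve a list of Block dicts into {(row, col): text} for any TABLE blocks."""
    # Map structure: Block Id -> Block object.
    blocks_map = {block['Id']: block for block in blocks}
    cells = {}
    for block in blocks:
        if block['BlockType'] != 'TABLE':
            continue
        # TABLE -> CELL via CHILD relationships.
        for rel in block.get('Relationships', []):
            if rel['Type'] != 'CHILD':
                continue
            for child_id in rel['Ids']:
                cell = blocks_map[child_id]
                if cell['BlockType'] != 'CELL':
                    continue
                # CELL -> WORD via CHILD relationships.
                words = [blocks_map[wid]['Text']
                         for wrel in cell.get('Relationships', [])
                         if wrel['Type'] == 'CHILD'
                         for wid in wrel['Ids']
                         if blocks_map[wid]['BlockType'] == 'WORD']
                cells[(cell['RowIndex'], cell['ColumnIndex'])] = ' '.join(words)
    return cells

# Hand-made sample blocks, not real Textract output:
sample_blocks = [
    {'Id': 'table-1', 'BlockType': 'TABLE',
     'Relationships': [{'Type': 'CHILD', 'Ids': ['cell-1', 'cell-2']}]},
    {'Id': 'cell-1', 'BlockType': 'CELL', 'RowIndex': 1, 'ColumnIndex': 1,
     'Relationships': [{'Type': 'CHILD', 'Ids': ['word-1']}]},
    {'Id': 'cell-2', 'BlockType': 'CELL', 'RowIndex': 1, 'ColumnIndex': 2,
     'Relationships': [{'Type': 'CHILD', 'Ids': ['word-2']}]},
    {'Id': 'word-1', 'BlockType': 'WORD', 'Text': 'Total'},
    {'Id': 'word-2', 'BlockType': 'WORD', 'Text': '42'},
]

print(resolve_table_cells(sample_blocks))
# {(1, 1): 'Total', (1, 2): '42'}
```

The full examples traverse the same TABLE → CELL → WORD hierarchy, with additional handling for SELECTION_ELEMENT blocks.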

Synchronous

In this example, you use the following functions:

  • get_table_csv_results — Calls AnalyzeDocument, and builds a map of the tables that are detected in the document. Creates a CSV representation of all detected tables.

  • generate_table_csv — Generates a CSV file for an individual table.

  • get_rows_columns_map — Gets the rows and columns from the map.

  • get_text — Gets the text from a cell.

To export tables into a CSV file
  1. Configure your environment. For more information, see Prerequisites.

  2. Save the following example code to a file named textract_python_table_parser.py.

    import webbrowser, os
    import json
    import boto3
    import io
    from io import BytesIO
    import sys
    from pprint import pprint


    def get_rows_columns_map(table_result, blocks_map):
        rows = {}
        for relationship in table_result['Relationships']:
            if relationship['Type'] == 'CHILD':
                for child_id in relationship['Ids']:
                    cell = blocks_map[child_id]
                    if cell['BlockType'] == 'CELL':
                        row_index = cell['RowIndex']
                        col_index = cell['ColumnIndex']
                        if row_index not in rows:
                            # create new row
                            rows[row_index] = {}
                        # get the text value
                        rows[row_index][col_index] = get_text(cell, blocks_map)
        return rows


    def get_text(result, blocks_map):
        text = ''
        if 'Relationships' in result:
            for relationship in result['Relationships']:
                if relationship['Type'] == 'CHILD':
                    for child_id in relationship['Ids']:
                        word = blocks_map[child_id]
                        if word['BlockType'] == 'WORD':
                            text += word['Text'] + ' '
                        if word['BlockType'] == 'SELECTION_ELEMENT':
                            if word['SelectionStatus'] == 'SELECTED':
                                text += 'X '
        return text


    def get_table_csv_results(file_name):
        with open(file_name, 'rb') as file:
            img_test = file.read()
            bytes_test = bytearray(img_test)
            print('Image loaded', file_name)

        # process using image bytes
        # get the results
        client = boto3.client('textract')
        response = client.analyze_document(Document={'Bytes': bytes_test}, FeatureTypes=['TABLES'])

        # Get the text blocks
        blocks = response['Blocks']
        pprint(blocks)

        blocks_map = {}
        table_blocks = []
        for block in blocks:
            blocks_map[block['Id']] = block
            if block['BlockType'] == "TABLE":
                table_blocks.append(block)

        if len(table_blocks) <= 0:
            return "<b> NO Table FOUND </b>"

        csv = ''
        for index, table in enumerate(table_blocks):
            csv += generate_table_csv(table, blocks_map, index + 1)
            csv += '\n\n'
        return csv


    def generate_table_csv(table_result, blocks_map, table_index):
        rows = get_rows_columns_map(table_result, blocks_map)

        table_id = 'Table_' + str(table_index)
        # get cells.
        csv = 'Table: {0}\n\n'.format(table_id)
        for row_index, cols in rows.items():
            for col_index, text in cols.items():
                csv += '{}'.format(text) + ","
            csv += '\n'
        csv += '\n\n\n'
        return csv


    def main(file_name):
        table_csv = get_table_csv_results(file_name)
        output_file = 'output.csv'
        # replace content
        with open(output_file, "wt") as fout:
            fout.write(table_csv)
        # show the results
        print('CSV OUTPUT FILE: ', output_file)


    if __name__ == "__main__":
        file_name = sys.argv[1]
        main(file_name)
  3. At the command prompt, enter the following command. Replace file with the name of the document image file that you want to analyze.

    python textract_python_table_parser.py file

When you run the example, the CSV output is saved in a file named output.csv.
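Note that generate_table_csv builds each row by joining cell text with bare commas, so a cell whose text itself contains a comma would spill into extra columns. If that matters for your documents, Python's standard csv module can apply proper quoting. A minimal sketch, assuming the same {row_index: {col_index: text}} map that get_rows_columns_map builds (rows_to_csv is a hypothetical helper, not part of the example above):

```python
import csv
import io


def rows_to_csv(rows):
    """Serialize a {row_index: {col_index: text}} map with proper CSV
    quoting, so cell text containing commas or quotes stays in one column."""
    out = io.StringIO()
    writer = csv.writer(out)
    for row_index in sorted(rows):
        cols = rows[row_index]
        # Emit columns in index order, filling absent columns with ''.
        writer.writerow([cols.get(col, '') for col in sorted(cols)])
    return out.getvalue()


rows = {1: {1: 'Item', 2: 'Price'},
        2: {1: 'Apples, red', 2: '1.50'}}
print(rows_to_csv(rows))
```

The comma inside "Apples, red" is preserved because csv.writer wraps that cell in quotes.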

Asynchronous

In this example, you use two different scripts. The first script starts the process of asynchronously analyzing documents with StartDocumentAnalysis and gets the Block information that's returned by GetDocumentAnalysis. The second script gets the returned Block information for each page, formats the data as a table, and saves the tables to a CSV file.
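The first script subscribes an SQS queue to the SNS topic that receives the job-completion notification, so you can also detect completion programmatically by polling that queue instead of waiting manually. A hedged sketch (the function names are my own, and the parsing assumes the default SNS-envelope format, i.e. raw message delivery is not enabled on the subscription):

```python
import json
import time


def parse_job_status(messages, job_id):
    """Scan raw SQS messages for a Textract completion notification.
    Each message Body is an SNS envelope whose 'Message' field is the
    Textract JSON payload. Returns the reported status, or None."""
    for message in messages:
        envelope = json.loads(message['Body'])
        payload = json.loads(envelope['Message'])
        if payload.get('JobId') == job_id:
            return payload.get('Status')
    return None


def wait_for_job(sqs, queue_url, job_id, delay=5):
    """Poll the queue until the notification for job_id arrives, then
    return its status ('SUCCEEDED' or 'FAILED').
    `sqs` is a boto3 SQS client, e.g. boto3.client('sqs')."""
    while True:
        response = sqs.receive_message(QueueUrl=queue_url,
                                       MaxNumberOfMessages=10,
                                       WaitTimeSeconds=10)
        status = parse_job_status(response.get('Messages', []), job_id)
        if status is not None:
            return status
        time.sleep(delay)
```

In production you would also delete each processed message with sqs.delete_message so it is not redelivered.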

To export tables into a CSV file
  1. Configure your environment. For more information, see Prerequisites.

  2. Make sure that you have followed the instructions given at Configuring Amazon Textract for Asynchronous Operations. The process documented on that page enables you to send and receive messages about the completion status of asynchronous jobs.

  3. In the following code example, replace the value of roleArn with the Arn assigned to the role that you created in step 2. Replace the value of bucket with the name of the S3 bucket containing your document. Replace the value of document with the name of the document in your S3 bucket. Replace the value of region_name with the name of your bucket's region.

    Save the following example code to a file named start_doc_analysis_for_table_extraction.py.

    import boto3
    import time


    class DocumentProcessor:
        jobId = ''
        region_name = ''
        roleArn = ''
        bucket = ''
        document = ''
        sqsQueueUrl = ''
        snsTopicArn = ''
        processType = ''

        def __init__(self, role, bucket, document, region):
            self.roleArn = role
            self.bucket = bucket
            self.document = document
            self.region_name = region
            self.textract = boto3.client('textract', region_name=self.region_name)
            self.sqs = boto3.client('sqs')
            self.sns = boto3.client('sns')

        def ProcessDocument(self):
            jobFound = False
            response = self.textract.start_document_analysis(
                DocumentLocation={'S3Object': {'Bucket': self.bucket, 'Name': self.document}},
                FeatureTypes=["TABLES", "FORMS"],
                NotificationChannel={'RoleArn': self.roleArn, 'SNSTopicArn': self.snsTopicArn})
            print('Processing type: Analysis')
            print('Start Job Id: ' + response['JobId'])
            print('Done!')

        def CreateTopicandQueue(self):
            millis = str(int(round(time.time() * 1000)))

            # Create SNS topic
            snsTopicName = "AmazonTextractTopic" + millis
            topicResponse = self.sns.create_topic(Name=snsTopicName)
            self.snsTopicArn = topicResponse['TopicArn']

            # Create SQS queue
            sqsQueueName = "AmazonTextractQueue" + millis
            self.sqs.create_queue(QueueName=sqsQueueName)
            self.sqsQueueUrl = self.sqs.get_queue_url(QueueName=sqsQueueName)['QueueUrl']

            attribs = self.sqs.get_queue_attributes(
                QueueUrl=self.sqsQueueUrl, AttributeNames=['QueueArn'])['Attributes']
            sqsQueueArn = attribs['QueueArn']

            # Subscribe SQS queue to SNS topic
            self.sns.subscribe(TopicArn=self.snsTopicArn, Protocol='sqs', Endpoint=sqsQueueArn)

            # Authorize SNS to write to the SQS queue
            policy = """{{
      "Version":"2012-10-17",
      "Statement":[
        {{
          "Sid":"MyPolicy",
          "Effect":"Allow",
          "Principal" : {{"AWS" : "*"}},
          "Action":"SQS:SendMessage",
          "Resource": "{}",
          "Condition":{{
            "ArnEquals":{{
              "aws:SourceArn": "{}"
            }}
          }}
        }}
      ]
    }}""".format(sqsQueueArn, self.snsTopicArn)

            response = self.sqs.set_queue_attributes(
                QueueUrl=self.sqsQueueUrl,
                Attributes={'Policy': policy})


    def main():
        roleArn = 'role-arn'
        bucket = 'bucket-name'
        document = 'document-name'
        region_name = 'region-name'

        analyzer = DocumentProcessor(roleArn, bucket, document, region_name)
        analyzer.CreateTopicandQueue()
        analyzer.ProcessDocument()


    if __name__ == "__main__":
        main()
  4. Run the code. The code will print out a JobId. Copy this JobId down.

  5. Wait for your job to finish processing. After it has finished, copy the following code into a file named get_doc_analysis_for_table_extraction.py. Replace the value of jobId with the Job ID that you copied earlier. Replace the value of region_name with the name of the region associated with your Textract role. Replace the value of file_name with the name that you want to give the CSV output.

    import boto3
    from pprint import pprint

    jobId = 'job-id'
    region_name = 'region-name'
    file_name = "output-file-name.csv"

    textract = boto3.client('textract', region_name=region_name)


    # Display information about a block
    def DisplayBlockInfo(block):
        print("Block Id: " + block['Id'])
        print("Type: " + block['BlockType'])
        if 'EntityTypes' in block:
            print('EntityTypes: {}'.format(block['EntityTypes']))
        if 'Text' in block:
            print("Text: " + block['Text'])
        if block['BlockType'] != 'PAGE':
            print("Confidence: " + "{:.2f}".format(block['Confidence']) + "%")


    def GetResults(jobId, file_name):
        maxResults = 1000
        paginationToken = None
        finished = False

        while finished == False:
            response = None
            if paginationToken == None:
                response = textract.get_document_analysis(JobId=jobId,
                                                          MaxResults=maxResults)
            else:
                response = textract.get_document_analysis(JobId=jobId,
                                                          MaxResults=maxResults,
                                                          NextToken=paginationToken)

            blocks = response['Blocks']
            table_csv = get_table_csv_results(blocks)
            output_file = file_name

            # replace content
            with open(output_file, "at") as fout:
                fout.write(table_csv)

            # show the results
            print('Detected Document Text')
            print('Pages: {}'.format(response['DocumentMetadata']['Pages']))
            print('OUTPUT TO CSV FILE: ', output_file)

            # Display block information
            for block in blocks:
                DisplayBlockInfo(block)
                print()
                print()

            if 'NextToken' in response:
                paginationToken = response['NextToken']
            else:
                finished = True


    def get_rows_columns_map(table_result, blocks_map):
        rows = {}
        for relationship in table_result['Relationships']:
            if relationship['Type'] == 'CHILD':
                for child_id in relationship['Ids']:
                    try:
                        cell = blocks_map[child_id]
                        if cell['BlockType'] == 'CELL':
                            row_index = cell['RowIndex']
                            col_index = cell['ColumnIndex']
                            if row_index not in rows:
                                # create new row
                                rows[row_index] = {}
                            # get the text value
                            rows[row_index][col_index] = get_text(cell, blocks_map)
                    except KeyError:
                        print("Error extracting Table data - {}:".format(KeyError))
                        pass
        return rows


    def get_text(result, blocks_map):
        text = ''
        if 'Relationships' in result:
            for relationship in result['Relationships']:
                if relationship['Type'] == 'CHILD':
                    for child_id in relationship['Ids']:
                        try:
                            word = blocks_map[child_id]
                            if word['BlockType'] == 'WORD':
                                text += word['Text'] + ' '
                            if word['BlockType'] == 'SELECTION_ELEMENT':
                                if word['SelectionStatus'] == 'SELECTED':
                                    text += 'X '
                        except KeyError:
                            print("Error extracting Table data - {}:".format(KeyError))
        return text


    def get_table_csv_results(blocks):
        pprint(blocks)
        blocks_map = {}
        table_blocks = []
        for block in blocks:
            blocks_map[block['Id']] = block
            if block['BlockType'] == "TABLE":
                table_blocks.append(block)

        if len(table_blocks) <= 0:
            return "<b> NO Table FOUND </b>"

        csv = ''
        for index, table in enumerate(table_blocks):
            csv += generate_table_csv(table, blocks_map, index + 1)
            csv += '\n\n'
        return csv


    def generate_table_csv(table_result, blocks_map, table_index):
        rows = get_rows_columns_map(table_result, blocks_map)

        table_id = 'Table_' + str(table_index)
        # get cells.
        csv = 'Table: {0}\n\n'.format(table_id)
        for row_index, cols in rows.items():
            for col_index, text in cols.items():
                csv += '{}'.format(text) + ","
            csv += '\n'
        csv += '\n\n\n'
        return csv


    response_blocks = GetResults(jobId, file_name)
  6. Run the code.

    After you have obtained your results, be sure to delete the associated SNS and SQS resources, or else you might accrue charges for them.
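The cleanup can also be done from Python. A minimal sketch, assuming the topic ARN and queue URL stored in snsTopicArn and sqsQueueUrl by CreateTopicandQueue (the function name is mine, not part of the examples above):

```python
def delete_textract_resources(sns, sqs, topic_arn, queue_url):
    """Delete the SNS topic and SQS queue created by CreateTopicandQueue.
    `sns` and `sqs` are boto3 clients, e.g. boto3.client('sns') and
    boto3.client('sqs'). Deleting the topic also deletes its
    subscriptions, so the SQS subscription does not need a separate call."""
    sns.delete_topic(TopicArn=topic_arn)
    sqs.delete_queue(QueueUrl=queue_url)
```

For example, after GetResults completes you would call delete_textract_resources(boto3.client('sns'), boto3.client('sqs'), analyzer.snsTopicArn, analyzer.sqsQueueUrl).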