Run UNLOAD query

The following code examples build and run an UNLOAD query and process its results. For information about UNLOAD, see Using UNLOAD to export query results to S3 from Timestream for LiveAnalytics. For examples of UNLOAD queries, see Example use case for UNLOAD from Timestream for LiveAnalytics.
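The utilities in these examples assemble the UNLOAD statement as a string. As a rough sketch, the generated statement has the following general shape; the database and table names, bucket, prefix, and WITH options shown here are illustrative only:

UNLOAD (SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel
        FROM sampleDB.sampleTable
        WHERE time BETWEEN ago(2d) AND now())
TO 's3://timestream-sample-<region>-<accountId>/without_partition/'
WITH (format = 'CSV', compression = 'GZIP')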

Build and run an UNLOAD query

Java
// When you have a SELECT like below
String QUERY_1 = "SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel FROM "
        + DATABASE_NAME + "." + UNLOAD_TABLE_NAME + " WHERE time BETWEEN ago(2d) AND now()";

// You can construct the UNLOAD query as follows
UnloadQuery unloadQuery = UnloadQuery.builder()
        .selectQuery(QUERY_1)
        .bucketName("timestream-sample-<region>-<accountId>")
        .resultsPrefix("without_partition")
        .format(CSV)
        .compression(UnloadQuery.Compression.GZIP)
        .build();
QueryResult unloadResult = runQuery(unloadQuery.getUnloadQuery());

// Run the UNLOAD query (similar to how you run a SELECT query)
// https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.pagination
private QueryResult runQuery(String queryString) {
    QueryResult queryResult = null;
    try {
        QueryRequest queryRequest = new QueryRequest();
        queryRequest.setQueryString(queryString);
        queryResult = queryClient.query(queryRequest);
        while (true) {
            parseQueryResult(queryResult);
            if (queryResult.getNextToken() == null) {
                break;
            }
            queryRequest.setNextToken(queryResult.getNextToken());
            queryResult = queryClient.query(queryRequest);
        }
    } catch (Exception e) {
        // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
        e.printStackTrace();
    }
    return queryResult;
}

// Utility that helps to construct the UNLOAD query
@Builder
static class UnloadQuery {
    private String selectQuery;
    private String bucketName;
    private String resultsPrefix;
    private Format format;
    private Compression compression;
    private EncryptionType encryptionType;
    private List<String> partitionColumns;
    private String kmsKey;
    private Character csvFieldDelimiter;
    private Character csvEscapeCharacter;

    public String getUnloadQuery() {
        String destination = constructDestination();
        String withClause = constructOptionalParameters();
        return String.format("UNLOAD (%s) TO '%s' %s", selectQuery, destination, withClause);
    }

    private String constructDestination() {
        return "s3://" + this.bucketName + "/" + this.resultsPrefix + "/";
    }

    private String constructOptionalParameters() {
        boolean isOptionalParametersPresent = Objects.nonNull(format)
                || Objects.nonNull(compression)
                || Objects.nonNull(encryptionType)
                || Objects.nonNull(partitionColumns)
                || Objects.nonNull(kmsKey)
                || Objects.nonNull(csvFieldDelimiter)
                || Objects.nonNull(csvEscapeCharacter);

        String withClause = "";
        if (isOptionalParametersPresent) {
            StringJoiner optionalParameters = new StringJoiner(",");
            if (Objects.nonNull(format)) {
                optionalParameters.add("format = '" + format + "'");
            }
            if (Objects.nonNull(compression)) {
                optionalParameters.add("compression = '" + compression + "'");
            }
            if (Objects.nonNull(encryptionType)) {
                optionalParameters.add("encryption = '" + encryptionType + "'");
            }
            if (Objects.nonNull(kmsKey)) {
                optionalParameters.add("kms_key = '" + kmsKey + "'");
            }
            if (Objects.nonNull(csvFieldDelimiter)) {
                optionalParameters.add("field_delimiter = '" + csvFieldDelimiter + "'");
            }
            if (Objects.nonNull(csvEscapeCharacter)) {
                optionalParameters.add("escaped_by = '" + csvEscapeCharacter + "'");
            }
            if (Objects.nonNull(partitionColumns) && !partitionColumns.isEmpty()) {
                final StringJoiner partitionedByList = new StringJoiner(",");
                partitionColumns.forEach(column -> partitionedByList.add("'" + column + "'"));
                optionalParameters.add(String.format("partitioned_by = ARRAY[%s]", partitionedByList));
            }
            withClause = String.format("WITH (%s)", optionalParameters);
        }
        return withClause;
    }

    public enum Format { CSV, PARQUET }
    public enum Compression { GZIP, NONE }
    public enum EncryptionType { SSE_S3, SSE_KMS }

    @Override
    public String toString() {
        return getUnloadQuery();
    }
}
Java v2
// When you have a SELECT like below
String QUERY_1 = "SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel FROM "
        + DATABASE_NAME + "." + UNLOAD_TABLE_NAME + " WHERE time BETWEEN ago(2d) AND now()";

// You can construct the UNLOAD query as follows
UnloadQuery unloadQuery = UnloadQuery.builder()
        .selectQuery(QUERY_1)
        .bucketName("timestream-sample-<region>-<accountId>")
        .resultsPrefix("without_partition")
        .format(CSV)
        .compression(UnloadQuery.Compression.GZIP)
        .build();
QueryResponse unloadResponse = runQuery(unloadQuery.getUnloadQuery());

// Run the UNLOAD query (similar to how you run a SELECT query)
// https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.pagination
private QueryResponse runQuery(String queryString) {
    QueryResponse finalResponse = null;
    try {
        QueryRequest queryRequest = QueryRequest.builder().queryString(queryString).build();
        final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest);
        for (QueryResponse queryResponse : queryResponseIterator) {
            parseQueryResult(queryResponse);
            finalResponse = queryResponse;
        }
    } catch (Exception e) {
        // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
        e.printStackTrace();
    }
    return finalResponse;
}

// Utility that helps to construct the UNLOAD query
@Builder
static class UnloadQuery {
    private String selectQuery;
    private String bucketName;
    private String resultsPrefix;
    private Format format;
    private Compression compression;
    private EncryptionType encryptionType;
    private List<String> partitionColumns;
    private String kmsKey;
    private Character csvFieldDelimiter;
    private Character csvEscapeCharacter;

    public String getUnloadQuery() {
        String destination = constructDestination();
        String withClause = constructOptionalParameters();
        return String.format("UNLOAD (%s) TO '%s' %s", selectQuery, destination, withClause);
    }

    private String constructDestination() {
        return "s3://" + this.bucketName + "/" + this.resultsPrefix + "/";
    }

    private String constructOptionalParameters() {
        boolean isOptionalParametersPresent = Objects.nonNull(format)
                || Objects.nonNull(compression)
                || Objects.nonNull(encryptionType)
                || Objects.nonNull(partitionColumns)
                || Objects.nonNull(kmsKey)
                || Objects.nonNull(csvFieldDelimiter)
                || Objects.nonNull(csvEscapeCharacter);

        String withClause = "";
        if (isOptionalParametersPresent) {
            StringJoiner optionalParameters = new StringJoiner(",");
            if (Objects.nonNull(format)) {
                optionalParameters.add("format = '" + format + "'");
            }
            if (Objects.nonNull(compression)) {
                optionalParameters.add("compression = '" + compression + "'");
            }
            if (Objects.nonNull(encryptionType)) {
                optionalParameters.add("encryption = '" + encryptionType + "'");
            }
            if (Objects.nonNull(kmsKey)) {
                optionalParameters.add("kms_key = '" + kmsKey + "'");
            }
            if (Objects.nonNull(csvFieldDelimiter)) {
                optionalParameters.add("field_delimiter = '" + csvFieldDelimiter + "'");
            }
            if (Objects.nonNull(csvEscapeCharacter)) {
                optionalParameters.add("escaped_by = '" + csvEscapeCharacter + "'");
            }
            if (Objects.nonNull(partitionColumns) && !partitionColumns.isEmpty()) {
                final StringJoiner partitionedByList = new StringJoiner(",");
                partitionColumns.forEach(column -> partitionedByList.add("'" + column + "'"));
                optionalParameters.add(String.format("partitioned_by = ARRAY[%s]", partitionedByList));
            }
            withClause = String.format("WITH (%s)", optionalParameters);
        }
        return withClause;
    }

    public enum Format { CSV, PARQUET }
    public enum Compression { GZIP, NONE }
    public enum EncryptionType { SSE_S3, SSE_KMS }

    @Override
    public String toString() {
        return getUnloadQuery();
    }
}
Go
// When you have a SELECT like below
var Query = "SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel FROM " +
    *databaseName + "." + *tableName + " WHERE time BETWEEN ago(2d) AND now()"

// You can construct the UNLOAD query as follows
var unloadQuery = UnloadQuery{
    Query: "SELECT user_id, ip_address, session_id, measure_name, time, query, quantity, product_id, channel, event FROM " +
        *databaseName + "." + *tableName + " WHERE time BETWEEN ago(2d) AND now()",
    Partitioned_by: []string{},
    Compression:    "GZIP",
    Format:         "CSV",
    S3Location:     bucketName,
    ResultPrefix:   "without_partition",
}

// Run the UNLOAD query (similar to how you run a SELECT query)
// https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.pagination
queryInput := &timestreamquery.QueryInput{
    QueryString: build_query(unloadQuery),
}
err := querySvc.QueryPages(queryInput, func(page *timestreamquery.QueryOutput, lastPage bool) bool {
    if lastPage {
        var response = parseQueryResult(page)
        var unloadFiles = getManifestAndMetadataFiles(s3Svc, response)
        displayColumns(unloadFiles, unloadQuery.Partitioned_by)
        displayResults(s3Svc, unloadFiles)
    }
    return true
})
if err != nil {
    fmt.Println("Error:")
    fmt.Println(err)
}

// Utility that helps to construct the UNLOAD query
type UnloadQuery struct {
    Query          string
    Partitioned_by []string
    Format         string
    S3Location     string
    ResultPrefix   string
    Compression    string
}

func build_query(unload_query UnloadQuery) *string {
    var query_results_s3_path = "'s3://" + unload_query.S3Location + "/" + unload_query.ResultPrefix + "/'"
    var query = "UNLOAD(" + unload_query.Query + ") TO " + query_results_s3_path + " WITH ( "
    if len(unload_query.Partitioned_by) > 0 {
        query = query + "partitioned_by=ARRAY["
        for i, column := range unload_query.Partitioned_by {
            if i == 0 {
                query = query + "'" + column + "'"
            } else {
                query = query + ",'" + column + "'"
            }
        }
        query = query + "],"
    }
    query = query + " format='" + unload_query.Format + "', "
    query = query + " compression='" + unload_query.Compression + "')"
    fmt.Println(query)
    return aws.String(query)
}
Python
# When you have a SELECT like below
QUERY_1 = "SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel FROM " + database_name + "." + table_name + " WHERE time BETWEEN ago(2d) AND now()"

# You can construct the UNLOAD query as follows
UNLOAD_QUERY_1 = UnloadQuery(QUERY_1, "timestream-sample-<region>-<accountId>", "without_partition", "CSV", "GZIP", "")

# Run the UNLOAD query (similar to how you run a SELECT query)
# https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.pagination
def run_query(self, query_string):
    try:
        page_iterator = self.paginator.paginate(QueryString=query_string)
    except Exception as err:
        print("Exception while running query:", err)

# Utility that helps to construct the UNLOAD query
class UnloadQuery:

    def __init__(self, query, s3_bucket_location, results_prefix, format, compression, partition_by):
        self.query = query
        self.s3_bucket_location = s3_bucket_location
        self.results_prefix = results_prefix
        self.format = format
        self.compression = compression
        self.partition_by = partition_by

    def build_query(self):
        query_results_s3_path = "'s3://" + self.s3_bucket_location + "/" + self.results_prefix + "/'"
        unload_query = "UNLOAD("
        unload_query = unload_query + self.query
        unload_query = unload_query + ") "
        unload_query = unload_query + " TO " + query_results_s3_path
        unload_query = unload_query + " WITH ( "
        if len(self.partition_by) > 0:
            unload_query = unload_query + " partitioned_by = ARRAY" + str(self.partition_by) + ","
        unload_query = unload_query + " format='" + self.format + "', "
        unload_query = unload_query + " compression='" + self.compression + "')"
        return unload_query
Node.js
// When you have a SELECT like below
QUERY_1 = "SELECT user_id, ip_address, event, session_id, measure_name, time, query, quantity, product_id, channel FROM " + database_name + "." + table_name + " WHERE time BETWEEN ago(2d) AND now()"

// You can construct the UNLOAD query as follows
UNLOAD_QUERY_1 = new UnloadQuery(QUERY_1, "timestream-sample-<region>-<accountId>", "without_partition", "CSV", "GZIP", "")

// Run the UNLOAD query (similar to how you run a SELECT query)
// https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.pagination
async runQuery(query = UNLOAD_QUERY_1.buildQuery(), nextToken) {
    const params = new QueryCommand({
        QueryString: query
    });
    if (nextToken) {
        // The pagination token belongs on the command input
        params.input.NextToken = nextToken;
    }
    await queryClient.send(params).then(
        async (response) => {
            if (response.NextToken) {
                await this.runQuery(query, response.NextToken);
            } else {
                await parseAndDisplayResults(response);
            }
        },
        (err) => {
            console.error("Error while querying:", err);
        });
}

// Utility that helps to construct the UNLOAD query
class UnloadQuery {
    constructor(query, s3_bucket_location, results_prefix, format, compression, partition_by) {
        this.query = query;
        this.s3_bucket_location = s3_bucket_location
        this.results_prefix = results_prefix
        this.format = format
        this.compression = compression
        this.partition_by = partition_by
    }

    buildQuery() {
        const query_results_s3_path = "'s3://" + this.s3_bucket_location + "/" + this.results_prefix + "/'"
        let unload_query = "UNLOAD("
        unload_query = unload_query + this.query
        unload_query = unload_query + ") "
        unload_query = unload_query + " TO " + query_results_s3_path
        unload_query = unload_query + " WITH ( "
        if (this.partition_by.length > 0) {
            let partitionBy = ""
            this.partition_by.forEach((str, i) => {
                partitionBy = partitionBy + (i ? ",'" : "'") + str + "'"
            })
            unload_query = unload_query + " partitioned_by = ARRAY[" + partitionBy + "],"
        }
        unload_query = unload_query + " format='" + this.format + "', "
        unload_query = unload_query + " compression='" + this.compression + "')"
        return unload_query
    }
}

Parse UNLOAD response, and get row count, manifest link, and metadata link
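Unlike a SELECT query, an UNLOAD query returns a single row with three columns: rows (BIGINT), metadataFile (VARCHAR), and manifestFile (VARCHAR). Parsed into a key-value map, as the examples below do, the result has roughly this shape; the row count and object keys here are illustrative placeholders:

{
  "rows": "1500",
  "metadataFile": "s3://<bucket>/<prefix>/<metadata object key>.json",
  "manifestFile": "s3://<bucket>/<prefix>/<manifest object key>.json"
}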

Java
// Parsing the UNLOAD query response is similar to how you parse a SELECT query response:
// https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.parsing
// But unlike SELECT, UNLOAD outputs only 1 row * 3 columns
// (rows, metadataFile, manifestFile) => (BIGINT, VARCHAR, VARCHAR)
public UnloadResponse parseResult(QueryResult queryResult) {
    Map<String, String> outputMap = new HashMap<>();
    for (int i = 0; i < queryResult.getColumnInfo().size(); i++) {
        outputMap.put(queryResult.getColumnInfo().get(i).getName(),
                queryResult.getRows().get(0).getData().get(i).getScalarValue());
    }
    return new UnloadResponse(outputMap);
}

@Getter
class UnloadResponse {
    private final String metadataFile;
    private final String manifestFile;
    private final int rows;

    public UnloadResponse(Map<String, String> unloadResponse) {
        this.metadataFile = unloadResponse.get("metadataFile");
        this.manifestFile = unloadResponse.get("manifestFile");
        this.rows = Integer.parseInt(unloadResponse.get("rows"));
    }
}
Java v2
// Parsing the UNLOAD query response is similar to how you parse a SELECT query response:
// https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.parsing
// But unlike SELECT, UNLOAD outputs only 1 row * 3 columns
// (rows, metadataFile, manifestFile) => (BIGINT, VARCHAR, VARCHAR)
public UnloadResponse parseResult(QueryResponse queryResponse) {
    Map<String, String> outputMap = new HashMap<>();
    for (int i = 0; i < queryResponse.columnInfo().size(); i++) {
        outputMap.put(queryResponse.columnInfo().get(i).name(),
                queryResponse.rows().get(0).data().get(i).scalarValue());
    }
    return new UnloadResponse(outputMap);
}

@Getter
class UnloadResponse {
    private final String metadataFile;
    private final String manifestFile;
    private final int rows;

    public UnloadResponse(Map<String, String> unloadResponse) {
        this.metadataFile = unloadResponse.get("metadataFile");
        this.manifestFile = unloadResponse.get("manifestFile");
        this.rows = Integer.parseInt(unloadResponse.get("rows"));
    }
}
Go
// Parsing the UNLOAD query response is similar to how you parse a SELECT query response:
// https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.parsing
// But unlike SELECT, UNLOAD outputs only 1 row * 3 columns
// (rows, metadataFile, manifestFile) => (BIGINT, VARCHAR, VARCHAR)
func parseQueryResult(queryOutput *timestreamquery.QueryOutput) map[string]string {
    var columnInfo = queryOutput.ColumnInfo
    fmt.Println("ColumnInfo", columnInfo)
    fmt.Println("QueryId", queryOutput.QueryId)
    fmt.Println("QueryStatus", queryOutput.QueryStatus)
    return parseResponse(columnInfo, queryOutput.Rows[0])
}

func parseResponse(columnInfo []*timestreamquery.ColumnInfo, row *timestreamquery.Row) map[string]string {
    var datum = row.Data
    response := make(map[string]string)
    for i, column := range columnInfo {
        response[*column.Name] = *datum[i].ScalarValue
    }
    return response
}
Python
# Parsing the UNLOAD query response is similar to how you parse a SELECT query response:
# https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.parsing
# But unlike SELECT, UNLOAD outputs only 1 row * 3 columns
# (rows, metadataFile, manifestFile) => (BIGINT, VARCHAR, VARCHAR)
for page in page_iterator:
    last_page = page
response = self._parse_unload_query_result(last_page)

def _parse_unload_query_result(self, query_result):
    column_info = query_result['ColumnInfo']
    print("ColumnInfo: %s" % column_info)
    print("QueryId: %s" % query_result['QueryId'])
    print("QueryStatus: %s" % query_result['QueryStatus'])
    return self.parse_unload_response(column_info, query_result['Rows'][0])

def parse_unload_response(self, column_info, row):
    response = {}
    data = row['Data']
    for i, column in enumerate(column_info):
        response[column['Name']] = data[i]['ScalarValue']
    print("Rows: %s" % response['rows'])
    print("Metadata File location: %s" % response['metadataFile'])
    print("Manifest File location: %s" % response['manifestFile'])
    return response
Node.js
// Parsing the UNLOAD query response is similar to how you parse a SELECT query response:
// https://docs.aws.amazon.com/timestream/latest/developerguide/code-samples.run-query.html#code-samples.run-query.parsing
// But unlike SELECT, UNLOAD outputs only 1 row * 3 columns
// (rows, metadataFile, manifestFile) => (BIGINT, VARCHAR, VARCHAR)
async parseAndDisplayResults(data, query) {
    const columnInfo = data['ColumnInfo'];
    console.log("ColumnInfo:", columnInfo)
    console.log("QueryId: %s", data['QueryId'])
    console.log("QueryStatus:", data['QueryStatus'])
    await this.parseResponse(columnInfo, data['Rows'][0], query)
}

async parseResponse(columnInfo, row, query) {
    let response = {}
    const data = row['Data']
    columnInfo.forEach((column, i) => {
        response[column['Name']] = data[i]['ScalarValue']
    })
    console.log("Manifest file", response['manifestFile']);
    console.log("Metadata file", response['metadataFile']);
    return response
}

Read and parse manifest content
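The manifest file is a JSON document in S3. Based on the fields that the parsers below deserialize (result_files, query_metadata, author), its shape is roughly the following; every value here is an illustrative placeholder:

{
  "result_files": [
    {
      "url": "s3://<bucket>/<prefix>/<result object key>",
      "file_metadata": { "content_length_in_bytes": 1234, "row_count": 100 }
    }
  ],
  "query_metadata": {
    "total_content_length_in_bytes": 1234,
    "total_row_count": 100,
    "result_format": "CSV",
    "result_version": "<version>"
  },
  "author": { "name": "<author name>", "manifest_file_version": "<version>" }
}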

Java
// Read and parse manifest content
public UnloadManifest getUnloadManifest(UnloadResponse unloadResponse) throws IOException {
    AmazonS3URI s3URI = new AmazonS3URI(unloadResponse.getManifestFile());
    S3Object s3Object = s3Client.getObject(s3URI.getBucket(), s3URI.getKey());
    String manifestFileContent = new String(IOUtils.toByteArray(s3Object.getObjectContent()), StandardCharsets.UTF_8);
    return new Gson().fromJson(manifestFileContent, UnloadManifest.class);
}

class UnloadManifest {
    @Getter
    public class FileMetadata {
        long content_length_in_bytes;
        long row_count;
    }

    @Getter
    public class ResultFile {
        String url;
        FileMetadata file_metadata;
    }

    @Getter
    public class QueryMetadata {
        long total_content_length_in_bytes;
        long total_row_count;
        String result_format;
        String result_version;
    }

    @Getter
    public class Author {
        String name;
        String manifest_file_version;
    }

    @Getter
    private List<ResultFile> result_files;
    @Getter
    private QueryMetadata query_metadata;
    @Getter
    private Author author;
}
Java v2
// Read and parse manifest content
public UnloadManifest getUnloadManifest(UnloadResponse unloadResponse) throws URISyntaxException {
    // Spaces need to be encoded before calling the S3 parseUri function
    S3Uri s3Uri = s3Utilities.parseUri(URI.create(unloadResponse.getManifestFile().replace(" ", "%20")));
    ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(GetObjectRequest.builder()
            .bucket(s3Uri.bucket().orElseThrow(() -> new URISyntaxException(unloadResponse.getManifestFile(), "Invalid S3 URI")))
            .key(s3Uri.key().orElseThrow(() -> new URISyntaxException(unloadResponse.getManifestFile(), "Invalid S3 URI")))
            .build());
    String manifestFileContent = new String(objectBytes.asByteArray(), StandardCharsets.UTF_8);
    return new Gson().fromJson(manifestFileContent, UnloadManifest.class);
}

class UnloadManifest {
    @Getter
    public class FileMetadata {
        long content_length_in_bytes;
        long row_count;
    }

    @Getter
    public class ResultFile {
        String url;
        FileMetadata file_metadata;
    }

    @Getter
    public class QueryMetadata {
        long total_content_length_in_bytes;
        long total_row_count;
        String result_format;
        String result_version;
    }

    @Getter
    public class Author {
        String name;
        String manifest_file_version;
    }

    @Getter
    private List<ResultFile> result_files;
    @Getter
    private QueryMetadata query_metadata;
    @Getter
    private Author author;
}
Go
// Read and parse manifest content
func getManifestFile(s3Svc *s3.S3, response map[string]string) Manifest {
    var manifestBuf = getObject(s3Svc, response["manifestFile"])
    var manifest Manifest
    json.Unmarshal(manifestBuf.Bytes(), &manifest)
    return manifest
}

func getObject(s3Svc *s3.S3, s3Uri string) *bytes.Buffer {
    u, _ := url.Parse(s3Uri)
    getObjectInput := &s3.GetObjectInput{
        Key:    aws.String(u.Path),
        Bucket: aws.String(u.Host),
    }
    getObjectOutput, err := s3Svc.GetObject(getObjectInput)
    if err != nil {
        fmt.Printf("Error: %s\n", err.Error())
    }
    buf := new(bytes.Buffer)
    buf.ReadFrom(getObjectOutput.Body)
    return buf
}

// UNLOAD manifest structure
type Manifest struct {
    Author         interface{}
    Query_metadata map[string]any
    Result_files   []struct {
        File_metadata interface{}
        Url           string
    }
}
Python
# Read and parse manifest content
def __get_manifest_file(self, response):
    manifest = self.get_object(response['manifestFile']).read().decode('utf-8')
    parsed_manifest = json.loads(manifest)
    print("Manifest contents: \n%s" % parsed_manifest)

def get_object(self, uri):
    try:
        bucket, key = uri.replace("s3://", "").split("/", 1)
        s3_client = boto3.client('s3', region_name=<region>)
        response = s3_client.get_object(Bucket=bucket, Key=key)
        return response['Body']
    except Exception as err:
        print("Failed to get the object for URI:", uri)
        raise err
Node.js
// Read and parse manifest content
async getManifestFile(response) {
    let manifest;
    await this.getS3Object(response['manifestFile']).then(
        (data) => {
            manifest = JSON.parse(data);
        }
    );
    return manifest;
}

async getS3Object(uri) {
    const {bucketName, key} = this.getBucketAndKey(uri);
    const params = new GetObjectCommand({
        Bucket: bucketName,
        Key: key
    })
    const response = await this.s3Client.send(params);
    return await response.Body.transformToString();
}

getBucketAndKey(uri) {
    const [bucketName] = uri.replace("s3://", "").split("/", 1);
    const key = uri.replace("s3://", "").split('/').slice(1).join('/');
    return {bucketName, key};
}

Read and parse metadata content
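The metadata file describes the columns of the exported result set. Going by the structures the examples below deserialize (a ColumnInfo list of Name/Type pairs and an Author object), it looks roughly like this; the column names and types are illustrative, and the Type object carries the Timestream data type (for example, a ScalarType):

{
  "ColumnInfo": [
    { "Name": "user_id", "Type": { "ScalarType": "VARCHAR" } },
    { "Name": "quantity", "Type": { "ScalarType": "DOUBLE" } },
    { "Name": "time", "Type": { "ScalarType": "TIMESTAMP" } }
  ],
  "Author": { "Name": "<author name>", "MetadataFileVersion": "<version>" }
}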

Java
// Read and parse metadata content
public UnloadMetadata getUnloadMetadata(UnloadResponse unloadResponse) throws IOException {
    AmazonS3URI s3URI = new AmazonS3URI(unloadResponse.getMetadataFile());
    S3Object s3Object = s3Client.getObject(s3URI.getBucket(), s3URI.getKey());
    String metadataFileContent = new String(IOUtils.toByteArray(s3Object.getObjectContent()), StandardCharsets.UTF_8);
    final Gson gson = new GsonBuilder()
            .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
            .create();
    return gson.fromJson(metadataFileContent, UnloadMetadata.class);
}

class UnloadMetadata {
    @JsonProperty("ColumnInfo")
    List<ColumnInfo> columnInfo;
    @JsonProperty("Author")
    Author author;

    @Data
    public class Author {
        @JsonProperty("Name")
        String name;
        @JsonProperty("MetadataFileVersion")
        String metadataFileVersion;
    }
}
Java v2
// Read and parse metadata content
public UnloadMetadata getUnloadMetadata(UnloadResponse unloadResponse) throws URISyntaxException {
    // Spaces need to be encoded before calling the S3 parseUri function
    S3Uri s3Uri = s3Utilities.parseUri(URI.create(unloadResponse.getMetadataFile().replace(" ", "%20")));
    ResponseBytes<GetObjectResponse> objectBytes = s3Client.getObjectAsBytes(GetObjectRequest.builder()
            .bucket(s3Uri.bucket().orElseThrow(() -> new URISyntaxException(unloadResponse.getMetadataFile(), "Invalid S3 URI")))
            .key(s3Uri.key().orElseThrow(() -> new URISyntaxException(unloadResponse.getMetadataFile(), "Invalid S3 URI")))
            .build());
    String metadataFileContent = new String(objectBytes.asByteArray(), StandardCharsets.UTF_8);
    final Gson gson = new GsonBuilder()
            .setFieldNamingPolicy(FieldNamingPolicy.UPPER_CAMEL_CASE)
            .create();
    return gson.fromJson(metadataFileContent, UnloadMetadata.class);
}

class UnloadMetadata {
    @JsonProperty("ColumnInfo")
    List<ColumnInfo> columnInfo;
    @JsonProperty("Author")
    Author author;

    @Data
    public class Author {
        @JsonProperty("Name")
        String name;
        @JsonProperty("MetadataFileVersion")
        String metadataFileVersion;
    }
}
Go
// Read and parse metadata content
func getMetadataFile(s3Svc *s3.S3, response map[string]string) Metadata {
    var metadataBuf = getObject(s3Svc, response["metadataFile"])
    var metadata Metadata
    json.Unmarshal(metadataBuf.Bytes(), &metadata)
    return metadata
}

func getObject(s3Svc *s3.S3, s3Uri string) *bytes.Buffer {
    u, _ := url.Parse(s3Uri)
    getObjectInput := &s3.GetObjectInput{
        Key:    aws.String(u.Path),
        Bucket: aws.String(u.Host),
    }
    getObjectOutput, err := s3Svc.GetObject(getObjectInput)
    if err != nil {
        fmt.Printf("Error: %s\n", err.Error())
    }
    buf := new(bytes.Buffer)
    buf.ReadFrom(getObjectOutput.Body)
    return buf
}

// UNLOAD metadata structure
type Metadata struct {
    Author     interface{}
    ColumnInfo []struct {
        Name string
        Type map[string]string
    }
}
Python
# Read and parse metadata content
def __get_metadata_file(self, response):
    metadata = self.get_object(response['metadataFile']).read().decode('utf-8')
    parsed_metadata = json.loads(metadata)
    print("Metadata contents: \n%s" % parsed_metadata)

def get_object(self, uri):
    try:
        bucket, key = uri.replace("s3://", "").split("/", 1)
        s3_client = boto3.client('s3', region_name=<region>)
        response = s3_client.get_object(Bucket=bucket, Key=key)
        return response['Body']
    except Exception as err:
        print("Failed to get the object for URI:", uri)
        raise err
Node.js
// Read and parse metadata content
async getMetadataFile(response) {
    let metadata;
    await this.getS3Object(response['metadataFile']).then(
        (data) => {
            metadata = JSON.parse(data);
        }
    );
    return metadata;
}

async getS3Object(uri) {
    const {bucketName, key} = this.getBucketAndKey(uri);
    const params = new GetObjectCommand({
        Bucket: bucketName,
        Key: key
    })
    const response = await this.s3Client.send(params);
    return await response.Body.transformToString();
}

getBucketAndKey(uri) {
    const [bucketName] = uri.replace("s3://", "").split("/", 1);
    const key = uri.replace("s3://", "").split('/').slice(1).join('/');
    return {bucketName, key};
}