运行查询 - Amazon Timestream

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

运行查询

对结果进行分页

运行查询时,Timestream 会以分页方式返回结果集,以优化应用程序的响应能力。下面的代码片段显示了如何对结果集进行分页。必须循环浏览所有结果集页面,直到遇到空值。分页令牌在 Timestream for LiveAnalytics 发放 3 小时后过期。

注意

这些代码片段基于 GitHub 上的完整示例应用程序。有关如何开始使用示例应用程序的更多信息,请参阅示例应用程序。

Java
private void runQuery(String queryString) { try { QueryRequest queryRequest = new QueryRequest(); queryRequest.setQueryString(queryString); QueryResult queryResult = queryClient.query(queryRequest); while (true) { parseQueryResult(queryResult); if (queryResult.getNextToken() == null) { break; } queryRequest.setNextToken(queryResult.getNextToken()); queryResult = queryClient.query(queryRequest); } } catch (Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries e.printStackTrace(); } }
Java v2
private void runQuery(String queryString) { try { QueryRequest queryRequest = QueryRequest.builder().queryString(queryString).build(); final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest); for(QueryResponse queryResponse : queryResponseIterator) { parseQueryResult(queryResponse); } } catch (Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries e.printStackTrace(); } }
Go
func runQuery(queryPtr *string, querySvc *timestreamquery.TimestreamQuery, f *os.File) { queryInput := &timestreamquery.QueryInput{ QueryString: aws.String(*queryPtr), } fmt.Println("QueryInput:") fmt.Println(queryInput) // execute the query err := querySvc.QueryPages(queryInput, func(page *timestreamquery.QueryOutput, lastPage bool) bool { // process query response queryStatus := page.QueryStatus fmt.Println("Current query status:", queryStatus) // query response metadata // includes column names and types metadata := page.ColumnInfo // fmt.Println("Metadata:") fmt.Println(metadata) header := "" for i := 0; i < len(metadata); i++ { header += *metadata[i].Name if i != len(metadata)-1 { header += ", " } } write(f, header) // query response data fmt.Println("Data:") // process rows rows := page.Rows for i := 0; i < len(rows); i++ { data := rows[i].Data value := processRowType(data, metadata) fmt.Println(value) write(f, value) } fmt.Println("Number of rows:", len(page.Rows)) return true }) if err != nil { fmt.Println("Error:") fmt.Println(err) } }
Python
def run_query(self, query_string):
    """Run ``query_string`` and parse every page of its result set.

    Each page produced by ``self.paginator`` is passed to
    ``self._parse_query_result``. Any exception is reported on stdout
    rather than propagated.
    """
    try:
        for result_page in self.paginator.paginate(QueryString=query_string):
            self._parse_query_result(result_page)
    except Exception as err:
        print("Exception while running query:", err)
Node.js

以下代码段使用 AWS SDK for JavaScript V2 样式。它基于 GitHub 上的 Node.js 示例 Amazon Timestream for LiveAnalytics 应用程序。

/**
 * Runs `query` and parses every page of the result set.
 *
 * The returned promise settles only after all pages have been fetched and
 * passed to parseQueryResult. The original recursive version started the
 * next page without returning or awaiting it, so callers awaiting
 * getAllRows() resumed before the remaining pages had been processed.
 *
 * @param {string} query - Query string to execute.
 * @param {string} [nextToken] - Pagination token to resume from, if any.
 * @returns {Promise<void>}
 */
async function getAllRows(query, nextToken) {
    let token = nextToken;
    do {
        const params = { QueryString: query };
        if (token) {
            params.NextToken = token;
        }

        let response;
        try {
            response = await queryClient.query(params).promise();
        } catch (err) {
            // A failed request is logged and ends pagination, as before.
            console.error("Error while querying:", err);
            return;
        }

        parseQueryResult(response);
        token = response.NextToken;
    } while (token);
}
.NET
/// <summary>
/// Runs the given query and passes each page of the result set to
/// <c>ParseQueryResult</c>, following <c>NextToken</c> until it is null.
/// Exceptions are written to the console and swallowed.
/// </summary>
private async Task RunQueryAsync(string queryString)
{
    try
    {
        var request = new QueryRequest();
        request.QueryString = queryString;

        QueryResponse page = await queryClient.QueryAsync(request);
        ParseQueryResult(page);
        while (page.NextToken != null)
        {
            request.NextToken = page.NextToken;
            page = await queryClient.QueryAsync(request);
            ParseQueryResult(page);
        }
    }
    catch(Exception e)
    {
        // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
        Console.WriteLine(e.ToString());
    }
}

解析结果集

您可以使用以下代码片段从结果集中提取数据。查询完成后,查询结果最长可在 24 小时内访问。

注意

这些代码片段基于 GitHub 上的完整示例应用程序。有关如何开始使用示例应用程序的更多信息,请参阅示例应用程序。

Java
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"); private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSSSSS"); private static final long ONE_GB_IN_BYTES = 1073741824L; private void parseQueryResult(QueryResult response) { final QueryStatus currentStatusOfQuery = queryResult.getQueryStatus(); System.out.println("Query progress so far: " + currentStatusOfQuery.getProgressPercentage() + "%"); double bytesScannedSoFar = ((double) currentStatusOfQuery.getCumulativeBytesScanned() / ONE_GB_IN_BYTES); System.out.println("Bytes scanned so far: " + bytesScannedSoFar + " GB"); double bytesMeteredSoFar = ((double) currentStatusOfQuery.getCumulativeBytesMetered() / ONE_GB_IN_BYTES); System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB"); List<ColumnInfo> columnInfo = response.getColumnInfo(); List<Row> rows = response.getRows(); System.out.println("Metadata: " + columnInfo); System.out.println("Data: "); // iterate every row for (Row row : rows) { System.out.println(parseRow(columnInfo, row)); } } private String parseRow(List<ColumnInfo> columnInfo, Row row) { List<Datum> data = row.getData(); List<String> rowOutput = new ArrayList<>(); // iterate every column per row for (int j = 0; j < data.size(); j++) { ColumnInfo info = columnInfo.get(j); Datum datum = data.get(j); rowOutput.add(parseDatum(info, datum)); } return String.format("{%s}", rowOutput.stream().map(Object::toString).collect(Collectors.joining(","))); } private String parseDatum(ColumnInfo info, Datum datum) { if (datum.isNullValue() != null && datum.isNullValue()) { return info.getName() + "=" + "NULL"; } Type columnType = info.getType(); // If the column is of TimeSeries Type if (columnType.getTimeSeriesMeasureValueColumnInfo() != null) { return parseTimeSeries(info, datum); } // 
If the column is of Array Type else if (columnType.getArrayColumnInfo() != null) { List<Datum> arrayValues = datum.getArrayValue(); return info.getName() + "=" + parseArray(info.getType().getArrayColumnInfo(), arrayValues); } // If the column is of Row Type else if (columnType.getRowColumnInfo() != null) { List<ColumnInfo> rowColumnInfo = info.getType().getRowColumnInfo(); Row rowValues = datum.getRowValue(); return parseRow(rowColumnInfo, rowValues); } // If the column is of Scalar Type else { return parseScalarType(info, datum); } } private String parseTimeSeries(ColumnInfo info, Datum datum) { List<String> timeSeriesOutput = new ArrayList<>(); for (TimeSeriesDataPoint dataPoint : datum.getTimeSeriesValue()) { timeSeriesOutput.add("{time=" + dataPoint.getTime() + ", value=" + parseDatum(info.getType().getTimeSeriesMeasureValueColumnInfo(), dataPoint.getValue()) + "}"); } return String.format("[%s]", timeSeriesOutput.stream().map(Object::toString).collect(Collectors.joining(","))); } private String parseScalarType(ColumnInfo info, Datum datum) { switch (ScalarType.fromValue(info.getType().getScalarType())) { case VARCHAR: return parseColumnName(info) + datum.getScalarValue(); case BIGINT: Long longValue = Long.valueOf(datum.getScalarValue()); return parseColumnName(info) + longValue; case INTEGER: Integer intValue = Integer.valueOf(datum.getScalarValue()); return parseColumnName(info) + intValue; case BOOLEAN: Boolean booleanValue = Boolean.valueOf(datum.getScalarValue()); return parseColumnName(info) + booleanValue; case DOUBLE: Double doubleValue = Double.valueOf(datum.getScalarValue()); return parseColumnName(info) + doubleValue; case TIMESTAMP: return parseColumnName(info) + LocalDateTime.parse(datum.getScalarValue(), TIMESTAMP_FORMATTER); case DATE: return parseColumnName(info) + LocalDate.parse(datum.getScalarValue(), DATE_FORMATTER); case TIME: return parseColumnName(info) + LocalTime.parse(datum.getScalarValue(), TIME_FORMATTER); case 
INTERVAL_DAY_TO_SECOND: case INTERVAL_YEAR_TO_MONTH: return parseColumnName(info) + datum.getScalarValue(); case UNKNOWN: return parseColumnName(info) + datum.getScalarValue(); default: throw new IllegalArgumentException("Given type is not valid: " + info.getType().getScalarType()); } } private String parseColumnName(ColumnInfo info) { return info.getName() == null ? "" : info.getName() + "="; } private String parseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) { List<String> arrayOutput = new ArrayList<>(); for (Datum datum : arrayValues) { arrayOutput.add(parseDatum(arrayColumnInfo, datum)); } return String.format("[%s]", arrayOutput.stream().map(Object::toString).collect(Collectors.joining(","))); }
Java v2
private static final long ONE_GB_IN_BYTES = 1073741824L;

/**
 * Prints the status of the query (progress, GB scanned, GB metered), the
 * column metadata and every row contained in one response page.
 */
private void parseQueryResult(QueryResponse response) {
    final QueryStatus status = response.queryStatus();
    System.out.println("Query progress so far: " + status.progressPercentage() + "%");

    final double scannedGb = ((double) status.cumulativeBytesScanned() / ONE_GB_IN_BYTES);
    System.out.println("Bytes scanned so far: " + scannedGb + " GB");

    final double meteredGb = ((double) status.cumulativeBytesMetered() / ONE_GB_IN_BYTES);
    System.out.println("Bytes metered so far: " + meteredGb + " GB");

    final List<ColumnInfo> columns = response.columnInfo();
    final List<Row> pageRows = response.rows();

    System.out.println("Metadata: " + columns);
    System.out.println("Data: ");

    // One output line per row.
    for (Row current : pageRows) {
        System.out.println(parseRow(columns, current));
    }
}

/**
 * Renders one row as "{col1,col2,...}", matching each datum to the column
 * metadata at the same position.
 */
private String parseRow(List<ColumnInfo> columnInfo, Row row) {
    final List<Datum> cells = row.data();
    final List<String> rendered = new ArrayList<>();
    for (int col = 0; col < cells.size(); col++) {
        rendered.add(parseDatum(columnInfo.get(col), cells.get(col)));
    }
    return String.format("{%s}", String.join(",", rendered));
}

/**
 * Renders a single datum. Checks, in order: NULL, time series, array,
 * non-empty row, and finally scalar.
 */
private String parseDatum(ColumnInfo info, Datum datum) {
    if (datum.nullValue() != null && datum.nullValue()) {
        return info.name() + "=" + "NULL";
    }

    final Type type = info.type();

    // Time series column
    if (type.timeSeriesMeasureValueColumnInfo() != null) {
        return parseTimeSeries(info, datum);
    }

    // Array column
    if (type.arrayColumnInfo() != null) {
        final List<Datum> elements = datum.arrayValue();
        return info.name() + "=" + parseArray(info.type().arrayColumnInfo(), elements);
    }

    // Row column
    if (type.rowColumnInfo() != null && type.rowColumnInfo().size() > 0) {
        final List<ColumnInfo> nestedColumns = info.type().rowColumnInfo();
        final Row nestedRow = datum.rowValue();
        return parseRow(nestedColumns, nestedRow);
    }

    // Scalar column
    return parseScalarType(info, datum);
}

/**
 * Renders a time series datum as "[{time=..., value=...},...]".
 */
private String parseTimeSeries(ColumnInfo info, Datum datum) {
    final List<String> points = new ArrayList<>();
    for (TimeSeriesDataPoint point : datum.timeSeriesValue()) {
        final String renderedValue =
                parseDatum(info.type().timeSeriesMeasureValueColumnInfo(), point.value());
        points.add("{time=" + point.time() + ", value=" + renderedValue + "}");
    }
    return String.format("[%s]", String.join(",", points));
}

/**
 * Renders a scalar datum as the optional "name=" prefix followed by its raw value.
 */
private String parseScalarType(ColumnInfo info, Datum datum) {
    return parseColumnName(info) + datum.scalarValue();
}

/**
 * Returns "name=" for a named column and "" for an unnamed one.
 */
private String parseColumnName(ColumnInfo info) {
    if (info.name() == null) {
        return "";
    }
    return info.name() + "=";
}

/**
 * Renders an array datum as "[v1,v2,...]" using the element column info.
 */
private String parseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) {
    final List<String> elements = new ArrayList<>();
    for (Datum element : arrayValues) {
        elements.add(parseDatum(arrayColumnInfo, element));
    }
    return String.format("[%s]", String.join(",", elements));
}
Go
func processScalarType(data *timestreamquery.Datum) string { return *data.ScalarValue } func processTimeSeriesType(data []*timestreamquery.TimeSeriesDataPoint, columnInfo *timestreamquery.ColumnInfo) string { value := "" for k := 0; k < len(data); k++ { time := data[k].Time value += *time + ":" if columnInfo.Type.ScalarType != nil { value += processScalarType(data[k].Value) } else if columnInfo.Type.ArrayColumnInfo != nil { value += processArrayType(data[k].Value.ArrayValue, columnInfo.Type.ArrayColumnInfo) } else if columnInfo.Type.RowColumnInfo != nil { value += processRowType(data[k].Value.RowValue.Data, columnInfo.Type.RowColumnInfo) } else { fail("Bad data type") } if k != len(data)-1 { value += ", " } } return value } func processArrayType(datumList []*timestreamquery.Datum, columnInfo *timestreamquery.ColumnInfo) string { value := "" for k := 0; k < len(datumList); k++ { if columnInfo.Type.ScalarType != nil { value += processScalarType(datumList[k]) } else if columnInfo.Type.TimeSeriesMeasureValueColumnInfo != nil { value += processTimeSeriesType(datumList[k].TimeSeriesValue, columnInfo.Type.TimeSeriesMeasureValueColumnInfo) } else if columnInfo.Type.ArrayColumnInfo != nil { value += "[" value += processArrayType(datumList[k].ArrayValue, columnInfo.Type.ArrayColumnInfo) value += "]" } else if columnInfo.Type.RowColumnInfo != nil { value += "[" value += processRowType(datumList[k].RowValue.Data, columnInfo.Type.RowColumnInfo) value += "]" } else { fail("Bad column type") } if k != len(datumList)-1 { value += ", " } } return value } func processRowType(data []*timestreamquery.Datum, metadata []*timestreamquery.ColumnInfo) string { value := "" for j := 0; j < len(data); j++ { if metadata[j].Type.ScalarType != nil { // process simple data types value += processScalarType(data[j]) } else if metadata[j].Type.TimeSeriesMeasureValueColumnInfo != nil { // fmt.Println("Timeseries measure value column info") // 
fmt.Println(metadata[j].Type.TimeSeriesMeasureValueColumnInfo.Type) datapointList := data[j].TimeSeriesValue value += "[" value += processTimeSeriesType(datapointList, metadata[j].Type.TimeSeriesMeasureValueColumnInfo) value += "]" } else if metadata[j].Type.ArrayColumnInfo != nil { columnInfo := metadata[j].Type.ArrayColumnInfo // fmt.Println("Array column info") // fmt.Println(columnInfo) datumList := data[j].ArrayValue value += "[" value += processArrayType(datumList, columnInfo) value += "]" } else if metadata[j].Type.RowColumnInfo != nil { columnInfo := metadata[j].Type.RowColumnInfo datumList := data[j].RowValue.Data value += "[" value += processRowType(datumList, columnInfo) value += "]" } else { panic("Bad column type") } // comma seperated column values if j != len(data)-1 { value += ", " } } return value }
Python
def _parse_query_result(self, query_result):
    """Print the query status, column metadata and every row of one result page."""
    query_status = query_result["QueryStatus"]

    progress_percentage = query_status["ProgressPercentage"]
    print(f"Query progress so far: {progress_percentage}%")

    bytes_scanned = float(query_status["CumulativeBytesScanned"]) / ONE_GB_IN_BYTES
    print(f"Data scanned so far: {bytes_scanned} GB")

    bytes_metered = float(query_status["CumulativeBytesMetered"]) / ONE_GB_IN_BYTES
    print(f"Data metered so far: {bytes_metered} GB")

    column_info = query_result['ColumnInfo']

    print("Metadata: %s" % column_info)
    print("Data: ")
    for row in query_result['Rows']:
        print(self._parse_row(column_info, row))

def _parse_row(self, column_info, row):
    """Format one row, pairing each datum with the column info at the same index."""
    data = row['Data']
    row_output = []
    for j in range(len(data)):
        info = column_info[j]
        datum = data[j]
        row_output.append(self._parse_datum(info, datum))

    return "{%s}" % str(row_output)

def _parse_datum(self, info, datum):
    """Format a single datum as a string according to its column type.

    Checks, in order: NULL, time series, array, row, scalar.
    """
    if datum.get('NullValue', False):
        # Fixed: a trailing comma used to turn this return value into a
        # 1-tuple, and info['Name'] raised KeyError for unnamed columns
        # such as array elements.
        return self._parse_column_name(info) + "NULL"

    column_type = info['Type']

    # If the column is of TimeSeries Type
    if 'TimeSeriesMeasureValueColumnInfo' in column_type:
        return self._parse_time_series(info, datum)

    # If the column is of Array Type
    elif 'ArrayColumnInfo' in column_type:
        array_values = datum['ArrayValue']
        return "%s=%s" % (info['Name'], self._parse_array(info['Type']['ArrayColumnInfo'], array_values))

    # If the column is of Row Type
    elif 'RowColumnInfo' in column_type:
        row_column_info = info['Type']['RowColumnInfo']
        row_values = datum['RowValue']
        return self._parse_row(row_column_info, row_values)

    # If the column is of Scalar Type
    else:
        return self._parse_column_name(info) + datum['ScalarValue']

def _parse_time_series(self, info, datum):
    """Format a time series datum as a bracketed list of {time=..., value=...} entries."""
    time_series_output = []
    for data_point in datum['TimeSeriesValue']:
        time_series_output.append("{time=%s, value=%s}"
                                  % (data_point['Time'],
                                     self._parse_datum(info['Type']['TimeSeriesMeasureValueColumnInfo'],
                                                       data_point['Value'])))
    return "[%s]" % str(time_series_output)

def _parse_array(self, array_column_info, array_values):
    """Format an array datum as a bracketed list of its formatted elements."""
    array_output = []
    for datum in array_values:
        array_output.append(self._parse_datum(array_column_info, datum))

    return "[%s]" % str(array_output)

@staticmethod
def _parse_column_name(info):
    """Return "name=" for a named column, or "" when the column has no name."""
    if 'Name' in info:
        return info['Name'] + "="
    else:
        return ""
Node.js

以下代码段使用 AWS SDK for JavaScript V2 样式。它基于 GitHub 上的 Node.js 示例 Amazon Timestream for LiveAnalytics 应用程序。

/**
 * Logs the query status, the column metadata and every row of one
 * response page.
 */
function parseQueryResult(response) {
    const queryStatus = response.QueryStatus;
    console.log("Current query status: " + JSON.stringify(queryStatus));

    const columnInfo = response.ColumnInfo;
    const rows = response.Rows;

    console.log("Metadata: " + JSON.stringify(columnInfo));
    console.log("Data: ");

    rows.forEach(function (row) {
        console.log(parseRow(columnInfo, row));
    });
}

/**
 * Formats one row as "{col1, col2, ...}", pairing each datum with the
 * column info at the same index.
 */
function parseRow(columnInfo, row) {
    const data = row.Data;
    const rowOutput = [];

    for (let i = 0; i < data.length; i++) {
        // Fixed: these were assigned without a declaration, creating
        // implicit globals (a ReferenceError in strict mode / ES modules).
        const info = columnInfo[i];
        const datum = data[i];
        rowOutput.push(parseDatum(info, datum));
    }

    return `{${rowOutput.join(", ")}}`
}

/**
 * Formats a single datum. Checks, in order: NULL, time series, array,
 * row, scalar.
 */
function parseDatum(info, datum) {
    if (datum.NullValue != null && datum.NullValue === true) {
        return `${info.Name}=NULL`;
    }

    const columnType = info.Type;

    // If the column is of TimeSeries Type
    if (columnType.TimeSeriesMeasureValueColumnInfo != null) {
        return parseTimeSeries(info, datum);
    }
    // If the column is of Array Type
    else if (columnType.ArrayColumnInfo != null) {
        const arrayValues = datum.ArrayValue;
        return `${info.Name}=${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`;
    }
    // If the column is of Row Type
    else if (columnType.RowColumnInfo != null) {
        const rowColumnInfo = info.Type.RowColumnInfo;
        const rowValues = datum.RowValue;
        return parseRow(rowColumnInfo, rowValues);
    }
    // If the column is of Scalar Type
    else {
        return parseScalarType(info, datum);
    }
}

/**
 * Formats a time series datum as "[{time=..., value=...}, ...]".
 */
function parseTimeSeries(info, datum) {
    const timeSeriesOutput = [];
    datum.TimeSeriesValue.forEach(function (dataPoint) {
        timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`)
    });

    return `[${timeSeriesOutput.join(", ")}]`
}

/**
 * Formats a scalar datum as the optional "name=" prefix plus its raw value.
 */
function parseScalarType(info, datum) {
    return parseColumnName(info) + datum.ScalarValue;
}

/**
 * Returns "name=" for a named column, or "" when the column has no name.
 */
function parseColumnName(info) {
    return info.Name == null ? "" : `${info.Name}=`;
}

/**
 * Formats an array datum as "[v1, v2, ...]" using the element column info.
 */
function parseArray(arrayColumnInfo, arrayValues) {
    const arrayOutput = [];
    arrayValues.forEach(function (datum) {
        arrayOutput.push(parseDatum(arrayColumnInfo, datum));
    });
    return `[${arrayOutput.join(", ")}]`
}
.NET
/// <summary>
/// Writes the query status, the column metadata and every row of one
/// response page to the console.
/// </summary>
private void ParseQueryResult(QueryResponse response)
{
    List<ColumnInfo> columnInfo = response.ColumnInfo;
    var options = new JsonSerializerOptions
    {
        IgnoreNullValues = true
    };
    List<String> columnInfoStrings = columnInfo.ConvertAll(x => JsonSerializer.Serialize(x, options));
    List<Row> rows = response.Rows;

    QueryStatus queryStatus = response.QueryStatus;
    Console.WriteLine("Current Query status:" + JsonSerializer.Serialize(queryStatus, options));

    Console.WriteLine("Metadata:" + string.Join(",", columnInfoStrings));
    Console.WriteLine("Data:");

    foreach (Row row in rows)
    {
        Console.WriteLine(ParseRow(columnInfo, row));
    }
}

/// <summary>
/// Formats one row as "{col1,col2,...}", pairing each datum with the
/// column info at the same index.
/// </summary>
private string ParseRow(List<ColumnInfo> columnInfo, Row row)
{
    List<Datum> data = row.Data;
    List<string> rowOutput = new List<string>();
    for (int j = 0; j < data.Count; j++)
    {
        ColumnInfo info = columnInfo[j];
        Datum datum = data[j];
        rowOutput.Add(ParseDatum(info, datum));
    }
    return $"{{{string.Join(",", rowOutput)}}}";
}

/// <summary>
/// Formats a single datum. Checks, in order: NULL, time series, array,
/// non-empty row, scalar.
/// </summary>
private string ParseDatum(ColumnInfo info, Datum datum)
{
    if (datum.NullValue)
    {
        return $"{info.Name}=NULL";
    }

    Amazon.TimestreamQuery.Model.Type columnType = info.Type;
    if (columnType.TimeSeriesMeasureValueColumnInfo != null)
    {
        return ParseTimeSeries(info, datum);
    }
    else if (columnType.ArrayColumnInfo != null)
    {
        List<Datum> arrayValues = datum.ArrayValue;
        return $"{info.Name}={ParseArray(info.Type.ArrayColumnInfo, arrayValues)}";
    }
    else if (columnType.RowColumnInfo != null && columnType.RowColumnInfo.Count > 0)
    {
        List<ColumnInfo> rowColumnInfo = info.Type.RowColumnInfo;
        Row rowValue = datum.RowValue;
        return ParseRow(rowColumnInfo, rowValue);
    }
    else
    {
        return ParseScalarType(info, datum);
    }
}

/// <summary>
/// Formats a time series datum as "[{time=..., value=...},...]".
/// </summary>
private string ParseTimeSeries(ColumnInfo info, Datum datum)
{
    // string.Join yields "" for an empty sequence, whereas the previous
    // seedless Aggregate threw InvalidOperationException on an empty series.
    var points = datum.TimeSeriesValue
        .Select(value => $"{{time={value.Time}, value={ParseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, value.Value)}}}");

    return $"[{string.Join(",", points)}]";
}

/// <summary>
/// Formats a scalar datum as the optional "name=" prefix plus its raw value.
/// </summary>
private string ParseScalarType(ColumnInfo info, Datum datum)
{
    return ParseColumnName(info) + datum.ScalarValue;
}

/// <summary>
/// Returns "name=" for a named column, or "" when the column has no name.
/// </summary>
private string ParseColumnName(ColumnInfo info)
{
    return info.Name == null ? "" : (info.Name + "=");
}

/// <summary>
/// Formats an array datum as "[v1,v2,...]" using the element column info.
/// </summary>
private string ParseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues)
{
    // string.Join handles an empty array; seedless Aggregate did not.
    return $"[{string.Join(",", arrayValues.Select(value => ParseDatum(arrayColumnInfo, value)))}]";
}

访问查询状态

您可以通过 QueryResponse 访问查询状态,其中包含有关查询进度、查询扫描的字节和查询计量的字节的信息。bytesMetered 和 bytesScanned 值是累积的,在分页查询结果时会持续更新。您可以使用此信息来了解单个查询所扫描的字节,也可以使用它来做出某些决策。例如,假设查询价格为每扫描 1 GB 收费 0.01 美元,则可能需要取消费用超过 25 美元或扫描量超过 X GB 的查询。下面的代码片段显示了如何做到这一点。

注意

这些代码片段基于 GitHub 上的完整示例应用程序。有关如何开始使用示例应用程序的更多信息,请参阅示例应用程序。

Java
private static final long ONE_GB_IN_BYTES = 1073741824L;
private static final double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB

/**
 * Runs SELECT_ALL_QUERY page by page, printing progress and metered GB for
 * each page, and cancels the query as soon as its estimated cost exceeds
 * one cent.
 */
public void cancelQueryBasedOnQueryStatus() {
    System.out.println("Starting query: " + SELECT_ALL_QUERY);

    final QueryRequest request = new QueryRequest();
    request.setQueryString(SELECT_ALL_QUERY);

    for (QueryResult page = queryClient.query(request); ; page = queryClient.query(request)) {
        final QueryStatus status = page.getQueryStatus();
        System.out.println("Query progress so far: " + status.getProgressPercentage() + "%");

        final double meteredGb = ((double) status.getCumulativeBytesMetered() / ONE_GB_IN_BYTES);
        System.out.println("Bytes metered so far: " + meteredGb + " GB");

        // Cancel query if its costing more than 1 cent
        if (meteredGb * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) {
            cancelQuery(page);
            return;
        }

        final String token = page.getNextToken();
        if (token == null) {
            return;
        }
        request.setNextToken(token);
    }
}
Java v2
private static final long ONE_GB_IN_BYTES = 1073741824L;
private static final double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB

/**
 * Runs SELECT_ALL_QUERY through the paginator, printing progress and
 * metered GB for each page, and cancels the query as soon as its estimated
 * cost exceeds one cent.
 */
public void cancelQueryBasedOnQueryStatus() {
    System.out.println("Starting query: " + SELECT_ALL_QUERY);

    final QueryRequest request = QueryRequest.builder()
            .queryString(SELECT_ALL_QUERY)
            .build();

    for (QueryResponse page : timestreamQueryClient.queryPaginator(request)) {
        final QueryStatus status = page.queryStatus();
        System.out.println("Query progress so far: " + status.progressPercentage() + "%");

        final double meteredGb = ((double) status.cumulativeBytesMetered() / ONE_GB_IN_BYTES);
        System.out.println("Bytes metered so far: " + meteredGb + "GB");

        // Cancel query if its costing more than 1 cent
        final boolean overBudget = meteredGb * QUERY_COST_PER_GB_IN_DOLLARS > 0.01;
        if (overBudget) {
            cancelQuery(page);
            return;
        }
    }
}
Go
const OneGbInBytes = 1073741824 // Assuming the price of query is $0.01 per GB const QueryCostPerGbInDollars = 0.01 func cancelQueryBasedOnQueryStatus(queryPtr *string, querySvc *timestreamquery.TimestreamQuery, f *os.File) { queryInput := &timestreamquery.QueryInput{ QueryString: aws.String(*queryPtr), } fmt.Println("QueryInput:") fmt.Println(queryInput) // execute the query err := querySvc.QueryPages(queryInput, func(page *timestreamquery.QueryOutput, lastPage bool) bool { // process query response queryStatus := page.QueryStatus fmt.Println("Current query status:", queryStatus) bytes_metered := float64(*queryStatus.CumulativeBytesMetered) / float64(ONE_GB_IN_BYTES) if bytes_metered * QUERY_COST_PER_GB_IN_DOLLARS > 0.01 { cancelQuery(page, querySvc) return true } // query response metadata // includes column names and types metadata := page.ColumnInfo // fmt.Println("Metadata:") fmt.Println(metadata) header := "" for i := 0; i < len(metadata); i++ { header += *metadata[i].Name if i != len(metadata)-1 { header += ", " } } write(f, header) // query response data fmt.Println("Data:") // process rows rows := page.Rows for i := 0; i < len(rows); i++ { data := rows[i].Data value := processRowType(data, metadata) fmt.Println(value) write(f, value) } fmt.Println("Number of rows:", len(page.Rows)) return true }) if err != nil { fmt.Println("Error:") fmt.Println(err) } }
Python
ONE_GB_IN_BYTES = 1073741824
# Assuming the price of query is $0.01 per GB
QUERY_COST_PER_GB_IN_DOLLARS = 0.01

def cancel_query_based_on_query_status(self):
    """Run ``self.SELECT_ALL`` page by page and cancel it once it gets too costly.

    For every page the progress and the metered gigabytes are printed. As
    soon as the estimated cost exceeds one cent the query is cancelled via
    ``self.cancel_query_for`` and paging stops. Exceptions are printed
    together with a traceback on stderr rather than propagated.
    """
    try:
        print("Starting query: " + self.SELECT_ALL)
        for result_page in self.paginator.paginate(QueryString=self.SELECT_ALL):
            status = result_page["QueryStatus"]
            progress = status["ProgressPercentage"]
            print("Query progress so far: " + str(progress) + "%")

            metered_gb = status["CumulativeBytesMetered"] / self.ONE_GB_IN_BYTES
            print("Bytes Metered so far: " + str(metered_gb) + " GB")

            over_budget = metered_gb * self.QUERY_COST_PER_GB_IN_DOLLARS > 0.01
            if over_budget:
                self.cancel_query_for(result_page)
                break
    except Exception as err:
        print("Exception while running query:", err)
        traceback.print_exc(file=sys.stderr)
Node.js

以下代码段使用 AWS SDK for JavaScript V2 样式。它基于 GitHub 上的 Node.js 示例 Amazon Timestream for LiveAnalytics 应用程序。

/**
 * Logs the query status, the column metadata and every row of one
 * response page.
 */
function parseQueryResult(response) {
    const queryStatus = response.QueryStatus;
    console.log("Current query status: " + JSON.stringify(queryStatus));

    const columnInfo = response.ColumnInfo;
    const rows = response.Rows;

    console.log("Metadata: " + JSON.stringify(columnInfo));
    console.log("Data: ");

    rows.forEach(function (row) {
        console.log(parseRow(columnInfo, row));
    });
}

/**
 * Formats one row as "{col1, col2, ...}", pairing each datum with the
 * column info at the same index.
 */
function parseRow(columnInfo, row) {
    const data = row.Data;
    const rowOutput = [];

    for (let i = 0; i < data.length; i++) {
        // Fixed: these were assigned without a declaration, creating
        // implicit globals (a ReferenceError in strict mode / ES modules).
        const info = columnInfo[i];
        const datum = data[i];
        rowOutput.push(parseDatum(info, datum));
    }

    return `{${rowOutput.join(", ")}}`
}

/**
 * Formats a single datum. Checks, in order: NULL, time series, array,
 * row, scalar.
 */
function parseDatum(info, datum) {
    if (datum.NullValue != null && datum.NullValue === true) {
        return `${info.Name}=NULL`;
    }

    const columnType = info.Type;

    // If the column is of TimeSeries Type
    if (columnType.TimeSeriesMeasureValueColumnInfo != null) {
        return parseTimeSeries(info, datum);
    }
    // If the column is of Array Type
    else if (columnType.ArrayColumnInfo != null) {
        const arrayValues = datum.ArrayValue;
        return `${info.Name}=${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`;
    }
    // If the column is of Row Type
    else if (columnType.RowColumnInfo != null) {
        const rowColumnInfo = info.Type.RowColumnInfo;
        const rowValues = datum.RowValue;
        return parseRow(rowColumnInfo, rowValues);
    }
    // If the column is of Scalar Type
    else {
        return parseScalarType(info, datum);
    }
}

/**
 * Formats a time series datum as "[{time=..., value=...}, ...]".
 */
function parseTimeSeries(info, datum) {
    const timeSeriesOutput = [];
    datum.TimeSeriesValue.forEach(function (dataPoint) {
        timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`)
    });

    return `[${timeSeriesOutput.join(", ")}]`
}

/**
 * Formats a scalar datum as the optional "name=" prefix plus its raw value.
 */
function parseScalarType(info, datum) {
    return parseColumnName(info) + datum.ScalarValue;
}

/**
 * Returns "name=" for a named column, or "" when the column has no name.
 */
function parseColumnName(info) {
    return info.Name == null ? "" : `${info.Name}=`;
}

/**
 * Formats an array datum as "[v1, v2, ...]" using the element column info.
 */
function parseArray(arrayColumnInfo, arrayValues) {
    const arrayOutput = [];
    arrayValues.forEach(function (datum) {
        arrayOutput.push(parseDatum(arrayColumnInfo, datum));
    });
    return `[${arrayOutput.join(", ")}]`
}
.NET
private static readonly long ONE_GB_IN_BYTES = 1073741824L;
private static readonly double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB

/// <summary>
/// Runs the given query page by page. Before a page is parsed, the
/// estimated cost of the bytes metered so far is checked; once it exceeds
/// one cent the query is cancelled and paging stops. Exceptions are written
/// to the console and swallowed.
/// </summary>
private async Task CancelQueryBasedOnQueryStatus(string queryString)
{
    try
    {
        var request = new QueryRequest();
        request.QueryString = queryString;

        QueryResponse page = await queryClient.QueryAsync(request);
        for (;;)
        {
            QueryStatus status = page.QueryStatus;
            double meteredGb = ((double) status.CumulativeBytesMetered / ONE_GB_IN_BYTES);

            // Cancel query if its costing more than 1 cent
            bool overBudget = meteredGb * QUERY_COST_PER_GB_IN_DOLLARS > 0.01;
            if (overBudget)
            {
                await CancelQuery(page);
                break;
            }

            ParseQueryResult(page);

            string token = page.NextToken;
            if (token == null)
            {
                break;
            }
            request.NextToken = token;
            page = await queryClient.QueryAsync(request);
        }
    }
    catch(Exception e)
    {
        // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
        Console.WriteLine(e.ToString());
    }
}

有关如何取消查询的更多详细信息,请参阅取消查询