Run query - Amazon Timestream


Run query

Paginating the results

When you run a query, Timestream returns the result set in a paginated manner to optimize the responsiveness of your applications. The code snippets below show how you can paginate through the result set. You must loop through all the pages of the result set until you encounter a null pagination token. Pagination tokens expire 3 hours after they are issued by Timestream for LiveAnalytics.
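Before the SDK-specific snippets, here is a minimal sketch of that loop in plain boto3. The client construction and the row-processing placeholder are assumptions for illustration, not part of the AWS sample applications:

import boto3

# Minimal sketch, assuming credentials and Region are already configured
client = boto3.client("timestream-query")

def run_query_all_pages(query_string):
    kwargs = {"QueryString": query_string}
    while True:
        response = client.query(**kwargs)
        for row in response["Rows"]:
            pass  # process each row here (placeholder)
        next_token = response.get("NextToken")
        if next_token is None:
            break  # last page: no token returned
        kwargs["NextToken"] = next_token  # tokens expire 3 hours after issuance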

Note

These code snippets are based on full sample applications on GitHub. For more information about how to get started with the sample applications, see Sample application.

Java
private void runQuery(String queryString) {
    try {
        QueryRequest queryRequest = new QueryRequest();
        queryRequest.setQueryString(queryString);
        QueryResult queryResult = queryClient.query(queryRequest);
        while (true) {
            parseQueryResult(queryResult);
            if (queryResult.getNextToken() == null) {
                break;
            }
            queryRequest.setNextToken(queryResult.getNextToken());
            queryResult = queryClient.query(queryRequest);
        }
    } catch (Exception e) {
        // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
        e.printStackTrace();
    }
}
Java v2
private void runQuery(String queryString) {
    try {
        QueryRequest queryRequest = QueryRequest.builder().queryString(queryString).build();
        final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest);
        for (QueryResponse queryResponse : queryResponseIterator) {
            parseQueryResult(queryResponse);
        }
    } catch (Exception e) {
        // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
        e.printStackTrace();
    }
}
Go
func runQuery(queryPtr *string, querySvc *timestreamquery.TimestreamQuery, f *os.File) {
    queryInput := &timestreamquery.QueryInput{
        QueryString: aws.String(*queryPtr),
    }
    fmt.Println("QueryInput:")
    fmt.Println(queryInput)
    // execute the query
    err := querySvc.QueryPages(queryInput,
        func(page *timestreamquery.QueryOutput, lastPage bool) bool {
            // process query response
            queryStatus := page.QueryStatus
            fmt.Println("Current query status:", queryStatus)
            // query response metadata
            // includes column names and types
            metadata := page.ColumnInfo
            // fmt.Println("Metadata:")
            fmt.Println(metadata)
            header := ""
            for i := 0; i < len(metadata); i++ {
                header += *metadata[i].Name
                if i != len(metadata)-1 {
                    header += ", "
                }
            }
            write(f, header)
            // query response data
            fmt.Println("Data:")
            // process rows
            rows := page.Rows
            for i := 0; i < len(rows); i++ {
                data := rows[i].Data
                value := processRowType(data, metadata)
                fmt.Println(value)
                write(f, value)
            }
            fmt.Println("Number of rows:", len(page.Rows))
            return true
        })
    if err != nil {
        fmt.Println("Error:")
        fmt.Println(err)
    }
}
Python
def run_query(self, query_string):
    try:
        page_iterator = self.paginator.paginate(QueryString=query_string)
        for page in page_iterator:
            self._parse_query_result(page)
    except Exception as err:
        print("Exception while running query:", err)
Node.js

The following snippet uses the AWS SDK for JavaScript V2 style. It is based on the Node.js sample application for Amazon Timestream for LiveAnalytics on GitHub.

async function getAllRows(query, nextToken) {
    const params = {
        QueryString: query
    };
    if (nextToken) {
        params.NextToken = nextToken;
    }

    await queryClient.query(params).promise()
        .then(
            (response) => {
                parseQueryResult(response);
                if (response.NextToken) {
                    // return the recursive call so the promise chain resolves only after the last page
                    return getAllRows(query, response.NextToken);
                }
            },
            (err) => {
                console.error("Error while querying:", err);
            });
}
.NET
private async Task RunQueryAsync(string queryString)
{
    try
    {
        QueryRequest queryRequest = new QueryRequest();
        queryRequest.QueryString = queryString;
        QueryResponse queryResponse = await queryClient.QueryAsync(queryRequest);
        while (true)
        {
            ParseQueryResult(queryResponse);
            if (queryResponse.NextToken == null)
            {
                break;
            }
            queryRequest.NextToken = queryResponse.NextToken;
            queryResponse = await queryClient.QueryAsync(queryRequest);
        }
    }
    catch (Exception e)
    {
        // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
        Console.WriteLine(e.ToString());
    }
}

Parsing the result set

You can use the following code snippets to extract data from the result set. Query results are accessible for up to 24 hours after a query completes.
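Each page of results carries ColumnInfo (column names and types) alongside Rows of Datum values; the snippets below walk that structure recursively to handle scalar, array, row, and time-series columns. For the common case of purely scalar columns, pairing each Datum with its column name suffices, as in this minimal sketch (hypothetical helper, not part of the AWS samples):

def parse_scalar_rows(response):
    # Assumes every column is a scalar type; nested types need the full parsers below
    names = [column["Name"] for column in response["ColumnInfo"]]
    for row in response["Rows"]:
        yield {name: datum.get("ScalarValue") for name, datum in zip(names, row["Data"])}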

Note

These code snippets are based on full sample applications on GitHub. For more information about how to get started with the sample applications, see Sample application.

Java
private static final DateTimeFormatter TIMESTAMP_FORMATTER =
        DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS");
private static final DateTimeFormatter DATE_FORMATTER =
        DateTimeFormatter.ofPattern("yyyy-MM-dd");
private static final DateTimeFormatter TIME_FORMATTER =
        DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSSSSS");
private static final long ONE_GB_IN_BYTES = 1073741824L;

private void parseQueryResult(QueryResult response) {
    final QueryStatus currentStatusOfQuery = response.getQueryStatus();
    System.out.println("Query progress so far: " + currentStatusOfQuery.getProgressPercentage() + "%");

    double bytesScannedSoFar = ((double) currentStatusOfQuery.getCumulativeBytesScanned() / ONE_GB_IN_BYTES);
    System.out.println("Bytes scanned so far: " + bytesScannedSoFar + " GB");

    double bytesMeteredSoFar = ((double) currentStatusOfQuery.getCumulativeBytesMetered() / ONE_GB_IN_BYTES);
    System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB");

    List<ColumnInfo> columnInfo = response.getColumnInfo();
    List<Row> rows = response.getRows();

    System.out.println("Metadata: " + columnInfo);
    System.out.println("Data: ");

    // iterate every row
    for (Row row : rows) {
        System.out.println(parseRow(columnInfo, row));
    }
}

private String parseRow(List<ColumnInfo> columnInfo, Row row) {
    List<Datum> data = row.getData();
    List<String> rowOutput = new ArrayList<>();
    // iterate every column per row
    for (int j = 0; j < data.size(); j++) {
        ColumnInfo info = columnInfo.get(j);
        Datum datum = data.get(j);
        rowOutput.add(parseDatum(info, datum));
    }
    return String.format("{%s}", rowOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}

private String parseDatum(ColumnInfo info, Datum datum) {
    if (datum.isNullValue() != null && datum.isNullValue()) {
        return info.getName() + "=" + "NULL";
    }

    Type columnType = info.getType();
    // If the column is of TimeSeries Type
    if (columnType.getTimeSeriesMeasureValueColumnInfo() != null) {
        return parseTimeSeries(info, datum);
    }
    // If the column is of Array Type
    else if (columnType.getArrayColumnInfo() != null) {
        List<Datum> arrayValues = datum.getArrayValue();
        return info.getName() + "=" + parseArray(info.getType().getArrayColumnInfo(), arrayValues);
    }
    // If the column is of Row Type
    else if (columnType.getRowColumnInfo() != null) {
        List<ColumnInfo> rowColumnInfo = info.getType().getRowColumnInfo();
        Row rowValues = datum.getRowValue();
        return parseRow(rowColumnInfo, rowValues);
    }
    // If the column is of Scalar Type
    else {
        return parseScalarType(info, datum);
    }
}

private String parseTimeSeries(ColumnInfo info, Datum datum) {
    List<String> timeSeriesOutput = new ArrayList<>();
    for (TimeSeriesDataPoint dataPoint : datum.getTimeSeriesValue()) {
        timeSeriesOutput.add("{time=" + dataPoint.getTime() + ", value="
                + parseDatum(info.getType().getTimeSeriesMeasureValueColumnInfo(), dataPoint.getValue()) + "}");
    }
    return String.format("[%s]", timeSeriesOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}

private String parseScalarType(ColumnInfo info, Datum datum) {
    switch (ScalarType.fromValue(info.getType().getScalarType())) {
        case VARCHAR:
            return parseColumnName(info) + datum.getScalarValue();
        case BIGINT:
            Long longValue = Long.valueOf(datum.getScalarValue());
            return parseColumnName(info) + longValue;
        case INTEGER:
            Integer intValue = Integer.valueOf(datum.getScalarValue());
            return parseColumnName(info) + intValue;
        case BOOLEAN:
            Boolean booleanValue = Boolean.valueOf(datum.getScalarValue());
            return parseColumnName(info) + booleanValue;
        case DOUBLE:
            Double doubleValue = Double.valueOf(datum.getScalarValue());
            return parseColumnName(info) + doubleValue;
        case TIMESTAMP:
            return parseColumnName(info) + LocalDateTime.parse(datum.getScalarValue(), TIMESTAMP_FORMATTER);
        case DATE:
            return parseColumnName(info) + LocalDate.parse(datum.getScalarValue(), DATE_FORMATTER);
        case TIME:
            return parseColumnName(info) + LocalTime.parse(datum.getScalarValue(), TIME_FORMATTER);
        case INTERVAL_DAY_TO_SECOND:
        case INTERVAL_YEAR_TO_MONTH:
            return parseColumnName(info) + datum.getScalarValue();
        case UNKNOWN:
            return parseColumnName(info) + datum.getScalarValue();
        default:
            throw new IllegalArgumentException("Given type is not valid: " + info.getType().getScalarType());
    }
}

private String parseColumnName(ColumnInfo info) {
    return info.getName() == null ? "" : info.getName() + "=";
}

private String parseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) {
    List<String> arrayOutput = new ArrayList<>();
    for (Datum datum : arrayValues) {
        arrayOutput.add(parseDatum(arrayColumnInfo, datum));
    }
    return String.format("[%s]", arrayOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}
Java v2
private static final long ONE_GB_IN_BYTES = 1073741824L;

private void parseQueryResult(QueryResponse response) {
    final QueryStatus currentStatusOfQuery = response.queryStatus();
    System.out.println("Query progress so far: " + currentStatusOfQuery.progressPercentage() + "%");

    double bytesScannedSoFar = ((double) currentStatusOfQuery.cumulativeBytesScanned() / ONE_GB_IN_BYTES);
    System.out.println("Bytes scanned so far: " + bytesScannedSoFar + " GB");

    double bytesMeteredSoFar = ((double) currentStatusOfQuery.cumulativeBytesMetered() / ONE_GB_IN_BYTES);
    System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB");

    List<ColumnInfo> columnInfo = response.columnInfo();
    List<Row> rows = response.rows();

    System.out.println("Metadata: " + columnInfo);
    System.out.println("Data: ");

    // iterate every row
    for (Row row : rows) {
        System.out.println(parseRow(columnInfo, row));
    }
}

private String parseRow(List<ColumnInfo> columnInfo, Row row) {
    List<Datum> data = row.data();
    List<String> rowOutput = new ArrayList<>();
    // iterate every column per row
    for (int j = 0; j < data.size(); j++) {
        ColumnInfo info = columnInfo.get(j);
        Datum datum = data.get(j);
        rowOutput.add(parseDatum(info, datum));
    }
    return String.format("{%s}", rowOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}

private String parseDatum(ColumnInfo info, Datum datum) {
    if (datum.nullValue() != null && datum.nullValue()) {
        return info.name() + "=" + "NULL";
    }

    Type columnType = info.type();
    // If the column is of TimeSeries Type
    if (columnType.timeSeriesMeasureValueColumnInfo() != null) {
        return parseTimeSeries(info, datum);
    }
    // If the column is of Array Type
    else if (columnType.arrayColumnInfo() != null) {
        List<Datum> arrayValues = datum.arrayValue();
        return info.name() + "=" + parseArray(info.type().arrayColumnInfo(), arrayValues);
    }
    // If the column is of Row Type
    else if (columnType.rowColumnInfo() != null && columnType.rowColumnInfo().size() > 0) {
        List<ColumnInfo> rowColumnInfo = info.type().rowColumnInfo();
        Row rowValues = datum.rowValue();
        return parseRow(rowColumnInfo, rowValues);
    }
    // If the column is of Scalar Type
    else {
        return parseScalarType(info, datum);
    }
}

private String parseTimeSeries(ColumnInfo info, Datum datum) {
    List<String> timeSeriesOutput = new ArrayList<>();
    for (TimeSeriesDataPoint dataPoint : datum.timeSeriesValue()) {
        timeSeriesOutput.add("{time=" + dataPoint.time() + ", value="
                + parseDatum(info.type().timeSeriesMeasureValueColumnInfo(), dataPoint.value()) + "}");
    }
    return String.format("[%s]", timeSeriesOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}

private String parseScalarType(ColumnInfo info, Datum datum) {
    return parseColumnName(info) + datum.scalarValue();
}

private String parseColumnName(ColumnInfo info) {
    return info.name() == null ? "" : info.name() + "=";
}

private String parseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) {
    List<String> arrayOutput = new ArrayList<>();
    for (Datum datum : arrayValues) {
        arrayOutput.add(parseDatum(arrayColumnInfo, datum));
    }
    return String.format("[%s]", arrayOutput.stream().map(Object::toString).collect(Collectors.joining(",")));
}
Go
func processScalarType(data *timestreamquery.Datum) string {
    return *data.ScalarValue
}

func processTimeSeriesType(data []*timestreamquery.TimeSeriesDataPoint, columnInfo *timestreamquery.ColumnInfo) string {
    value := ""
    for k := 0; k < len(data); k++ {
        time := data[k].Time
        value += *time + ":"
        if columnInfo.Type.ScalarType != nil {
            value += processScalarType(data[k].Value)
        } else if columnInfo.Type.ArrayColumnInfo != nil {
            value += processArrayType(data[k].Value.ArrayValue, columnInfo.Type.ArrayColumnInfo)
        } else if columnInfo.Type.RowColumnInfo != nil {
            value += processRowType(data[k].Value.RowValue.Data, columnInfo.Type.RowColumnInfo)
        } else {
            fail("Bad data type")
        }
        if k != len(data)-1 {
            value += ", "
        }
    }
    return value
}

func processArrayType(datumList []*timestreamquery.Datum, columnInfo *timestreamquery.ColumnInfo) string {
    value := ""
    for k := 0; k < len(datumList); k++ {
        if columnInfo.Type.ScalarType != nil {
            value += processScalarType(datumList[k])
        } else if columnInfo.Type.TimeSeriesMeasureValueColumnInfo != nil {
            value += processTimeSeriesType(datumList[k].TimeSeriesValue, columnInfo.Type.TimeSeriesMeasureValueColumnInfo)
        } else if columnInfo.Type.ArrayColumnInfo != nil {
            value += "["
            value += processArrayType(datumList[k].ArrayValue, columnInfo.Type.ArrayColumnInfo)
            value += "]"
        } else if columnInfo.Type.RowColumnInfo != nil {
            value += "["
            value += processRowType(datumList[k].RowValue.Data, columnInfo.Type.RowColumnInfo)
            value += "]"
        } else {
            fail("Bad column type")
        }
        if k != len(datumList)-1 {
            value += ", "
        }
    }
    return value
}

func processRowType(data []*timestreamquery.Datum, metadata []*timestreamquery.ColumnInfo) string {
    value := ""
    for j := 0; j < len(data); j++ {
        if metadata[j].Type.ScalarType != nil {
            // process simple data types
            value += processScalarType(data[j])
        } else if metadata[j].Type.TimeSeriesMeasureValueColumnInfo != nil {
            datapointList := data[j].TimeSeriesValue
            value += "["
            value += processTimeSeriesType(datapointList, metadata[j].Type.TimeSeriesMeasureValueColumnInfo)
            value += "]"
        } else if metadata[j].Type.ArrayColumnInfo != nil {
            columnInfo := metadata[j].Type.ArrayColumnInfo
            datumList := data[j].ArrayValue
            value += "["
            value += processArrayType(datumList, columnInfo)
            value += "]"
        } else if metadata[j].Type.RowColumnInfo != nil {
            columnInfo := metadata[j].Type.RowColumnInfo
            datumList := data[j].RowValue.Data
            value += "["
            value += processRowType(datumList, columnInfo)
            value += "]"
        } else {
            panic("Bad column type")
        }
        // comma separated column values
        if j != len(data)-1 {
            value += ", "
        }
    }
    return value
}
Python
def _parse_query_result(self, query_result):
    query_status = query_result["QueryStatus"]

    progress_percentage = query_status["ProgressPercentage"]
    print(f"Query progress so far: {progress_percentage}%")

    bytes_scanned = float(query_status["CumulativeBytesScanned"]) / ONE_GB_IN_BYTES
    print(f"Data scanned so far: {bytes_scanned} GB")

    bytes_metered = float(query_status["CumulativeBytesMetered"]) / ONE_GB_IN_BYTES
    print(f"Data metered so far: {bytes_metered} GB")

    column_info = query_result['ColumnInfo']

    print("Metadata: %s" % column_info)
    print("Data: ")
    for row in query_result['Rows']:
        print(self._parse_row(column_info, row))

def _parse_row(self, column_info, row):
    data = row['Data']
    row_output = []
    for j in range(len(data)):
        info = column_info[j]
        datum = data[j]
        row_output.append(self._parse_datum(info, datum))
    return "{%s}" % str(row_output)

def _parse_datum(self, info, datum):
    if datum.get('NullValue', False):
        return "%s=NULL" % info['Name']

    column_type = info['Type']

    # If the column is of TimeSeries Type
    if 'TimeSeriesMeasureValueColumnInfo' in column_type:
        return self._parse_time_series(info, datum)

    # If the column is of Array Type
    elif 'ArrayColumnInfo' in column_type:
        array_values = datum['ArrayValue']
        return "%s=%s" % (info['Name'], self._parse_array(info['Type']['ArrayColumnInfo'], array_values))

    # If the column is of Row Type
    elif 'RowColumnInfo' in column_type:
        row_column_info = info['Type']['RowColumnInfo']
        row_values = datum['RowValue']
        return self._parse_row(row_column_info, row_values)

    # If the column is of Scalar Type
    else:
        return self._parse_column_name(info) + datum['ScalarValue']

def _parse_time_series(self, info, datum):
    time_series_output = []
    for data_point in datum['TimeSeriesValue']:
        time_series_output.append("{time=%s, value=%s}"
                                  % (data_point['Time'],
                                     self._parse_datum(info['Type']['TimeSeriesMeasureValueColumnInfo'],
                                                       data_point['Value'])))
    return "[%s]" % str(time_series_output)

def _parse_array(self, array_column_info, array_values):
    array_output = []
    for datum in array_values:
        array_output.append(self._parse_datum(array_column_info, datum))
    return "[%s]" % str(array_output)

@staticmethod
def _parse_column_name(info):
    if 'Name' in info:
        return info['Name'] + "="
    else:
        return ""
Node.js

The following snippet uses the AWS SDK for JavaScript V2 style. It is based on the Node.js sample application for Amazon Timestream for LiveAnalytics on GitHub.

function parseQueryResult(response) {
    const queryStatus = response.QueryStatus;
    console.log("Current query status: " + JSON.stringify(queryStatus));

    const columnInfo = response.ColumnInfo;
    const rows = response.Rows;

    console.log("Metadata: " + JSON.stringify(columnInfo));
    console.log("Data: ");

    rows.forEach(function (row) {
        console.log(parseRow(columnInfo, row));
    });
}

function parseRow(columnInfo, row) {
    const data = row.Data;
    const rowOutput = [];
    for (let i = 0; i < data.length; i++) {
        const info = columnInfo[i];
        const datum = data[i];
        rowOutput.push(parseDatum(info, datum));
    }
    return `{${rowOutput.join(", ")}}`;
}

function parseDatum(info, datum) {
    if (datum.NullValue != null && datum.NullValue === true) {
        return `${info.Name}=NULL`;
    }

    const columnType = info.Type;
    // If the column is of TimeSeries Type
    if (columnType.TimeSeriesMeasureValueColumnInfo != null) {
        return parseTimeSeries(info, datum);
    }
    // If the column is of Array Type
    else if (columnType.ArrayColumnInfo != null) {
        const arrayValues = datum.ArrayValue;
        return `${info.Name}=${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`;
    }
    // If the column is of Row Type
    else if (columnType.RowColumnInfo != null) {
        const rowColumnInfo = info.Type.RowColumnInfo;
        const rowValues = datum.RowValue;
        return parseRow(rowColumnInfo, rowValues);
    }
    // If the column is of Scalar Type
    else {
        return parseScalarType(info, datum);
    }
}

function parseTimeSeries(info, datum) {
    const timeSeriesOutput = [];
    datum.TimeSeriesValue.forEach(function (dataPoint) {
        timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`);
    });
    return `[${timeSeriesOutput.join(", ")}]`;
}

function parseScalarType(info, datum) {
    return parseColumnName(info) + datum.ScalarValue;
}

function parseColumnName(info) {
    return info.Name == null ? "" : `${info.Name}=`;
}

function parseArray(arrayColumnInfo, arrayValues) {
    const arrayOutput = [];
    arrayValues.forEach(function (datum) {
        arrayOutput.push(parseDatum(arrayColumnInfo, datum));
    });
    return `[${arrayOutput.join(", ")}]`;
}
.NET
private void ParseQueryResult(QueryResponse response)
{
    List<ColumnInfo> columnInfo = response.ColumnInfo;
    var options = new JsonSerializerOptions
    {
        IgnoreNullValues = true
    };
    List<String> columnInfoStrings = columnInfo.ConvertAll(x => JsonSerializer.Serialize(x, options));
    List<Row> rows = response.Rows;

    QueryStatus queryStatus = response.QueryStatus;
    Console.WriteLine("Current Query status:" + JsonSerializer.Serialize(queryStatus, options));

    Console.WriteLine("Metadata:" + string.Join(",", columnInfoStrings));
    Console.WriteLine("Data:");

    foreach (Row row in rows)
    {
        Console.WriteLine(ParseRow(columnInfo, row));
    }
}

private string ParseRow(List<ColumnInfo> columnInfo, Row row)
{
    List<Datum> data = row.Data;
    List<string> rowOutput = new List<string>();
    for (int j = 0; j < data.Count; j++)
    {
        ColumnInfo info = columnInfo[j];
        Datum datum = data[j];
        rowOutput.Add(ParseDatum(info, datum));
    }
    return $"{{{string.Join(",", rowOutput)}}}";
}

private string ParseDatum(ColumnInfo info, Datum datum)
{
    if (datum.NullValue)
    {
        return $"{info.Name}=NULL";
    }

    Amazon.TimestreamQuery.Model.Type columnType = info.Type;
    if (columnType.TimeSeriesMeasureValueColumnInfo != null)
    {
        return ParseTimeSeries(info, datum);
    }
    else if (columnType.ArrayColumnInfo != null)
    {
        List<Datum> arrayValues = datum.ArrayValue;
        return $"{info.Name}={ParseArray(info.Type.ArrayColumnInfo, arrayValues)}";
    }
    else if (columnType.RowColumnInfo != null && columnType.RowColumnInfo.Count > 0)
    {
        List<ColumnInfo> rowColumnInfo = info.Type.RowColumnInfo;
        Row rowValue = datum.RowValue;
        return ParseRow(rowColumnInfo, rowValue);
    }
    else
    {
        return ParseScalarType(info, datum);
    }
}

private string ParseTimeSeries(ColumnInfo info, Datum datum)
{
    var timeseriesString = datum.TimeSeriesValue
        .Select(value => $"{{time={value.Time}, value={ParseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, value.Value)}}}")
        .Aggregate((current, next) => current + "," + next);
    return $"[{timeseriesString}]";
}

private string ParseScalarType(ColumnInfo info, Datum datum)
{
    return ParseColumnName(info) + datum.ScalarValue;
}

private string ParseColumnName(ColumnInfo info)
{
    return info.Name == null ? "" : (info.Name + "=");
}

private string ParseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues)
{
    return $"[{arrayValues.Select(value => ParseDatum(arrayColumnInfo, value)).Aggregate((current, next) => current + "," + next)}]";
}

Accessing the query status

You can access the status of your query through QueryResponse, which contains information about the progress of a query, the bytes scanned by a query, and the bytes metered by a query. The bytesScanned and bytesMetered values are cumulative and are continuously updated as you paginate through the query results. You can use this information to understand the bytes scanned by an individual query, and also to make decisions based on it. For example, assuming a query price of $0.01 per GB scanned, you might want to cancel queries that exceed $25 per query, or X GB. The code snippets below show how this can be done.
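As a worked example of that arithmetic: at the assumed $0.01 per GB, a query that has metered 2.5 TB (2,560 GB) so far has cost 2,560 × $0.01 = $25.60. A minimal Python sketch of the conversion, with a hypothetical helper name and the same assumed price:

ONE_GB_IN_BYTES = 1073741824
QUERY_COST_PER_GB_IN_DOLLARS = 0.01  # assumed price, not an official figure

def estimated_cost_in_dollars(cumulative_bytes_metered):
    # Convert the cumulative bytes metered (from QueryStatus) into dollars
    gigabytes_metered = cumulative_bytes_metered / ONE_GB_IN_BYTES
    return gigabytes_metered * QUERY_COST_PER_GB_IN_DOLLARS

# e.g. estimated_cost_in_dollars(2.5 * 1024**4) == 25.6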

Note

These code snippets are based on full sample applications on GitHub. For more information about how to get started with the sample applications, see Sample application.

Java
private static final long ONE_GB_IN_BYTES = 1073741824L;
// Assuming the price of query is $0.01 per GB
private static final double QUERY_COST_PER_GB_IN_DOLLARS = 0.01;

public void cancelQueryBasedOnQueryStatus() {
    System.out.println("Starting query: " + SELECT_ALL_QUERY);
    QueryRequest queryRequest = new QueryRequest();
    queryRequest.setQueryString(SELECT_ALL_QUERY);
    QueryResult queryResult = queryClient.query(queryRequest);
    while (true) {
        final QueryStatus currentStatusOfQuery = queryResult.getQueryStatus();
        System.out.println("Query progress so far: " + currentStatusOfQuery.getProgressPercentage() + "%");
        double bytesMeteredSoFar = ((double) currentStatusOfQuery.getCumulativeBytesMetered() / ONE_GB_IN_BYTES);
        System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB");

        // Cancel query if it's costing more than 1 cent
        if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) {
            cancelQuery(queryResult);
            break;
        }

        if (queryResult.getNextToken() == null) {
            break;
        }
        queryRequest.setNextToken(queryResult.getNextToken());
        queryResult = queryClient.query(queryRequest);
    }
}
Java v2
private static final long ONE_GB_IN_BYTES = 1073741824L;
// Assuming the price of query is $0.01 per GB
private static final double QUERY_COST_PER_GB_IN_DOLLARS = 0.01;

public void cancelQueryBasedOnQueryStatus() {
    System.out.println("Starting query: " + SELECT_ALL_QUERY);
    QueryRequest queryRequest = QueryRequest.builder().queryString(SELECT_ALL_QUERY).build();
    final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest);
    for (QueryResponse queryResponse : queryResponseIterator) {
        final QueryStatus currentStatusOfQuery = queryResponse.queryStatus();
        System.out.println("Query progress so far: " + currentStatusOfQuery.progressPercentage() + "%");
        double bytesMeteredSoFar = ((double) currentStatusOfQuery.cumulativeBytesMetered() / ONE_GB_IN_BYTES);
        System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB");

        // Cancel query if it's costing more than 1 cent
        if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) {
            cancelQuery(queryResponse);
            break;
        }
    }
}
Go
const OneGbInBytes = 1073741824

// Assuming the price of query is $0.01 per GB
const QueryCostPerGbInDollars = 0.01

func cancelQueryBasedOnQueryStatus(queryPtr *string, querySvc *timestreamquery.TimestreamQuery, f *os.File) {
    queryInput := &timestreamquery.QueryInput{
        QueryString: aws.String(*queryPtr),
    }
    fmt.Println("QueryInput:")
    fmt.Println(queryInput)
    // execute the query
    err := querySvc.QueryPages(queryInput,
        func(page *timestreamquery.QueryOutput, lastPage bool) bool {
            // process query response
            queryStatus := page.QueryStatus
            fmt.Println("Current query status:", queryStatus)
            bytesMetered := float64(*queryStatus.CumulativeBytesMetered) / float64(OneGbInBytes)
            // cancel the query if it is costing more than 1 cent
            if bytesMetered*QueryCostPerGbInDollars > 0.01 {
                cancelQuery(page, querySvc)
                return false // stop paginating once the query has been canceled
            }
            // query response metadata
            // includes column names and types
            metadata := page.ColumnInfo
            // fmt.Println("Metadata:")
            fmt.Println(metadata)
            header := ""
            for i := 0; i < len(metadata); i++ {
                header += *metadata[i].Name
                if i != len(metadata)-1 {
                    header += ", "
                }
            }
            write(f, header)
            // query response data
            fmt.Println("Data:")
            // process rows
            rows := page.Rows
            for i := 0; i < len(rows); i++ {
                data := rows[i].Data
                value := processRowType(data, metadata)
                fmt.Println(value)
                write(f, value)
            }
            fmt.Println("Number of rows:", len(page.Rows))
            return true
        })
    if err != nil {
        fmt.Println("Error:")
        fmt.Println(err)
    }
}
Python
ONE_GB_IN_BYTES = 1073741824
# Assuming the price of query is $0.01 per GB
QUERY_COST_PER_GB_IN_DOLLARS = 0.01

def cancel_query_based_on_query_status(self):
    try:
        print("Starting query: " + self.SELECT_ALL)
        page_iterator = self.paginator.paginate(QueryString=self.SELECT_ALL)
        for page in page_iterator:
            query_status = page["QueryStatus"]

            progress_percentage = query_status["ProgressPercentage"]
            print("Query progress so far: " + str(progress_percentage) + "%")

            bytes_metered = query_status["CumulativeBytesMetered"] / self.ONE_GB_IN_BYTES
            print("Bytes metered so far: " + str(bytes_metered) + " GB")

            if bytes_metered * self.QUERY_COST_PER_GB_IN_DOLLARS > 0.01:
                self.cancel_query_for(page)
                break
    except Exception as err:
        print("Exception while running query:", err)
        traceback.print_exc(file=sys.stderr)
Node.js

The following snippet uses the AWS SDK for JavaScript V2 style. It is based on the Node.js sample application for Amazon Timestream for LiveAnalytics on GitHub.

function parseQueryResult(response) {
    const queryStatus = response.QueryStatus;
    console.log("Current query status: " + JSON.stringify(queryStatus));

    const columnInfo = response.ColumnInfo;
    const rows = response.Rows;

    console.log("Metadata: " + JSON.stringify(columnInfo));
    console.log("Data: ");

    rows.forEach(function (row) {
        console.log(parseRow(columnInfo, row));
    });
}

function parseRow(columnInfo, row) {
    const data = row.Data;
    const rowOutput = [];
    for (let i = 0; i < data.length; i++) {
        const info = columnInfo[i];
        const datum = data[i];
        rowOutput.push(parseDatum(info, datum));
    }
    return `{${rowOutput.join(", ")}}`;
}

function parseDatum(info, datum) {
    if (datum.NullValue != null && datum.NullValue === true) {
        return `${info.Name}=NULL`;
    }

    const columnType = info.Type;
    // If the column is of TimeSeries Type
    if (columnType.TimeSeriesMeasureValueColumnInfo != null) {
        return parseTimeSeries(info, datum);
    }
    // If the column is of Array Type
    else if (columnType.ArrayColumnInfo != null) {
        const arrayValues = datum.ArrayValue;
        return `${info.Name}=${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`;
    }
    // If the column is of Row Type
    else if (columnType.RowColumnInfo != null) {
        const rowColumnInfo = info.Type.RowColumnInfo;
        const rowValues = datum.RowValue;
        return parseRow(rowColumnInfo, rowValues);
    }
    // If the column is of Scalar Type
    else {
        return parseScalarType(info, datum);
    }
}

function parseTimeSeries(info, datum) {
    const timeSeriesOutput = [];
    datum.TimeSeriesValue.forEach(function (dataPoint) {
        timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`);
    });
    return `[${timeSeriesOutput.join(", ")}]`;
}

function parseScalarType(info, datum) {
    return parseColumnName(info) + datum.ScalarValue;
}

function parseColumnName(info) {
    return info.Name == null ? "" : `${info.Name}=`;
}

function parseArray(arrayColumnInfo, arrayValues) {
    const arrayOutput = [];
    arrayValues.forEach(function (datum) {
        arrayOutput.push(parseDatum(arrayColumnInfo, datum));
    });
    return `[${arrayOutput.join(", ")}]`;
}
.NET
private static readonly long ONE_GB_IN_BYTES = 1073741824L;
// Assuming the price of query is $0.01 per GB
private static readonly double QUERY_COST_PER_GB_IN_DOLLARS = 0.01;

private async Task CancelQueryBasedOnQueryStatus(string queryString)
{
    try
    {
        QueryRequest queryRequest = new QueryRequest();
        queryRequest.QueryString = queryString;
        QueryResponse queryResponse = await queryClient.QueryAsync(queryRequest);
        while (true)
        {
            QueryStatus queryStatus = queryResponse.QueryStatus;
            double bytesMeteredSoFar = ((double) queryStatus.CumulativeBytesMetered / ONE_GB_IN_BYTES);

            // Cancel query if it's costing more than 1 cent
            if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01)
            {
                await CancelQuery(queryResponse);
                break;
            }

            ParseQueryResult(queryResponse);
            if (queryResponse.NextToken == null)
            {
                break;
            }
            queryRequest.NextToken = queryResponse.NextToken;
            queryResponse = await queryClient.QueryAsync(queryRequest);
        }
    }
    catch (Exception e)
    {
        // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries
        Console.WriteLine(e.ToString());
    }
}

For more details on how to cancel a query, see Cancel query.