Esegui interrogazione - Amazon Timestream

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esegui interrogazione

Impaginazione dei risultati

Quando si esegue una query, Timestream restituisce il set di risultati in modo impaginato per ottimizzare la reattività delle applicazioni. Il frammento di codice seguente mostra come è possibile impaginare il set di risultati. È necessario scorrere tutte le pagine del set di risultati finché non viene visualizzato un valore nullo. I token di paginazione scadono 3 ore dopo essere stati emessi da Timestream per. LiveAnalytics

Nota

Questi frammenti di codice si basano su applicazioni di esempio complete su. GitHub Per ulteriori informazioni su come iniziare a utilizzare le applicazioni di esempio, consulta. Applicazione di esempio

Java
private void runQuery(String queryString) { try { QueryRequest queryRequest = new QueryRequest(); queryRequest.setQueryString(queryString); QueryResult queryResult = queryClient.query(queryRequest); while (true) { parseQueryResult(queryResult); if (queryResult.getNextToken() == null) { break; } queryRequest.setNextToken(queryResult.getNextToken()); queryResult = queryClient.query(queryRequest); } } catch (Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries e.printStackTrace(); } }
Java v2
private void runQuery(String queryString) { try { QueryRequest queryRequest = QueryRequest.builder().queryString(queryString).build(); final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest); for(QueryResponse queryResponse : queryResponseIterator) { parseQueryResult(queryResponse); } } catch (Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries e.printStackTrace(); } }
Go
func runQuery(queryPtr *string, querySvc *timestreamquery.TimestreamQuery, f *os.File) { queryInput := &timestreamquery.QueryInput{ QueryString: aws.String(*queryPtr), } fmt.Println("QueryInput:") fmt.Println(queryInput) // execute the query err := querySvc.QueryPages(queryInput, func(page *timestreamquery.QueryOutput, lastPage bool) bool { // process query response queryStatus := page.QueryStatus fmt.Println("Current query status:", queryStatus) // query response metadata // includes column names and types metadata := page.ColumnInfo // fmt.Println("Metadata:") fmt.Println(metadata) header := "" for i := 0; i < len(metadata); i++ { header += *metadata[i].Name if i != len(metadata)-1 { header += ", " } } write(f, header) // query response data fmt.Println("Data:") // process rows rows := page.Rows for i := 0; i < len(rows); i++ { data := rows[i].Data value := processRowType(data, metadata) fmt.Println(value) write(f, value) } fmt.Println("Number of rows:", len(page.Rows)) return true }) if err != nil { fmt.Println("Error:") fmt.Println(err) } }
Python
def run_query(self, query_string): try: page_iterator = self.paginator.paginate(QueryString=query_string) for page in page_iterator: self._parse_query_result(page) except Exception as err: print("Exception while running query:", err)
Node.js

Il frammento seguente utilizza lo stile AWS SDK for JavaScript V2. Si basa sull'applicazione di esempio disponibile in Node.js, esempio Amazon Timestream LiveAnalytics per l'applicazione su. GitHub

async function getAllRows(query, nextToken) { const params = { QueryString: query }; if (nextToken) { params.NextToken = nextToken; } await queryClient.query(params).promise() .then( (response) => { parseQueryResult(response); if (response.NextToken) { getAllRows(query, response.NextToken); } }, (err) => { console.error("Error while querying:", err); }); }
.NET
private async Task RunQueryAsync(string queryString) { try { QueryRequest queryRequest = new QueryRequest(); queryRequest.QueryString = queryString; QueryResponse queryResponse = await queryClient.QueryAsync(queryRequest); while (true) { ParseQueryResult(queryResponse); if (queryResponse.NextToken == null) { break; } queryRequest.NextToken = queryResponse.NextToken; queryResponse = await queryClient.QueryAsync(queryRequest); } } catch(Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries Console.WriteLine(e.ToString()); } }

Analisi dei set di risultati

È possibile utilizzare i seguenti frammenti di codice per estrarre dati dal set di risultati. I risultati delle query sono accessibili per un massimo di 24 ore dopo il completamento di una query.

Nota

Questi frammenti di codice si basano su applicazioni di esempio complete su. GitHub Per ulteriori informazioni su come iniziare a utilizzare le applicazioni di esempio, consulta. Applicazione di esempio

Java
private static final DateTimeFormatter TIMESTAMP_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSSSSSSSS"); private static final DateTimeFormatter DATE_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd"); private static final DateTimeFormatter TIME_FORMATTER = DateTimeFormatter.ofPattern("HH:mm:ss.SSSSSSSSS"); private static final long ONE_GB_IN_BYTES = 1073741824L; private void parseQueryResult(QueryResult response) { final QueryStatus currentStatusOfQuery = queryResult.getQueryStatus(); System.out.println("Query progress so far: " + currentStatusOfQuery.getProgressPercentage() + "%"); double bytesScannedSoFar = ((double) currentStatusOfQuery.getCumulativeBytesScanned() / ONE_GB_IN_BYTES); System.out.println("Bytes scanned so far: " + bytesScannedSoFar + " GB"); double bytesMeteredSoFar = ((double) currentStatusOfQuery.getCumulativeBytesMetered() / ONE_GB_IN_BYTES); System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB"); List<ColumnInfo> columnInfo = response.getColumnInfo(); List<Row> rows = response.getRows(); System.out.println("Metadata: " + columnInfo); System.out.println("Data: "); // iterate every row for (Row row : rows) { System.out.println(parseRow(columnInfo, row)); } } private String parseRow(List<ColumnInfo> columnInfo, Row row) { List<Datum> data = row.getData(); List<String> rowOutput = new ArrayList<>(); // iterate every column per row for (int j = 0; j < data.size(); j++) { ColumnInfo info = columnInfo.get(j); Datum datum = data.get(j); rowOutput.add(parseDatum(info, datum)); } return String.format("{%s}", rowOutput.stream().map(Object::toString).collect(Collectors.joining(","))); } private String parseDatum(ColumnInfo info, Datum datum) { if (datum.isNullValue() != null && datum.isNullValue()) { return info.getName() + "=" + "NULL"; } Type columnType = info.getType(); // If the column is of TimeSeries Type if (columnType.getTimeSeriesMeasureValueColumnInfo() != null) { return parseTimeSeries(info, datum); } // If the column is of Array Type else if (columnType.getArrayColumnInfo() != null) { List<Datum> arrayValues = datum.getArrayValue(); return info.getName() + "=" + parseArray(info.getType().getArrayColumnInfo(), arrayValues); } // If the column is of Row Type else if (columnType.getRowColumnInfo() != null) { List<ColumnInfo> rowColumnInfo = info.getType().getRowColumnInfo(); Row rowValues = datum.getRowValue(); return parseRow(rowColumnInfo, rowValues); } // If the column is of Scalar Type else { return parseScalarType(info, datum); } } private String parseTimeSeries(ColumnInfo info, Datum datum) { List<String> timeSeriesOutput = new ArrayList<>(); for (TimeSeriesDataPoint dataPoint : datum.getTimeSeriesValue()) { timeSeriesOutput.add("{time=" + dataPoint.getTime() + ", value=" + parseDatum(info.getType().getTimeSeriesMeasureValueColumnInfo(), dataPoint.getValue()) + "}"); } return String.format("[%s]", timeSeriesOutput.stream().map(Object::toString).collect(Collectors.joining(","))); } private String parseScalarType(ColumnInfo info, Datum datum) { switch (ScalarType.fromValue(info.getType().getScalarType())) { case VARCHAR: return parseColumnName(info) + datum.getScalarValue(); case BIGINT: Long longValue = Long.valueOf(datum.getScalarValue()); return parseColumnName(info) + longValue; case INTEGER: Integer intValue = Integer.valueOf(datum.getScalarValue()); return parseColumnName(info) + intValue; case BOOLEAN: Boolean booleanValue = Boolean.valueOf(datum.getScalarValue()); return parseColumnName(info) + booleanValue; case DOUBLE: Double doubleValue = Double.valueOf(datum.getScalarValue()); return parseColumnName(info) + doubleValue; case TIMESTAMP: return parseColumnName(info) + LocalDateTime.parse(datum.getScalarValue(), TIMESTAMP_FORMATTER); case DATE: return parseColumnName(info) + LocalDate.parse(datum.getScalarValue(), DATE_FORMATTER); case TIME: return parseColumnName(info) + LocalTime.parse(datum.getScalarValue(), TIME_FORMATTER); case INTERVAL_DAY_TO_SECOND: case INTERVAL_YEAR_TO_MONTH: return parseColumnName(info) + datum.getScalarValue(); case UNKNOWN: return parseColumnName(info) + datum.getScalarValue(); default: throw new IllegalArgumentException("Given type is not valid: " + info.getType().getScalarType()); } } private String parseColumnName(ColumnInfo info) { return info.getName() == null ? "" : info.getName() + "="; } private String parseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) { List<String> arrayOutput = new ArrayList<>(); for (Datum datum : arrayValues) { arrayOutput.add(parseDatum(arrayColumnInfo, datum)); } return String.format("[%s]", arrayOutput.stream().map(Object::toString).collect(Collectors.joining(","))); }
Java v2
private static final long ONE_GB_IN_BYTES = 1073741824L; private void parseQueryResult(QueryResponse response) { final QueryStatus currentStatusOfQuery = response.queryStatus(); System.out.println("Query progress so far: " + currentStatusOfQuery.progressPercentage() + "%"); double bytesScannedSoFar = ((double) currentStatusOfQuery.cumulativeBytesScanned() / ONE_GB_IN_BYTES); System.out.println("Bytes scanned so far: " + bytesScannedSoFar + " GB"); double bytesMeteredSoFar = ((double) currentStatusOfQuery.cumulativeBytesMetered() / ONE_GB_IN_BYTES); System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB"); List<ColumnInfo> columnInfo = response.columnInfo(); List<Row> rows = response.rows(); System.out.println("Metadata: " + columnInfo); System.out.println("Data: "); // iterate every row for (Row row : rows) { System.out.println(parseRow(columnInfo, row)); } } private String parseRow(List<ColumnInfo> columnInfo, Row row) { List<Datum> data = row.data(); List<String> rowOutput = new ArrayList<>(); // iterate every column per row for (int j = 0; j < data.size(); j++) { ColumnInfo info = columnInfo.get(j); Datum datum = data.get(j); rowOutput.add(parseDatum(info, datum)); } return String.format("{%s}", rowOutput.stream().map(Object::toString).collect(Collectors.joining(","))); } private String parseDatum(ColumnInfo info, Datum datum) { if (datum.nullValue() != null && datum.nullValue()) { return info.name() + "=" + "NULL"; } Type columnType = info.type(); // If the column is of TimeSeries Type if (columnType.timeSeriesMeasureValueColumnInfo() != null) { return parseTimeSeries(info, datum); } // If the column is of Array Type else if (columnType.arrayColumnInfo() != null) { List<Datum> arrayValues = datum.arrayValue(); return info.name() + "=" + parseArray(info.type().arrayColumnInfo(), arrayValues); } // If the column is of Row Type else if (columnType.rowColumnInfo() != null && columnType.rowColumnInfo().size() > 0) { List<ColumnInfo> rowColumnInfo = info.type().rowColumnInfo(); Row rowValues = datum.rowValue(); return parseRow(rowColumnInfo, rowValues); } // If the column is of Scalar Type else { return parseScalarType(info, datum); } } private String parseTimeSeries(ColumnInfo info, Datum datum) { List<String> timeSeriesOutput = new ArrayList<>(); for (TimeSeriesDataPoint dataPoint : datum.timeSeriesValue()) { timeSeriesOutput.add("{time=" + dataPoint.time() + ", value=" + parseDatum(info.type().timeSeriesMeasureValueColumnInfo(), dataPoint.value()) + "}"); } return String.format("[%s]", timeSeriesOutput.stream().map(Object::toString).collect(Collectors.joining(","))); } private String parseScalarType(ColumnInfo info, Datum datum) { return parseColumnName(info) + datum.scalarValue(); } private String parseColumnName(ColumnInfo info) { return info.name() == null ? "" : info.name() + "="; } private String parseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) { List<String> arrayOutput = new ArrayList<>(); for (Datum datum : arrayValues) { arrayOutput.add(parseDatum(arrayColumnInfo, datum)); } return String.format("[%s]", arrayOutput.stream().map(Object::toString).collect(Collectors.joining(","))); }
Go
func processScalarType(data *timestreamquery.Datum) string { return *data.ScalarValue } func processTimeSeriesType(data []*timestreamquery.TimeSeriesDataPoint, columnInfo *timestreamquery.ColumnInfo) string { value := "" for k := 0; k < len(data); k++ { time := data[k].Time value += *time + ":" if columnInfo.Type.ScalarType != nil { value += processScalarType(data[k].Value) } else if columnInfo.Type.ArrayColumnInfo != nil { value += processArrayType(data[k].Value.ArrayValue, columnInfo.Type.ArrayColumnInfo) } else if columnInfo.Type.RowColumnInfo != nil { value += processRowType(data[k].Value.RowValue.Data, columnInfo.Type.RowColumnInfo) } else { fail("Bad data type") } if k != len(data)-1 { value += ", " } } return value } func processArrayType(datumList []*timestreamquery.Datum, columnInfo *timestreamquery.ColumnInfo) string { value := "" for k := 0; k < len(datumList); k++ { if columnInfo.Type.ScalarType != nil { value += processScalarType(datumList[k]) } else if columnInfo.Type.TimeSeriesMeasureValueColumnInfo != nil { value += processTimeSeriesType(datumList[k].TimeSeriesValue, columnInfo.Type.TimeSeriesMeasureValueColumnInfo) } else if columnInfo.Type.ArrayColumnInfo != nil { value += "[" value += processArrayType(datumList[k].ArrayValue, columnInfo.Type.ArrayColumnInfo) value += "]" } else if columnInfo.Type.RowColumnInfo != nil { value += "[" value += processRowType(datumList[k].RowValue.Data, columnInfo.Type.RowColumnInfo) value += "]" } else { fail("Bad column type") } if k != len(datumList)-1 { value += ", " } } return value } func processRowType(data []*timestreamquery.Datum, metadata []*timestreamquery.ColumnInfo) string { value := "" for j := 0; j < len(data); j++ { if metadata[j].Type.ScalarType != nil { // process simple data types value += processScalarType(data[j]) } else if metadata[j].Type.TimeSeriesMeasureValueColumnInfo != nil { // fmt.Println("Timeseries measure value column info") // fmt.Println(metadata[j].Type.TimeSeriesMeasureValueColumnInfo.Type) datapointList := data[j].TimeSeriesValue value += "[" value += processTimeSeriesType(datapointList, metadata[j].Type.TimeSeriesMeasureValueColumnInfo) value += "]" } else if metadata[j].Type.ArrayColumnInfo != nil { columnInfo := metadata[j].Type.ArrayColumnInfo // fmt.Println("Array column info") // fmt.Println(columnInfo) datumList := data[j].ArrayValue value += "[" value += processArrayType(datumList, columnInfo) value += "]" } else if metadata[j].Type.RowColumnInfo != nil { columnInfo := metadata[j].Type.RowColumnInfo datumList := data[j].RowValue.Data value += "[" value += processRowType(datumList, columnInfo) value += "]" } else { panic("Bad column type") } // comma seperated column values if j != len(data)-1 { value += ", " } } return value }
Python
def _parse_query_result(self, query_result): query_status = query_result["QueryStatus"] progress_percentage = query_status["ProgressPercentage"] print(f"Query progress so far: {progress_percentage}%") bytes_scanned = float(query_status["CumulativeBytesScanned"]) / ONE_GB_IN_BYTES print(f"Data scanned so far: {bytes_scanned} GB") bytes_metered = float(query_status["CumulativeBytesMetered"]) / ONE_GB_IN_BYTES print(f"Data metered so far: {bytes_metered} GB") column_info = query_result['ColumnInfo'] print("Metadata: %s" % column_info) print("Data: ") for row in query_result['Rows']: print(self._parse_row(column_info, row)) def _parse_row(self, column_info, row): data = row['Data'] row_output = [] for j in range(len(data)): info = column_info[j] datum = data[j] row_output.append(self._parse_datum(info, datum)) return "{%s}" % str(row_output) def _parse_datum(self, info, datum): if datum.get('NullValue', False): return "%s=NULL" % info['Name'], column_type = info['Type'] # If the column is of TimeSeries Type if 'TimeSeriesMeasureValueColumnInfo' in column_type: return self._parse_time_series(info, datum) # If the column is of Array Type elif 'ArrayColumnInfo' in column_type: array_values = datum['ArrayValue'] return "%s=%s" % (info['Name'], self._parse_array(info['Type']['ArrayColumnInfo'], array_values)) # If the column is of Row Type elif 'RowColumnInfo' in column_type: row_column_info = info['Type']['RowColumnInfo'] row_values = datum['RowValue'] return self._parse_row(row_column_info, row_values) # If the column is of Scalar Type else: return self._parse_column_name(info) + datum['ScalarValue'] def _parse_time_series(self, info, datum): time_series_output = [] for data_point in datum['TimeSeriesValue']: time_series_output.append("{time=%s, value=%s}" % (data_point['Time'], self._parse_datum(info['Type']['TimeSeriesMeasureValueColumnInfo'], data_point['Value']))) return "[%s]" % str(time_series_output) def _parse_array(self, array_column_info, array_values): array_output = [] for datum in array_values: array_output.append(self._parse_datum(array_column_info, datum)) return "[%s]" % str(array_output) @staticmethod def _parse_column_name(info): if 'Name' in info: return info['Name'] + "=" else: return ""
Node.js

Il frammento seguente utilizza lo stile AWS SDK for JavaScript V2. Si basa sull'applicazione di esempio disponibile in Node.js, esempio Amazon Timestream LiveAnalytics per l'applicazione su. GitHub

function parseQueryResult(response) { const queryStatus = response.QueryStatus; console.log("Current query status: " + JSON.stringify(queryStatus)); const columnInfo = response.ColumnInfo; const rows = response.Rows; console.log("Metadata: " + JSON.stringify(columnInfo)); console.log("Data: "); rows.forEach(function (row) { console.log(parseRow(columnInfo, row)); }); } function parseRow(columnInfo, row) { const data = row.Data; const rowOutput = []; var i; for ( i = 0; i < data.length; i++ ) { info = columnInfo[i]; datum = data[i]; rowOutput.push(parseDatum(info, datum)); } return `{${rowOutput.join(", ")}}` } function parseDatum(info, datum) { if (datum.NullValue != null && datum.NullValue === true) { return `${info.Name}=NULL`; } const columnType = info.Type; // If the column is of TimeSeries Type if (columnType.TimeSeriesMeasureValueColumnInfo != null) { return parseTimeSeries(info, datum); } // If the column is of Array Type else if (columnType.ArrayColumnInfo != null) { const arrayValues = datum.ArrayValue; return `${info.Name}=${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`; } // If the column is of Row Type else if (columnType.RowColumnInfo != null) { const rowColumnInfo = info.Type.RowColumnInfo; const rowValues = datum.RowValue; return parseRow(rowColumnInfo, rowValues); } // If the column is of Scalar Type else { return parseScalarType(info, datum); } } function parseTimeSeries(info, datum) { const timeSeriesOutput = []; datum.TimeSeriesValue.forEach(function (dataPoint) { timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`) }); return `[${timeSeriesOutput.join(", ")}]` } function parseScalarType(info, datum) { return parseColumnName(info) + datum.ScalarValue; } function parseColumnName(info) { return info.Name == null ? "" : `${info.Name}=`; } function parseArray(arrayColumnInfo, arrayValues) { const arrayOutput = []; arrayValues.forEach(function (datum) { arrayOutput.push(parseDatum(arrayColumnInfo, datum)); }); return `[${arrayOutput.join(", ")}]` }
.NET
private void ParseQueryResult(QueryResponse response) { List<ColumnInfo> columnInfo = response.ColumnInfo; var options = new JsonSerializerOptions { IgnoreNullValues = true }; List<String> columnInfoStrings = columnInfo.ConvertAll(x => JsonSerializer.Serialize(x, options)); List<Row> rows = response.Rows; QueryStatus queryStatus = response.QueryStatus; Console.WriteLine("Current Query status:" + JsonSerializer.Serialize(queryStatus, options)); Console.WriteLine("Metadata:" + string.Join(",", columnInfoStrings)); Console.WriteLine("Data:"); foreach (Row row in rows) { Console.WriteLine(ParseRow(columnInfo, row)); } } private string ParseRow(List<ColumnInfo> columnInfo, Row row) { List<Datum> data = row.Data; List<string> rowOutput = new List<string>(); for (int j = 0; j < data.Count; j++) { ColumnInfo info = columnInfo[j]; Datum datum = data[j]; rowOutput.Add(ParseDatum(info, datum)); } return $"{{{string.Join(",", rowOutput)}}}"; } private string ParseDatum(ColumnInfo info, Datum datum) { if (datum.NullValue) { return $"{info.Name}=NULL"; } Amazon.TimestreamQuery.Model.Type columnType = info.Type; if (columnType.TimeSeriesMeasureValueColumnInfo != null) { return ParseTimeSeries(info, datum); } else if (columnType.ArrayColumnInfo != null) { List<Datum> arrayValues = datum.ArrayValue; return $"{info.Name}={ParseArray(info.Type.ArrayColumnInfo, arrayValues)}"; } else if (columnType.RowColumnInfo != null && columnType.RowColumnInfo.Count > 0) { List<ColumnInfo> rowColumnInfo = info.Type.RowColumnInfo; Row rowValue = datum.RowValue; return ParseRow(rowColumnInfo, rowValue); } else { return ParseScalarType(info, datum); } } private string ParseTimeSeries(ColumnInfo info, Datum datum) { var timeseriesString = datum.TimeSeriesValue .Select(value => $"{{time={value.Time}, value={ParseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, value.Value)}}}") .Aggregate((current, next) => current + "," + next); return $"[{timeseriesString}]"; } private string ParseScalarType(ColumnInfo info, Datum datum) { return ParseColumnName(info) + datum.ScalarValue; } private string ParseColumnName(ColumnInfo info) { return info.Name == null ? "" : (info.Name + "="); } private string ParseArray(ColumnInfo arrayColumnInfo, List<Datum> arrayValues) { return $"[{arrayValues.Select(value => ParseDatum(arrayColumnInfo, value)).Aggregate((current, next) => current + "," + next)}]"; }

Accesso allo stato della query

È possibile accedere allo stato della query tramiteQueryResponse, che contiene informazioni sullo stato di avanzamento di una query, sui byte analizzati da una query e sui byte misurati da una query. I bytesScanned valori bytesMetered and sono cumulativi e vengono aggiornati continuamente durante la paginazione dei risultati delle query. È possibile utilizzare queste informazioni per comprendere i byte analizzati da una singola query e utilizzarle anche per prendere determinate decisioni. Ad esempio, supponendo che il prezzo della query sia di 0,01 USD per GB scansionato, potresti voler annullare le query che superano i 25 USD per query o GB. X Il frammento di codice riportato di seguito mostra come è possibile eseguire questa operazione.

Nota

Questi frammenti di codice si basano su applicazioni di esempio complete su. GitHub Per ulteriori informazioni su come iniziare a utilizzare le applicazioni di esempio, consulta. Applicazione di esempio

Java
private static final long ONE_GB_IN_BYTES = 1073741824L; private static final double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB public void cancelQueryBasedOnQueryStatus() { System.out.println("Starting query: " + SELECT_ALL_QUERY); QueryRequest queryRequest = new QueryRequest(); queryRequest.setQueryString(SELECT_ALL_QUERY); QueryResult queryResult = queryClient.query(queryRequest); while (true) { final QueryStatus currentStatusOfQuery = queryResult.getQueryStatus(); System.out.println("Query progress so far: " + currentStatusOfQuery.getProgressPercentage() + "%"); double bytesMeteredSoFar = ((double) currentStatusOfQuery.getCumulativeBytesMetered() / ONE_GB_IN_BYTES); System.out.println("Bytes metered so far: " + bytesMeteredSoFar + " GB"); // Cancel query if its costing more than 1 cent if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) { cancelQuery(queryResult); break; } if (queryResult.getNextToken() == null) { break; } queryRequest.setNextToken(queryResult.getNextToken()); queryResult = queryClient.query(queryRequest); } }
Java v2
private static final long ONE_GB_IN_BYTES = 1073741824L; private static final double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB public void cancelQueryBasedOnQueryStatus() { System.out.println("Starting query: " + SELECT_ALL_QUERY); QueryRequest queryRequest = QueryRequest.builder().queryString(SELECT_ALL_QUERY).build(); final QueryIterable queryResponseIterator = timestreamQueryClient.queryPaginator(queryRequest); for(QueryResponse queryResponse : queryResponseIterator) { final QueryStatus currentStatusOfQuery = queryResponse.queryStatus(); System.out.println("Query progress so far: " + currentStatusOfQuery.progressPercentage() + "%"); double bytesMeteredSoFar = ((double) currentStatusOfQuery.cumulativeBytesMetered() / ONE_GB_IN_BYTES); System.out.println("Bytes metered so far: " + bytesMeteredSoFar + "GB"); // Cancel query if its costing more than 1 cent if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) { cancelQuery(queryResponse); break; } } }
Go
const OneGbInBytes = 1073741824 // Assuming the price of query is $0.01 per GB const QueryCostPerGbInDollars = 0.01 func cancelQueryBasedOnQueryStatus(queryPtr *string, querySvc *timestreamquery.TimestreamQuery, f *os.File) { queryInput := &timestreamquery.QueryInput{ QueryString: aws.String(*queryPtr), } fmt.Println("QueryInput:") fmt.Println(queryInput) // execute the query err := querySvc.QueryPages(queryInput, func(page *timestreamquery.QueryOutput, lastPage bool) bool { // process query response queryStatus := page.QueryStatus fmt.Println("Current query status:", queryStatus) bytes_metered := float64(*queryStatus.CumulativeBytesMetered) / float64(ONE_GB_IN_BYTES) if bytes_metered * QUERY_COST_PER_GB_IN_DOLLARS > 0.01 { cancelQuery(page, querySvc) return true } // query response metadata // includes column names and types metadata := page.ColumnInfo // fmt.Println("Metadata:") fmt.Println(metadata) header := "" for i := 0; i < len(metadata); i++ { header += *metadata[i].Name if i != len(metadata)-1 { header += ", " } } write(f, header) // query response data fmt.Println("Data:") // process rows rows := page.Rows for i := 0; i < len(rows); i++ { data := rows[i].Data value := processRowType(data, metadata) fmt.Println(value) write(f, value) } fmt.Println("Number of rows:", len(page.Rows)) return true }) if err != nil { fmt.Println("Error:") fmt.Println(err) } }
Python
ONE_GB_IN_BYTES = 1073741824 # Assuming the price of query is $0.01 per GB QUERY_COST_PER_GB_IN_DOLLARS = 0.01 def cancel_query_based_on_query_status(self): try: print("Starting query: " + self.SELECT_ALL) page_iterator = self.paginator.paginate(QueryString=self.SELECT_ALL) for page in page_iterator: query_status = page["QueryStatus"] progress_percentage = query_status["ProgressPercentage"] print("Query progress so far: " + str(progress_percentage) + "%") bytes_metered = query_status["CumulativeBytesMetered"] / self.ONE_GB_IN_BYTES print("Bytes Metered so far: " + str(bytes_metered) + " GB") if bytes_metered * self.QUERY_COST_PER_GB_IN_DOLLARS > 0.01: self.cancel_query_for(page) break except Exception as err: print("Exception while running query:", err) traceback.print_exc(file=sys.stderr)
Node.js

Il frammento seguente utilizza lo stile AWS SDK for JavaScript V2. Si basa sull'applicazione di esempio disponibile in Node.js, esempio Amazon Timestream LiveAnalytics per l'applicazione su. GitHub

function parseQueryResult(response) { const queryStatus = response.QueryStatus; console.log("Current query status: " + JSON.stringify(queryStatus)); const columnInfo = response.ColumnInfo; const rows = response.Rows; console.log("Metadata: " + JSON.stringify(columnInfo)); console.log("Data: "); rows.forEach(function (row) { console.log(parseRow(columnInfo, row)); }); } function parseRow(columnInfo, row) { const data = row.Data; const rowOutput = []; var i; for ( i = 0; i < data.length; i++ ) { info = columnInfo[i]; datum = data[i]; rowOutput.push(parseDatum(info, datum)); } return `{${rowOutput.join(", ")}}` } function parseDatum(info, datum) { if (datum.NullValue != null && datum.NullValue === true) { return `${info.Name}=NULL`; } const columnType = info.Type; // If the column is of TimeSeries Type if (columnType.TimeSeriesMeasureValueColumnInfo != null) { return parseTimeSeries(info, datum); } // If the column is of Array Type else if (columnType.ArrayColumnInfo != null) { const arrayValues = datum.ArrayValue; return `${info.Name}=${parseArray(info.Type.ArrayColumnInfo, arrayValues)}`; } // If the column is of Row Type else if (columnType.RowColumnInfo != null) { const rowColumnInfo = info.Type.RowColumnInfo; const rowValues = datum.RowValue; return parseRow(rowColumnInfo, rowValues); } // If the column is of Scalar Type else { return parseScalarType(info, datum); } } function parseTimeSeries(info, datum) { const timeSeriesOutput = []; datum.TimeSeriesValue.forEach(function (dataPoint) { timeSeriesOutput.push(`{time=${dataPoint.Time}, value=${parseDatum(info.Type.TimeSeriesMeasureValueColumnInfo, dataPoint.Value)}}`) }); return `[${timeSeriesOutput.join(", ")}]` } function parseScalarType(info, datum) { return parseColumnName(info) + datum.ScalarValue; } function parseColumnName(info) { return info.Name == null ? "" : `${info.Name}=`; } function parseArray(arrayColumnInfo, arrayValues) { const arrayOutput = []; arrayValues.forEach(function (datum) { arrayOutput.push(parseDatum(arrayColumnInfo, datum)); }); return `[${arrayOutput.join(", ")}]` }
.NET
private static readonly long ONE_GB_IN_BYTES = 1073741824L; private static readonly double QUERY_COST_PER_GB_IN_DOLLARS = 0.01; // Assuming the price of query is $0.01 per GB private async Task CancelQueryBasedOnQueryStatus(string queryString) { try { QueryRequest queryRequest = new QueryRequest(); queryRequest.QueryString = queryString; QueryResponse queryResponse = await queryClient.QueryAsync(queryRequest); while (true) { QueryStatus queryStatus = queryResponse.QueryStatus; double bytesMeteredSoFar = ((double) queryStatus.CumulativeBytesMetered / ONE_GB_IN_BYTES); // Cancel query if its costing more than 1 cent if (bytesMeteredSoFar * QUERY_COST_PER_GB_IN_DOLLARS > 0.01) { await CancelQuery(queryResponse); break; } ParseQueryResult(queryResponse); if (queryResponse.NextToken == null) { break; } queryRequest.NextToken = queryResponse.NextToken; queryResponse = await queryClient.QueryAsync(queryRequest); } } catch(Exception e) { // Some queries might fail with 500 if the result of a sequence function has more than 10000 entries Console.WriteLine(e.ToString()); } }

Per ulteriori dettagli su come annullare una query, consulta. Annulla interrogazione