Esempi di Kinesis che utilizzano AWS SDK for .NET - Esempi di codice dell'AWS SDK

Ci sono altri AWS SDK esempi disponibili nel repository AWS Doc SDK Examples GitHub .

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esempi di Kinesis che utilizzano AWS SDK for .NET

I seguenti esempi di codice mostrano come eseguire azioni e implementare scenari comuni utilizzando AWS SDK for .NET con Kinesis.

Le operazioni sono estratti di codice da programmi più grandi e devono essere eseguite nel contesto. Mentre le azioni mostrano come richiamare le singole funzioni di servizio, puoi vedere le azioni nel loro contesto negli scenari correlati.

Ogni esempio include un collegamento al codice sorgente completo, in cui è possibile trovare istruzioni su come configurare ed eseguire il codice nel contesto.

Azioni

Il seguente esempio di codice mostra come utilizzareAddTagsToStream.

AWS SDK for .NET
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// This example shows how to apply key/value pairs to an Amazon Kinesis /// stream. /// </summary> public class TagStream { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; var tags = new Dictionary<string, string> { { "Project", "Sample Kinesis Project" }, { "Application", "Sample Kinesis App" }, }; var success = await ApplyTagsToStreamAsync(client, streamName, tags); if (success) { Console.WriteLine($"Taggs successfully added to {streamName}."); } else { Console.WriteLine("Tags were not added to the stream."); } } /// <summary> /// Applies the set of tags to the named Kinesis stream. /// </summary> /// <param name="client">The initialized Kinesis client.</param> /// <param name="streamName">The name of the Kinesis stream to which /// the tags will be attached.</param> /// <param name="tags">A sictionary containing key/value pairs which /// will be used to create the Kinesis tags.</param> /// <returns>A Boolean value which represents the success or failure /// of AddTagsToStreamAsync.</returns> public static async Task<bool> ApplyTagsToStreamAsync( IAmazonKinesis client, string streamName, Dictionary<string, string> tags) { var request = new AddTagsToStreamRequest { StreamName = streamName, Tags = tags, }; var response = await client.AddTagsToStreamAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }

Il seguente esempio di codice mostra come utilizzareCreateStream.

AWS SDK for .NET
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// This example shows how to create a new Amazon Kinesis stream. /// </summary> public class CreateStream { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; int shardCount = 1; var success = await CreateNewStreamAsync(client, streamName, shardCount); if (success) { Console.WriteLine($"The stream, {streamName} successfully created."); } } /// <summary> /// Creates a new Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client.</param> /// <param name="streamName">The name for the new stream.</param> /// <param name="shardCount">The number of shards the new stream will /// use. The throughput of the stream is a function of the number of /// shards; more shards are required for greater provisioned /// throughput.</param> /// <returns>A Boolean value indicating whether the stream was created.</returns> public static async Task<bool> CreateNewStreamAsync(IAmazonKinesis client, string streamName, int shardCount) { var request = new CreateStreamRequest { StreamName = streamName, ShardCount = shardCount, }; var response = await client.CreateStreamAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }

Il seguente esempio di codice mostra come utilizzareDeleteStream.

AWS SDK for .NET
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Shows how to delete an Amazon Kinesis stream. /// </summary> public class DeleteStream { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; var success = await DeleteStreamAsync(client, streamName); if (success) { Console.WriteLine($"Stream, {streamName} successfully deleted."); } else { Console.WriteLine("Stream not deleted."); } } /// <summary> /// Deletes a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamName">The name of the string to delete.</param> /// <returns>A Boolean value representing the success of the operation.</returns> public static async Task<bool> DeleteStreamAsync(IAmazonKinesis client, string streamName) { // If EnforceConsumerDeletion is true, any consumers // of this stream will also be deleted. If it is set // to false and this stream has any consumers, the // call will fail with a ResourceInUseException. var request = new DeleteStreamRequest { StreamName = streamName, EnforceConsumerDeletion = true, }; var response = await client.DeleteStreamAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }

Il seguente esempio di codice mostra come utilizzareDeregisterStreamConsumer.

AWS SDK for .NET
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Shows how to deregister a consumer from an Amazon Kinesis stream. /// </summary> public class DeregisterConsumer { public static async Task Main(string[] args) { IAmazonKinesis client = new AmazonKinesisClient(); string streamARN = "arn:aws:kinesis:us-west-2:000000000000:stream/AmazonKinesisStream"; string consumerName = "CONSUMER_NAME"; string consumerARN = "arn:aws:kinesis:us-west-2:000000000000:stream/AmazonKinesisStream/consumer/CONSUMER_NAME:000000000000"; var success = await DeregisterConsumerAsync(client, streamARN, consumerARN, consumerName); if (success) { Console.WriteLine($"{consumerName} successfully deregistered."); } else { Console.WriteLine($"{consumerName} was not successfully deregistered."); } } /// <summary> /// Deregisters a consumer from a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamARN">The ARN of a Kinesis stream.</param> /// <param name="consumerARN">The ARN of the consumer.</param> /// <param name="consumerName">The name of the consumer.</param> /// <returns>A Boolean value representing the success of the operation.</returns> public static async Task<bool> DeregisterConsumerAsync( IAmazonKinesis client, string streamARN, string consumerARN, string consumerName) { var request = new DeregisterStreamConsumerRequest { StreamARN = streamARN, ConsumerARN = consumerARN, ConsumerName = consumerName, }; var response = await client.DeregisterStreamConsumerAsync(request); return response.HttpStatusCode == System.Net.HttpStatusCode.OK; } }

Il seguente esempio di codice mostra come utilizzareListStreamConsumers.

AWS SDK for .NET
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// List the consumers of an Amazon Kinesis stream. /// </summary> public class ListConsumers { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamARN = "arn:aws:kinesis:us-east-2:000000000000:stream/AmazonKinesisStream"; int maxResults = 10; var consumers = await ListConsumersAsync(client, streamARN, maxResults); if (consumers.Count > 0) { consumers .ForEach(c => Console.WriteLine($"Name: {c.ConsumerName} ARN: {c.ConsumerARN}")); } else { Console.WriteLine("No consumers found."); } } /// <summary> /// Retrieve a list of the consumers for a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamARN">The ARN of the stream for which we want to /// retrieve a list of clients.</param> /// <param name="maxResults">The maximum number of results to return.</param> /// <returns>A list of Consumer objects.</returns> public static async Task<List<Consumer>> ListConsumersAsync(IAmazonKinesis client, string streamARN, int maxResults) { var request = new ListStreamConsumersRequest { StreamARN = streamARN, MaxResults = maxResults, }; var response = await client.ListStreamConsumersAsync(request); return response.Consumers; } }

Il seguente esempio di codice mostra come utilizzareListStreams.

AWS SDK for .NET
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Retrieves and displays a list of existing Amazon Kinesis streams. /// </summary> public class ListStreams { public static async Task Main(string[] args) { IAmazonKinesis client = new AmazonKinesisClient(); var response = await client.ListStreamsAsync(new ListStreamsRequest()); List<string> streamNames = response.StreamNames; if (streamNames.Count > 0) { streamNames .ForEach(s => Console.WriteLine($"Stream name: {s}")); } else { Console.WriteLine("No streams were found."); } } }

Il seguente esempio di codice mostra come utilizzareListTagsForStream.

AWS SDK for .NET
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

using System; using System.Collections.Generic; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// Shows how to list the tags that have been attached to an Amazon Kinesis /// stream. /// </summary> public class ListTags { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string streamName = "AmazonKinesisStream"; await ListTagsAsync(client, streamName); } /// <summary> /// List the tags attached to a Kinesis stream. /// </summary> /// <param name="client">An initialized Kinesis client object.</param> /// <param name="streamName">The name of the Kinesis stream for which you /// wish to display tags.</param> public static async Task ListTagsAsync(IAmazonKinesis client, string streamName) { var request = new ListTagsForStreamRequest { StreamName = streamName, Limit = 10, }; var response = await client.ListTagsForStreamAsync(request); DisplayTags(response.Tags); while (response.HasMoreTags) { request.ExclusiveStartTagKey = response.Tags[response.Tags.Count - 1].Key; response = await client.ListTagsForStreamAsync(request); } } /// <summary> /// Displays the items in a list of Kinesis tags. /// </summary> /// <param name="tags">A list of the Tag objects to be displayed.</param> public static void DisplayTags(List<Tag> tags) { tags .ForEach(t => Console.WriteLine($"Key: {t.Key} Value: {t.Value}")); } }

Il seguente esempio di codice mostra come utilizzareRegisterStreamConsumer.

AWS SDK for .NET
Nota

C'è altro da fare GitHub. Trova l'esempio completo e scopri di più sulla configurazione e l'esecuzione nel Repository di esempi di codice AWS.

using System; using System.Threading.Tasks; using Amazon.Kinesis; using Amazon.Kinesis.Model; /// <summary> /// This example shows how to register a consumer to an Amazon Kinesis /// stream. /// </summary> public class RegisterConsumer { public static async Task Main() { IAmazonKinesis client = new AmazonKinesisClient(); string consumerName = "NEW_CONSUMER_NAME"; string streamARN = "arn:aws:kinesis:us-east-2:000000000000:stream/AmazonKinesisStream"; var consumer = await RegisterConsumerAsync(client, consumerName, streamARN); if (consumer is not null) { Console.WriteLine($"{consumer.ConsumerName}"); } } /// <summary> /// Registers the consumer to a Kinesis stream. /// </summary> /// <param name="client">The initialized Kinesis client object.</param> /// <param name="consumerName">A string representing the consumer.</param> /// <param name="streamARN">The ARN of the stream.</param> /// <returns>A Consumer object that contains information about the consumer.</returns> public static async Task<Consumer> RegisterConsumerAsync(IAmazonKinesis client, string consumerName, string streamARN) { var request = new RegisterStreamConsumerRequest { ConsumerName = consumerName, StreamARN = streamARN, }; var response = await client.RegisterStreamConsumerAsync(request); return response.Consumer; } }

Esempi serverless

Il seguente esempio di codice mostra come implementare una funzione Lambda che riceve un evento attivato dalla ricezione di record da un flusso Kinesis. La funzione recupera il payload Kinesis, lo decodifica da Base64 e registra il contenuto del record.

AWS SDK for .NET
Nota

C'è altro su. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Consumo di un evento Kinesis con Lambda utilizzando. NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegrationSampleCode; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return; } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); throw; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } }

Il seguente esempio di codice mostra come implementare una risposta batch parziale per le funzioni Lambda che ricevono eventi da un flusso Kinesis. La funzione riporta gli errori degli elementi batch nella risposta, segnalando a Lambda di riprovare tali messaggi in un secondo momento.

AWS SDK for .NET
Nota

C'è di più su. GitHub Trova l'esempio completo e scopri come eseguire la configurazione e l'esecuzione nel repository di Esempi serverless.

Segnalazione degli errori degli elementi batch di Kinesis utilizzando Lambda. NET.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 using System.Text; using System.Text.Json.Serialization; using Amazon.Lambda.Core; using Amazon.Lambda.KinesisEvents; using AWS.Lambda.Powertools.Logging; // Assembly attribute to enable the Lambda function's JSON input to be converted into a .NET class. [assembly: LambdaSerializer(typeof(Amazon.Lambda.Serialization.SystemTextJson.DefaultLambdaJsonSerializer))] namespace KinesisIntegration; public class Function { // Powertools Logger requires an environment variables against your function // POWERTOOLS_SERVICE_NAME [Logging(LogEvent = true)] public async Task<StreamsEventResponse> FunctionHandler(KinesisEvent evnt, ILambdaContext context) { if (evnt.Records.Count == 0) { Logger.LogInformation("Empty Kinesis Event received"); return new StreamsEventResponse(); } foreach (var record in evnt.Records) { try { Logger.LogInformation($"Processed Event with EventId: {record.EventId}"); string data = await GetRecordDataAsync(record.Kinesis, context); Logger.LogInformation($"Data: {data}"); // TODO: Do interesting work based on the new data } catch (Exception ex) { Logger.LogError($"An error occurred {ex.Message}"); /* Since we are working with streams, we can return the failed item immediately. Lambda will immediately begin to retry processing from this failed item onwards. */ return new StreamsEventResponse { BatchItemFailures = new List<StreamsEventResponse.BatchItemFailure> { new StreamsEventResponse.BatchItemFailure { ItemIdentifier = record.Kinesis.SequenceNumber } } }; } } Logger.LogInformation($"Successfully processed {evnt.Records.Count} records."); return new StreamsEventResponse(); } private async Task<string> GetRecordDataAsync(KinesisEvent.Record record, ILambdaContext context) { byte[] bytes = record.Data.ToArray(); string data = Encoding.UTF8.GetString(bytes); await Task.CompletedTask; //Placeholder for actual async work return data; } } public class StreamsEventResponse { [JsonPropertyName("batchItemFailures")] public IList<BatchItemFailure> BatchItemFailures { get; set; } public class BatchItemFailure { [JsonPropertyName("itemIdentifier")] public string ItemIdentifier { get; set; } } }