Há mais exemplos de AWS SDK disponíveis no repositório AWS Doc SDK Examples
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Exemplos de Serviço gerenciado para Apache Flink usando o SDK para Python (Boto3)
Os exemplos de código a seguir mostram como realizar ações e implementar cenários comuns usando o serviço AWS SDK for Python (Boto3) gerenciado para Apache Flink.
Ações são trechos de código de programas maiores e devem ser executadas em contexto. Embora as ações mostrem como chamar perfis de serviço individuais, você pode ver as ações no contexto em seus cenários relacionados.
Cada exemplo inclui um link para o código-fonte completo, em que você pode encontrar instruções sobre como configurar e executar o código.
Tópicos
Ações
O código de exemplo a seguir mostra como usar AddApplicationInput
.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def add_input(self, input_prefix, stream_arn, input_schema): """ Adds an input stream to the application. The input stream data is mapped to an in-application stream that can be processed by your code running in Kinesis Data Analytics. :param input_prefix: The prefix prepended to in-application input stream names. :param stream_arn: The ARN of the input stream. :param input_schema: A schema that maps the data in the input stream to the runtime environment. This can be automatically generated by using `discover_input_schema` or you can create it yourself. :return: Metadata about the newly added input. """ try: response = self.analytics_client.add_application_input( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, Input={ "NamePrefix": input_prefix, "KinesisStreamsInput": {"ResourceARN": stream_arn}, "InputSchema": input_schema, }, ) self.version_id = response["ApplicationVersionId"] logger.info("Add input stream %s to application %s.", stream_arn, self.name) except ClientError: logger.exception( "Couldn't add input stream %s to application %s.", stream_arn, self.name ) raise else: return response
-
Para obter detalhes da API, consulte a AddApplicationInputReferência da API AWS SDK for Python (Boto3).
-
O código de exemplo a seguir mostra como usar AddApplicationOutput
.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def add_output(self, in_app_stream_name, output_arn): """ Adds an output stream to the application. Kinesis Data Analytics maps data from the specified in-application stream to the output stream. :param in_app_stream_name: The name of the in-application stream to map to the output stream. :param output_arn: The ARN of the output stream. :return: A list of metadata about the output resources currently assigned to the application. """ try: response = self.analytics_client.add_application_output( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, Output={ "Name": in_app_stream_name, "KinesisStreamsOutput": {"ResourceARN": output_arn}, "DestinationSchema": {"RecordFormatType": "JSON"}, }, ) outputs = response["OutputDescriptions"] self.version_id = response["ApplicationVersionId"] logging.info( "Added output %s to %s, which now has %s outputs.", output_arn, self.name, len(outputs), ) except ClientError: logger.exception("Couldn't add output %s to %s.", output_arn, self.name) raise else: return outputs
-
Para obter detalhes da API, consulte a AddApplicationOutputReferência da API AWS SDK for Python (Boto3).
-
O código de exemplo a seguir mostra como usar CreateApplication
.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def create(self, app_name, role_arn, env="SQL-1_0"): """ Creates a Kinesis Data Analytics application. :param app_name: The name of the application. :param role_arn: The ARN of a role that can be assumed by Kinesis Data Analytics and grants needed permissions. :param env: The runtime environment of the application, such as SQL. Code uploaded to the application runs in this environment. :return: Metadata about the newly created application. """ try: response = self.analytics_client.create_application( ApplicationName=app_name, RuntimeEnvironment=env, ServiceExecutionRole=role_arn, ) details = response["ApplicationDetail"] self._update_details(details) logger.info("Application %s created.", app_name) except ClientError: logger.exception("Couldn't create application %s.", app_name) raise else: return details
-
Para obter detalhes da API, consulte a CreateApplicationReferência da API AWS SDK for Python (Boto3).
-
O código de exemplo a seguir mostra como usar DeleteApplication
.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def delete(self): """ Deletes an application. """ try: self.analytics_client.delete_application( ApplicationName=self.name, CreateTimestamp=self.create_timestamp ) logger.info("Deleted application %s.", self.name) except ClientError: logger.exception("Couldn't delete application %s.", self.name) raise
-
Para obter detalhes da API, consulte a DeleteApplicationReferência da API AWS SDK for Python (Boto3).
-
O código de exemplo a seguir mostra como usar DescribeApplication
.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def describe(self, name): """ Gets metadata about an application. :param name: The name of the application to look up. :return: Metadata about the application. """ try: response = self.analytics_client.describe_application(ApplicationName=name) details = response["ApplicationDetail"] self._update_details(details) logger.info("Got metadata for application %s.", name) except ClientError: logger.exception("Couldn't get metadata for application %s.", name) raise else: return details
-
Para obter detalhes da API, consulte a DescribeApplicationReferência da API AWS SDK for Python (Boto3).
-
O código de exemplo a seguir mostra como usar DescribeApplicationSnapshot
.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def describe_snapshot(self, application_name, snapshot_name): """ Gets metadata about a previously saved application snapshot. :param application_name: The name of the application. :param snapshot_name: The name of the snapshot. :return: Metadata about the snapshot. """ try: response = self.analytics_client.describe_application_snapshot( ApplicationName=application_name, SnapshotName=snapshot_name ) snapshot = response["SnapshotDetails"] logger.info( "Got metadata for snapshot %s of application %s.", snapshot_name, application_name, ) except ClientError: logger.exception( "Couldn't get metadata for snapshot %s of application %s.", snapshot_name, application_name, ) raise else: return snapshot
-
Para obter detalhes da API, consulte a DescribeApplicationSnapshotReferência da API AWS SDK for Python (Boto3).
-
O código de exemplo a seguir mostra como usar DiscoverInputSchema
.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def discover_input_schema(self, stream_arn, role_arn): """ Discovers a schema that maps data in a stream to a format that is usable by an application's runtime environment. The stream must be active and have enough data moving through it for the service to sample. The returned schema can be used when you add the stream as an input to the application or you can write your own schema. :param stream_arn: The ARN of the stream to map. :param role_arn: A role that lets Kinesis Data Analytics read from the stream. :return: The discovered schema of the data in the input stream. """ try: response = self.analytics_client.discover_input_schema( ResourceARN=stream_arn, ServiceExecutionRole=role_arn, InputStartingPositionConfiguration={"InputStartingPosition": "NOW"}, ) schema = response["InputSchema"] logger.info("Discovered input schema for stream %s.", stream_arn) except ClientError: logger.exception( "Couldn't discover input schema for stream %s.", stream_arn ) raise else: return schema
-
Para obter detalhes da API, consulte a DiscoverInputSchemaReferência da API AWS SDK for Python (Boto3).
-
O código de exemplo a seguir mostra como usar StartApplication
.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def start(self, input_id): """ Starts an application. After the application is running, it reads from the specified input stream and runs the application code on the incoming data. :param input_id: The ID of the input to read. """ try: self.analytics_client.start_application( ApplicationName=self.name, RunConfiguration={ "SqlRunConfigurations": [ { "InputId": input_id, "InputStartingPositionConfiguration": { "InputStartingPosition": "NOW" }, } ] }, ) logger.info("Started application %s.", self.name) except ClientError: logger.exception("Couldn't start application %s.", self.name) raise
-
Para obter detalhes da API, consulte a StartApplicationReferência da API AWS SDK for Python (Boto3).
-
O código de exemplo a seguir mostra como usar StopApplication
.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def stop(self): """ Stops an application. This stops the application from processing data but does not delete any resources. """ try: self.analytics_client.stop_application(ApplicationName=self.name) logger.info("Stopping application %s.", self.name) except ClientError: logger.exception("Couldn't stop application %s.", self.name) raise
-
Para obter detalhes da API, consulte a StopApplicationReferência da API AWS SDK for Python (Boto3).
-
O código de exemplo a seguir mostra como usar UpdateApplication
.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. Este exemplo atualiza o código que é executado em um aplicativo existente.
class KinesisAnalyticsApplicationV2: """Encapsulates Kinesis Data Analytics application functions.""" def __init__(self, analytics_client): """ :param analytics_client: A Boto3 Kinesis Data Analytics v2 client. """ self.analytics_client = analytics_client self.name = None self.arn = None self.version_id = None self.create_timestamp = None def update_code(self, code): """ Updates the code that runs in the application. The code must run in the runtime environment of the application, such as SQL. Application code typically reads data from in-application streams and transforms it in some way. :param code: The code to upload. This completely replaces any existing code in the application. :return: Metadata about the application. """ try: response = self.analytics_client.update_application( ApplicationName=self.name, CurrentApplicationVersionId=self.version_id, ApplicationConfigurationUpdate={ "ApplicationCodeConfigurationUpdate": { "CodeContentTypeUpdate": "PLAINTEXT", "CodeContentUpdate": {"TextContentUpdate": code}, } }, ) details = response["ApplicationDetail"] self.version_id = details["ApplicationVersionId"] logger.info("Update code for application %s.", self.name) except ClientError: logger.exception("Couldn't update code for application %s.", self.name) raise else: return details
-
Para obter detalhes da API, consulte a UpdateApplicationReferência da API AWS SDK for Python (Boto3).
-
Gerador de dados
O exemplo de código a seguir mostra como gerar um stream do Kinesis com um referenciador.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"REFERRER": "http://www.amazon.com"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
O exemplo de código a seguir mostra como gerar um stream do Kinesis com anomalias de pressão arterial.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class PressureType(Enum): low = "LOW" normal = "NORMAL" high = "HIGH" def get_blood_pressure(pressure_type): pressure = {"BloodPressureLevel": pressure_type.value} if pressure_type == PressureType.low: pressure["Systolic"] = random.randint(50, 80) pressure["Diastolic"] = random.randint(30, 50) elif pressure_type == PressureType.normal: pressure["Systolic"] = random.randint(90, 120) pressure["Diastolic"] = random.randint(60, 80) elif pressure_type == PressureType.high: pressure["Systolic"] = random.randint(130, 200) pressure["Diastolic"] = random.randint(90, 150) else: raise TypeError return pressure def generate(stream_name, kinesis_client): while True: rnd = random.random() pressure_type = ( PressureType.low if rnd < 0.005 else PressureType.high if rnd > 0.995 else PressureType.normal ) blood_pressure = get_blood_pressure(pressure_type) print(blood_pressure) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(blood_pressure), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
O exemplo de código a seguir mostra como gerar um stream do Kinesis com dados em colunas.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return {"Col_A": "a", "Col_B": "b", "Col_C": "c", "Col_E_Unstructured": "x,y,z"} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
O exemplo de código a seguir mostra como gerar um stream do Kinesis com anomalias da frequência cardíaca.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. from enum import Enum import json import random import boto3 STREAM_NAME = "ExampleInputStream" class RateType(Enum): normal = "NORMAL" high = "HIGH" def get_heart_rate(rate_type): if rate_type == RateType.normal: rate = random.randint(60, 100) elif rate_type == RateType.high: rate = random.randint(150, 200) else: raise TypeError return {"heartRate": rate, "rateType": rate_type.value} def generate(stream_name, kinesis_client, output=True): while True: rnd = random.random() rate_type = RateType.high if rnd < 0.01 else RateType.normal heart_rate = get_heart_rate(rate_type) if output: print(heart_rate) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(heart_rate), PartitionKey="partitionkey", ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
O exemplo de código a seguir mostra como gerar um stream do Kinesis com hotspots.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )
O exemplo de código a seguir mostra como gerar um stream do Kinesis com entradas de log.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "LOGENTRY": "203.0.113.24 - - [25/Mar/2018:15:25:37 -0700] " '"GET /index.php HTTP/1.1" 200 125 "-" ' '"Mozilla/5.0 [en] Gecko/20100101 Firefox/52.0"' } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
O exemplo de código a seguir mostra como gerar um stream do Kinesis com dados escalonados.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. import datetime import json import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): event_time = datetime.datetime.utcnow() - datetime.timedelta(seconds=10) return { "EVENT_TIME": event_time.isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), } def generate(stream_name, kinesis_client): while True: data = get_data() # Send six records, ten seconds apart, with the same event time and ticker for _ in range(6): print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey", ) time.sleep(10) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
O exemplo de código a seguir mostra como gerar um stream do Kinesis com dados de cotação da bolsa.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
O exemplo de código a seguir mostra como gerar um stream do Kinesis com dois tipos de dados.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. import json import random import boto3 STREAM_NAME = "OrdersAndTradesStream" PARTITION_KEY = "partition_key" def get_order(order_id, ticker): return { "RecordType": "Order", "Oid": order_id, "Oticker": ticker, "Oprice": random.randint(500, 10000), "Otype": "Sell", } def get_trade(order_id, trade_id, ticker): return { "RecordType": "Trade", "Tid": trade_id, "Toid": order_id, "Tticker": ticker, "Tprice": random.randint(0, 3000), } def generate(stream_name, kinesis_client): order_id = 1 while True: ticker = random.choice(["AAAA", "BBBB", "CCCC"]) order = get_order(order_id, ticker) print(order) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(order), PartitionKey=PARTITION_KEY ) for trade_id in range(1, random.randint(0, 6)): trade = get_trade(order_id, trade_id, ticker) print(trade) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(trade), PartitionKey=PARTITION_KEY, ) order_id += 1 if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
O exemplo de código a seguir mostra como gerar um stream do Kinesis com dados de log da web.
- SDK para Python (Boto3)
-
nota
Tem mais sobre GitHub. Encontre o exemplo completo e saiba como configurar e executar no Repositório de exemplos de código da AWS
. import json import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "log": "192.168.254.30 - John [24/May/2004:22:01:02 -0700] " '"GET /icons/apache_pb.gif HTTP/1.1" 304 0' } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))