Amazon Bedrock AgentCore is in preview release and is subject to change.
Step 4: Use OAuth2 Access Token to Invoke External Resource
Once the agent obtains a Google access token with the steps above, it can use the access token to access Google Drive. Here is a full example that lists the names and IDs of the first 10 files that the user has access to.
First, install the Google client library for Python:
pip install --upgrade google-api-python-client google-auth-httplib2 google-auth-oauthlib
Then, copy the following code:
import asyncio from bedrock_agentcore.identity.auth import requires_access_token, requires_api_key from google.auth.transport.requests import Request from google.oauth2.credentials import Credentials from google_auth_oauthlib.flow import InstalledAppFlow from googleapiclient.discovery import build from googleapiclient.errors import HttpError SCOPES= ["https://www.googleapis.com/auth/drive.metadata.readonly"] def main(access_token): """Shows basic usage of the Drive v3 API. Prints the names and ids of the first 10 files the user has access to. """ creds= Credentials(token= access_token, scopes= SCOPES)try: service= build("drive", "v3", credentials= creds)# Call the Drive v3 API results= (service.files().list(pageSize= 10, fields= "nextPageToken, files(id, name)").execute())items= results.get("files", [])if not items: print("No files found.")return print("Files:")for item in items: print(f"{item['name']}({item['id']})")except HttpError as error: # TODO(developer)- Handle errors from drive API. print(f"An error occurred: {error}")if __name__= = "__main__": # This annotation helps agent developer to obtain access tokens from external applications @requires_access_token(provider_name= "google-provider", scopes= ["https://www.googleapis.com/auth/drive.metadata.readonly"], # Google OAuth2 scopes auth_flow= "USER_FEDERATION", # 3LO flow on_auth_url= lambda x: print("Copy and paste this authorization url to your browser", x), # prints authorization URL to console force_authentication= True,)async def read_from_google_drive(*, access_token: str): print(access_token)#You can see the access_token # Make API calls... main(access_token)asyncio.run(read_from_google_drive(access_token= ""))