Automating AWS Device Farm

Programmatic access to Device Farm is a powerful way to automate common tasks, such as scheduling a run or downloading the artifacts for a run, suite, or test. You can use the AWS SDK or the AWS CLI to do so.
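For example, the AWS CLI exposes these operations as devicefarm subcommands. The following sketch schedules a run and then lists its file artifacts; every ARN shown is a placeholder that you would replace with values from your own project:

aws devicefarm schedule-run \
    --project-arn arn:aws:devicefarm:us-west-2:111122223333:project:EXAMPLE \
    --app-arn arn:aws:devicefarm:us-west-2:111122223333:upload:EXAMPLE-APP \
    --device-pool-arn arn:aws:devicefarm:us-west-2::devicepool:EXAMPLE-POOL \
    --name MyRun \
    --test type=APPIUM_PYTHON,testPackageArn=arn:aws:devicefarm:us-west-2:111122223333:upload:EXAMPLE-TESTS

# Poll the run, then list its artifacts once it completes.
aws devicefarm get-run --arn arn:aws:devicefarm:us-west-2:111122223333:run:EXAMPLE-RUN
aws devicefarm list-artifacts --arn arn:aws:devicefarm:us-west-2:111122223333:run:EXAMPLE-RUN --type FILE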

The AWS SDK provides access to every AWS service, including Device Farm and Amazon S3. For more information, see Tools to Build on AWS.

Example: Using the AWS SDK to start a Device Farm run and collect artifacts

The following example provides a beginning-to-end demonstration of how you can use the AWS SDK to work with Device Farm. This example does the following:

  • Uploads the application and test packages to Device Farm

  • Starts a test run and waits for its completion (or failure)

  • Downloads all artifacts produced by the test suites

This example depends on the third-party requests package to make HTTP requests.
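Both dependencies are available from PyPI. Assuming a standard Python environment, you can install them with:

pip install boto3 requests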

import boto3
import datetime
import os
import random
import requests
import string
import time

# The following script runs a test through Device Farm.
#
# Things you have to change:
config = {
    # This is our app under test.
    "appFilePath": "app-debug.apk",
    "projectArn": "arn:aws:devicefarm:us-west-2:111122223333:project:1b99bcff-1111-2222-ab2f-8c3c733c55ed",
    # Since we care about the most popular devices, we'll use a curated pool.
    "testSpecArn": "arn:aws:devicefarm:us-west-2::upload:101e31e8-12ac-11e9-ab14-d663bd873e83",
    "poolArn": "arn:aws:devicefarm:us-west-2::devicepool:082d10e5-d7d7-48a5-ba5c-b33d66efa1f5",
    "namePrefix": "MyAppTest",
    # This is our test package. This tutorial won't go into how to make these.
    "testPackage": "tests.zip"
}

client = boto3.client('devicefarm')

unique = config['namePrefix'] + "-" + (datetime.date.today().isoformat()) + (''.join(random.sample(string.ascii_letters, 8)))
print(f"The unique identifier for this run is going to be {unique} -- all uploads will be prefixed with this.")

def upload_df_file(filename, type_, mime='application/octet-stream'):
    response = client.create_upload(
        projectArn=config['projectArn'],
        name=unique + "_" + os.path.basename(filename),
        type=type_,
        contentType=mime
    )
    # Get the upload ARN, which we'll return later.
    upload_arn = response['upload']['arn']
    # We're going to extract the URL of the upload and use Requests to upload it.
    upload_url = response['upload']['url']
    with open(filename, 'rb') as file_stream:
        print(f"Uploading {filename} to Device Farm as {response['upload']['name']}... ", end='')
        put_req = requests.put(upload_url, data=file_stream, headers={"content-type": mime})
        print(' done')
        if not put_req.ok:
            raise Exception("Couldn't upload, requests said we're not ok. Requests says: " + put_req.reason)
    started = datetime.datetime.now()
    while True:
        print(f"Upload of {filename} in state {response['upload']['status']} after " + str(datetime.datetime.now() - started))
        if response['upload']['status'] == 'FAILED':
            raise Exception("The upload failed processing. DeviceFarm says reason is: \n" + (response['upload']['message'] if 'message' in response['upload'] else response['upload']['metadata']))
        if response['upload']['status'] == 'SUCCEEDED':
            break
        time.sleep(5)
        response = client.get_upload(arn=upload_arn)
    print("")
    return upload_arn

our_upload_arn = upload_df_file(config['appFilePath'], "ANDROID_APP")
our_test_package_arn = upload_df_file(config['testPackage'], 'APPIUM_PYTHON_TEST_PACKAGE')
print(our_upload_arn, our_test_package_arn)

# Now that we have those out of the way, we can start the test run.
response = client.schedule_run(
    projectArn=config["projectArn"],
    appArn=our_upload_arn,
    devicePoolArn=config["poolArn"],
    name=unique,
    test={
        "type": "APPIUM_PYTHON",
        "testSpecArn": config["testSpecArn"],
        "testPackageArn": our_test_package_arn
    }
)
run_arn = response['run']['arn']
start_time = datetime.datetime.now()
print(f"Run {unique} is scheduled as arn {run_arn}")

try:
    while True:
        response = client.get_run(arn=run_arn)
        state = response['run']['status']
        if state == 'COMPLETED' or state == 'ERRORED':
            break
        else:
            print(f" Run {unique} in state {state}, total time " + str(datetime.datetime.now() - start_time))
            time.sleep(10)
except:
    # If something goes wrong in this process, we stop the run and exit.
    client.stop_run(arn=run_arn)
    exit(1)

print(f"Tests finished in state {state} after " + str(datetime.datetime.now() - start_time))

# Now we pull all the logs.
jobs_response = client.list_jobs(arn=run_arn)

# Save the output somewhere. We're using the unique value, but you could use something else.
save_path = os.path.join(os.getcwd(), unique)
os.mkdir(save_path)

# Save the last run information.
for job in jobs_response['jobs']:
    # Make a directory for our information.
    job_name = job['name']
    os.makedirs(os.path.join(save_path, job_name), exist_ok=True)
    # Get each suite within the job.
    suites = client.list_suites(arn=job['arn'])['suites']
    for suite in suites:
        for test in client.list_tests(arn=suite['arn'])['tests']:
            # Get the artifacts.
            for artifact_type in ['FILE', 'SCREENSHOT', 'LOG']:
                artifacts = client.list_artifacts(
                    type=artifact_type,
                    arn=test['arn']
                )['artifacts']
                for artifact in artifacts:
                    # We replace ':' because it has a special meaning in Windows and macOS paths.
                    path_to = os.path.join(save_path, job_name, suite['name'], test['name'].replace(':', '_'))
                    os.makedirs(path_to, exist_ok=True)
                    filename = artifact['type'] + "_" + artifact['name'] + "." + artifact['extension']
                    artifact_save_path = os.path.join(path_to, filename)
                    print("Downloading " + artifact_save_path)
                    with open(artifact_save_path, 'wb') as fn, requests.get(artifact['url'], allow_redirects=True) as request:
                        fn.write(request.content)

print("Finished")
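The script uses your default AWS credentials and Region, so configure those first (for example, with aws configure). Assuming you save it as run_tests.py (a file name chosen here for illustration), you can run it directly, and artifacts are written under a directory named after the unique run identifier:

python run_tests.py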