You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
938 lines
42 KiB
938 lines
42 KiB
#!/usr/bin/python
|
|
|
|
# Copyright (C) 2014 The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import logging
|
|
import os.path
|
|
import select
|
|
import sys
|
|
import time
|
|
import collections
|
|
import socket
|
|
import gflags as flags # http://code.google.com/p/python-gflags/
|
|
import pkgutil
|
|
import threading
|
|
import Queue
|
|
import traceback
|
|
import math
|
|
import bisect
|
|
from bisect import bisect_left
|
|
|
|
"""
|
|
scipy, numpy and matplotlib are python packages that can be installed
|
|
from: http://www.scipy.org/
|
|
|
|
"""
|
|
import scipy
|
|
import matplotlib.pyplot as plt
|
|
|
|
# let this script know about the power monitor implementations
|
|
sys.path = [os.path.basename(__file__)] + sys.path
|
|
available_monitors = [
|
|
name
|
|
for _, name, _ in pkgutil.iter_modules(
|
|
[os.path.join(os.path.dirname(__file__), "power_monitors")])
|
|
if not name.startswith("_")]
|
|
|
|
APK = os.path.join(os.path.dirname(__file__), "..", "CtsVerifier.apk")
|
|
|
|
FLAGS = flags.FLAGS
|
|
|
|
# DELAY_SCREEN_OFF is the number of seconds to wait for baseline state
|
|
DELAY_SCREEN_OFF = 20.0
|
|
|
|
# whether to log data collected to a file for each sensor run:
|
|
LOG_DATA_TO_FILE = True
|
|
|
|
logging.getLogger().setLevel(logging.ERROR)
|
|
|
|
|
|
def do_import(name):
|
|
"""import a module by name dynamically"""
|
|
mod = __import__(name)
|
|
components = name.split(".")
|
|
for comp in components[1:]:
|
|
mod = getattr(mod, comp)
|
|
return mod
|
|
|
|
class PowerTestException(Exception):
|
|
"""
|
|
Definition of specialized Exception class for CTS power tests
|
|
"""
|
|
def __init__(self, message):
|
|
self._error_message = message
|
|
def __str__(self):
|
|
return self._error_message
|
|
|
|
class PowerTest:
|
|
"""Class to run a suite of power tests. This has methods for obtaining
|
|
measurements from the power monitor (through the driver) and then
|
|
processing it to determine baseline and AP suspend state and
|
|
measure ampere draw of various sensors.
|
|
Ctrl+C causes a keyboard interrupt exception which terminates the test."""
|
|
|
|
# Thresholds for max allowed power usage per sensor tested
|
|
# TODO: Accel, Mag and Gyro have no maximum power specified in the CDD;
|
|
# the following numbers are bogus and will be replaced soon by what
|
|
# the device reports (from Sensor.getPower())
|
|
MAX_ACCEL_AMPS = 0.08 # Amps
|
|
MAX_MAG_AMPS = 0.08 # Amps
|
|
MAX_GYRO_AMPS = 0.08 # Amps
|
|
MAX_SIGMO_AMPS = 0.08 # Amps
|
|
|
|
# TODO: The following numbers for step counter, etc must be replaced by
|
|
# the numbers specified in CDD for low-power sensors. The expected current
|
|
# draw must be computed from the specified power and the voltage used to
|
|
# power the device (specified from a config file).
|
|
MAX_STEP_COUNTER_AMPS = 0.08 # Amps
|
|
MAX_STEP_DETECTOR_AMPS = 0.08 # Amps
|
|
# The variable EXPECTED_AMPS_VARIATION_HALF_RANGE denotes the expected
|
|
# variation of the ampere measurements
|
|
# around the mean value at baseline state. i.e. we expect most of the
|
|
# ampere measurements at baseline state to vary around the mean by
|
|
# between +/- of the number below
|
|
EXPECTED_AMPS_VARIATION_HALF_RANGE = 0.0005
|
|
# The variable THRESHOLD_BASELINE_SAMPLES_FRACTION denotes the minimum fraction of samples that must
|
|
# be in the range of variation defined by EXPECTED_AMPS_VARIATION_HALF_RANGE
|
|
# around the mean baseline for us to decide that the phone has settled into
|
|
# its baseline state
|
|
THRESHOLD_BASELINE_SAMPLES_FRACTION = 0.86
|
|
# The variable MAX_PERCENTILE_AP_SCREEN_OFF_AMPS denotes the maximum ampere
|
|
# draw that the device can consume when it has gone to suspend state with
|
|
# one or more sensors registered and batching samples (screen and AP are
|
|
# off in this case)
|
|
MAX_PERCENTILE_AP_SCREEN_OFF_AMPS = 0.030 # Amps
|
|
# The variable PERCENTILE_MAX_AP_SCREEN_OFF denotes the fraction of ampere
|
|
# measurements that must be below the specified maximum amperes
|
|
# MAX_PERCENTILE_AP_SCREEN_OFF_AMPS for us to decide that the phone has
|
|
# reached suspend state.
|
|
PERCENTILE_MAX_AP_SCREEN_OFF = 0.95
|
|
DOMAIN_NAME = "/android/cts/powertest"
|
|
# SAMPLE_COUNT_NOMINAL denotes the typical number of measurements of amperes
|
|
# to collect from the power monitor
|
|
SAMPLE_COUNT_NOMINAL = 1000
|
|
# RATE_NOMINAL denotes the nominal frequency at which ampere measurements
|
|
# are taken from the monsoon power monitor
|
|
RATE_NOMINAL = 100
|
|
ENABLE_PLOTTING = False
|
|
|
|
REQUEST_EXTERNAL_STORAGE = "EXTERNAL STORAGE?"
|
|
REQUEST_EXIT = "EXIT"
|
|
REQUEST_RAISE = "RAISE %s %s"
|
|
REQUEST_USER_RESPONSE = "USER RESPONSE %s"
|
|
REQUEST_SET_TEST_RESULT = "SET TEST RESULT %s %s %s"
|
|
REQUEST_SENSOR_SWITCH = "SENSOR %s %s"
|
|
REQUEST_SENSOR_AVAILABILITY = "SENSOR? %s"
|
|
REQUEST_SCREEN_OFF = "SCREEN OFF"
|
|
REQUEST_SHOW_MESSAGE = "MESSAGE %s"
|
|
|
|
NEGATIVE_AMPERE_ERROR_MESSAGE = (
|
|
"Negative ampere draw measured, possibly due to power "
|
|
"supply from USB cable. Check the setup of device and power "
|
|
"monitor to make sure that the device is not connected "
|
|
"to machine via USB directly. The device should be "
|
|
"connected to the USB slot in the power monitor. It is okay "
|
|
"to change the wiring when the test is in progress.")
|
|
|
|
|
|
def __init__(self, max_baseline_amps):
|
|
"""
|
|
Args:
|
|
max_baseline_amps: The maximum value of baseline amperes
|
|
that we expect the device to consume at baseline state.
|
|
This can be different between models of phones.
|
|
"""
|
|
power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
|
|
testid = time.strftime("%d_%m_%Y__%H__%M_%S")
|
|
self._power_monitor = power_monitors.Power_Monitor(log_file_id = testid)
|
|
self._tcp_connect_port = 0 # any available port
|
|
print ("Establishing connection to device...")
|
|
self.setUsbEnabled(True)
|
|
status = self._power_monitor.GetStatus()
|
|
self._native_hz = status["sampleRate"] * 1000
|
|
# the following describes power test being run (i.e on what sensor
|
|
# and what type of test. This is used for logging.
|
|
self._current_test = "None"
|
|
self._external_storage = self.executeOnDevice(PowerTest.REQUEST_EXTERNAL_STORAGE)
|
|
self._max_baseline_amps = max_baseline_amps
|
|
|
|
def __del__(self):
|
|
self.finalize()
|
|
|
|
def finalize(self):
|
|
"""To be called upon termination of host connection to device"""
|
|
if self._tcp_connect_port > 0:
|
|
# tell device side to exit connection loop, and remove the forwarding
|
|
# connection
|
|
self.executeOnDevice(PowerTest.REQUEST_EXIT, reportErrors = False)
|
|
self.executeLocal("adb forward --remove tcp:%d" % self._tcp_connect_port)
|
|
self._tcp_connect_port = 0
|
|
if self._power_monitor:
|
|
self._power_monitor.Close()
|
|
self._power_monitor = None
|
|
|
|
def _send(self, msg, report_errors = True):
|
|
"""Connect to the device, send the given command, and then disconnect"""
|
|
if self._tcp_connect_port == 0:
|
|
# on first attempt to send a command, connect to device via any open port number,
|
|
# forwarding that port to a local socket on the device via adb
|
|
logging.debug("Seeking port for communication...")
|
|
# discover an open port
|
|
dummysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
dummysocket.bind(("localhost", 0))
|
|
(_, self._tcp_connect_port) = dummysocket.getsockname()
|
|
dummysocket.close()
|
|
assert(self._tcp_connect_port > 0)
|
|
|
|
status = self.executeLocal("adb forward tcp:%d localabstract:%s" %
|
|
(self._tcp_connect_port, PowerTest.DOMAIN_NAME))
|
|
# If the status !=0, then the host machine is unable to
|
|
# forward requests to client over adb. Ending the test and logging error message
|
|
# to the console on the host.
|
|
self.endTestIfLostConnection(
|
|
status != 0,
|
|
"Unable to forward requests to client over adb")
|
|
logging.info("Forwarding requests over local port %d",
|
|
self._tcp_connect_port)
|
|
|
|
link = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
try:
|
|
logging.debug("Connecting to device...")
|
|
link.connect(("localhost", self._tcp_connect_port))
|
|
logging.debug("Connected.")
|
|
except socket.error as serr:
|
|
print "Socket connection error: ", serr
|
|
print "Finalizing and exiting the test"
|
|
self.endTestIfLostConnection(
|
|
report_errors,
|
|
"Unable to communicate with device: connection refused")
|
|
except:
|
|
print "Non socket-related exception at this block in _send(); re-raising now."
|
|
raise
|
|
logging.debug("Sending '%s'", msg)
|
|
link.sendall(msg)
|
|
logging.debug("Getting response...")
|
|
response = link.recv(4096)
|
|
logging.debug("Got response '%s'", response)
|
|
link.close()
|
|
return response
|
|
|
|
def queryDevice(self, query):
|
|
"""Post a yes/no query to the device, return True upon successful query, False otherwise"""
|
|
logging.info("Querying device with '%s'", query)
|
|
return self._send(query) == "OK"
|
|
|
|
# TODO: abstract device communication (and string commands) into its own class
|
|
def executeOnDevice(self, cmd, reportErrors = True):
|
|
"""Execute a (string) command on the remote device"""
|
|
return self._send(cmd, reportErrors)
|
|
|
|
def executeLocal(self, cmd, check_status = True):
|
|
"""execute a shell command locally (on the host)"""
|
|
from subprocess import call
|
|
status = call(cmd.split(" "))
|
|
if status != 0 and check_status:
|
|
logging.error("Failed to execute \"%s\"", cmd)
|
|
else:
|
|
logging.debug("Executed \"%s\"", cmd)
|
|
return status
|
|
|
|
def reportErrorRaiseExceptionIf(self, condition, msg):
|
|
"""Report an error condition to the device if condition is True.
|
|
Will raise an exception on the device if condition is True.
|
|
Args:
|
|
condition: If true, this reports error
|
|
msg: Message related to exception
|
|
Raises:
|
|
A PowerTestException encapsulating the message provided in msg
|
|
"""
|
|
if condition:
|
|
try:
|
|
logging.error("Exiting on error: %s" % msg)
|
|
self.executeOnDevice(PowerTest.REQUEST_RAISE % (self._current_test, msg),
|
|
reportErrors = True)
|
|
except:
|
|
logging.error("Unable to communicate with device to report "
|
|
"error: %s" % msg)
|
|
self.finalize()
|
|
sys.exit(msg)
|
|
raise PowerTestException(msg)
|
|
|
|
def endTestIfLostConnection(self, lost_connection, error_message):
|
|
"""
|
|
This function ends the test if lost_connection was true,
|
|
which indicates that the connection to the device was lost.
|
|
Args:
|
|
lost_connection: boolean variable, if True it indicates that
|
|
connection to device was lost and the test must be terminated.
|
|
error_message: String to print to the host console before exiting the test
|
|
(if lost_connection is True)
|
|
Returns:
|
|
None.
|
|
"""
|
|
if lost_connection:
|
|
logging.error(error_message)
|
|
self.finalize()
|
|
sys.exit(error_message)
|
|
|
|
def setUsbEnabled(self, enabled, verbose = True):
|
|
if enabled:
|
|
val = 1
|
|
else:
|
|
val = 0
|
|
self._power_monitor.SetUsbPassthrough(val)
|
|
tries = 0
|
|
|
|
# Sometimes command won't go through first time, particularly if immediately after a data
|
|
# collection, so allow for retries
|
|
# TODO: Move this retry mechanism to the power monitor driver.
|
|
status = self._power_monitor.GetStatus()
|
|
while status is None and tries < 5:
|
|
tries += 1
|
|
time.sleep(2.0)
|
|
logging.error("Retrying get status call...")
|
|
self._power_monitor.StopDataCollection()
|
|
self._power_monitor.SetUsbPassthrough(val)
|
|
status = self._power_monitor.GetStatus()
|
|
|
|
if enabled:
|
|
if verbose:
|
|
print("...USB enabled, waiting for device")
|
|
self.executeLocal("adb wait-for-device")
|
|
if verbose:
|
|
print("...device online")
|
|
else:
|
|
if verbose:
|
|
logging.info("...USB disabled")
|
|
# re-establish port forwarding
|
|
if enabled and self._tcp_connect_port > 0:
|
|
status = self.executeLocal("adb forward tcp:%d localabstract:%s" %
|
|
(self._tcp_connect_port, PowerTest.DOMAIN_NAME))
|
|
self.reportErrorRaiseExceptionIf(status != 0, msg = "Unable to forward requests to client over adb")
|
|
|
|
def computeBaselineState(self, measurements):
|
|
"""
|
|
Args:
|
|
measurements: List of floats containing ampere draw measurements
|
|
taken from the monsoon power monitor.
|
|
Must be atleast 100 measurements long
|
|
Returns:
|
|
A tuple (isBaseline, mean_current) where isBaseline is a
|
|
boolean that is True only if the baseline state for the phone is
|
|
detected. mean_current is an estimate of the average baseline
|
|
current for the device, which is valid only if baseline state is
|
|
detected (if not, it is set to -1).
|
|
"""
|
|
|
|
# Looks at the measurements to see if it is in baseline state
|
|
if len(measurements) < 100:
|
|
print(
|
|
"Need at least 100 measurements to determine if baseline state has"
|
|
" been reached")
|
|
return (False, -1)
|
|
|
|
# Assumption: At baseline state, the power profile is Gaussian distributed
|
|
# with low-variance around the mean current draw.
|
|
# Ideally we should find the mode from a histogram bin to find an estimated mean.
|
|
# Assuming here that the median is very close to this value; later we check that the
|
|
# variance of the samples is low enough to validate baseline.
|
|
sorted_measurements = sorted(measurements)
|
|
number_measurements = len(measurements)
|
|
if not number_measurements % 2:
|
|
median_measurement = (sorted_measurements[(number_measurements - 1) / 2] +
|
|
sorted_measurements[(number_measurements + 1) / 2]) / 2
|
|
else:
|
|
median_measurement = sorted_measurements[number_measurements / 2]
|
|
|
|
# Assume that at baseline state, a large fraction of power measurements
|
|
# are within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE milliAmperes of
|
|
# the average baseline current. Find all such measurements in the
|
|
# sorted measurement vector.
|
|
left_index = (
|
|
bisect_left(
|
|
sorted_measurements,
|
|
median_measurement -
|
|
PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE))
|
|
right_index = (
|
|
bisect_left(
|
|
sorted_measurements,
|
|
median_measurement +
|
|
PowerTest.EXPECTED_AMPS_VARIATION_HALF_RANGE))
|
|
|
|
average_baseline_amps = scipy.mean(
|
|
sorted_measurements[left_index: (right_index - 1)])
|
|
|
|
detected_baseline = True
|
|
# We enforce that a fraction of more than 'THRESHOLD_BASELINE_SAMPLES_FRACTION'
|
|
# of samples must be within +/- EXPECTED_AMPS_VARIATION_HALF_RANGE
|
|
# milliAmperes of the mean baseline current, which we have estimated as
|
|
# the median.
|
|
if ((right_index - left_index) < PowerTest.THRESHOLD_BASELINE_SAMPLES_FRACTION * len(
|
|
measurements)):
|
|
detected_baseline = False
|
|
|
|
# We check for the maximum limit of the expected baseline
|
|
if median_measurement > self._max_baseline_amps:
|
|
detected_baseline = False
|
|
if average_baseline_amps < 0:
|
|
print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE
|
|
detected_baseline = False
|
|
|
|
print("%s baseline state" % ("Could detect" if detected_baseline else "Could NOT detect"))
|
|
print(
|
|
"median amps = %f, avg amps = %f, fraction of good samples = %f" %
|
|
(median_measurement, average_baseline_amps,
|
|
float(right_index - left_index) / len(measurements)))
|
|
if PowerTest.ENABLE_PLOTTING:
|
|
plt.plot(measurements)
|
|
plt.show()
|
|
print("To continue test, please close the plot window manually.")
|
|
return (detected_baseline, average_baseline_amps)
|
|
|
|
def isApInSuspendState(self, measurements_amps, nominal_max_amps, test_percentile):
|
|
"""
|
|
This function detects AP suspend and display off state of phone
|
|
after a sensor has been registered.
|
|
|
|
Because the power profile can be very different between sensors and
|
|
even across builds, it is difficult to specify a tight threshold for
|
|
mean current draw or mandate that the power measurements must have low
|
|
variance. We use a criteria that allows for a certain fraction of
|
|
peaks in power spectrum and checks that test_percentile fraction of
|
|
measurements must be below the specified value nominal_max_amps
|
|
Args:
|
|
measurements_amps: amperes draw measurements from power monitor
|
|
test_percentile: the fraction of measurements we require to be below
|
|
a specified amps value
|
|
nominal_max_amps: the specified value of the max current draw
|
|
Returns:
|
|
returns a boolean which is True if and only if the AP suspend and
|
|
display off state is detected
|
|
"""
|
|
count_good = len([m for m in measurements_amps if m < nominal_max_amps])
|
|
count_negative = len([m for m in measurements_amps if m < 0])
|
|
if count_negative > 0:
|
|
print PowerTest.NEGATIVE_AMPERE_ERROR_MESSAGE
|
|
return False;
|
|
return count_good > test_percentile * len(measurements_amps)
|
|
|
|
def getBaselineState(self):
|
|
"""This function first disables all sensors, then collects measurements
|
|
through the power monitor and continuously evaluates if baseline state
|
|
is reached. Once baseline state is detected, it returns a tuple with
|
|
status information. If baseline is not detected in a preset maximum
|
|
number of trials, it returns as well.
|
|
|
|
Returns:
|
|
Returns a tuple (isBaseline, mean_current) where isBaseline is a
|
|
boolean that is True only if the baseline state for the phone is
|
|
detected. mean_current is an estimate of the average baseline current
|
|
for the device, which is valid only if baseline state is detected
|
|
(if not, it is set to -1)
|
|
"""
|
|
self.setPowerOn("ALL", False)
|
|
self.setUsbEnabled(False)
|
|
print("Waiting %d seconds for baseline state" % DELAY_SCREEN_OFF)
|
|
time.sleep(DELAY_SCREEN_OFF)
|
|
|
|
MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION = 5 # seconds
|
|
NUMBER_MEASUREMENTS_BASELINE_DETECTION = (
|
|
PowerTest.RATE_NOMINAL *
|
|
MEASUREMENT_DURATION_SECONDS_BASELINE_DETECTION)
|
|
NUMBER_MEASUREMENTS_BASELINE_VERIFICATION = (
|
|
NUMBER_MEASUREMENTS_BASELINE_DETECTION * 5)
|
|
MAX_TRIALS = 50
|
|
|
|
collected_baseline_measurements = False
|
|
|
|
for tries in xrange(MAX_TRIALS):
|
|
print("Trial number %d of %d..." % (tries, MAX_TRIALS))
|
|
measurements = self.collectMeasurements(
|
|
NUMBER_MEASUREMENTS_BASELINE_DETECTION, PowerTest.RATE_NOMINAL,
|
|
verbose = False)
|
|
if self.computeBaselineState(measurements)[0] is True:
|
|
collected_baseline_measurements = True
|
|
break
|
|
|
|
if collected_baseline_measurements:
|
|
print("Verifying baseline state over a longer interval "
|
|
"in order to double check baseline state")
|
|
measurements = self.collectMeasurements(
|
|
NUMBER_MEASUREMENTS_BASELINE_VERIFICATION, PowerTest.RATE_NOMINAL,
|
|
verbose = False)
|
|
self.reportErrorRaiseExceptionIf(
|
|
not measurements, "No background measurements could be taken")
|
|
retval = self.computeBaselineState(measurements)
|
|
if retval[0]:
|
|
print("Verified baseline.")
|
|
if measurements and LOG_DATA_TO_FILE:
|
|
with open("/tmp/cts-power-tests-background-data.log", "w") as f:
|
|
for m in measurements:
|
|
f.write("%.4f\n" % m)
|
|
return retval
|
|
else:
|
|
return (False, -1)
|
|
|
|
def waitForApSuspendMode(self):
|
|
"""This function repeatedly collects measurements until AP suspend and display off
|
|
mode is detected. After a maximum number of trials, if this state is not reached, it
|
|
raises an error.
|
|
Returns:
|
|
boolean which is True if device was detected to be in suspend state
|
|
Raises:
|
|
Power monitor-related exception
|
|
"""
|
|
print("waitForApSuspendMode(): Sleeping for %d seconds" % DELAY_SCREEN_OFF)
|
|
time.sleep(DELAY_SCREEN_OFF)
|
|
|
|
NUMBER_MEASUREMENTS = 200
|
|
# Maximum trials for which to collect measurements to get to Ap suspend
|
|
# state
|
|
MAX_TRIALS = 50
|
|
|
|
got_to_suspend_state = False
|
|
for count in xrange(MAX_TRIALS):
|
|
print ("waitForApSuspendMode(): Trial %d of %d" % (count, MAX_TRIALS))
|
|
measurements = self.collectMeasurements(NUMBER_MEASUREMENTS,
|
|
PowerTest.RATE_NOMINAL,
|
|
verbose = False)
|
|
if self.isApInSuspendState(
|
|
measurements, PowerTest.MAX_PERCENTILE_AP_SCREEN_OFF_AMPS,
|
|
PowerTest.PERCENTILE_MAX_AP_SCREEN_OFF):
|
|
got_to_suspend_state = True
|
|
break
|
|
self.reportErrorRaiseExceptionIf(
|
|
got_to_suspend_state is False,
|
|
msg = "Unable to determine application processor suspend mode status.")
|
|
print("Got to AP suspend state")
|
|
return got_to_suspend_state
|
|
|
|
def collectMeasurements(self, measurementCount, rate, verbose = True):
|
|
"""Args:
|
|
measurementCount: Number of measurements to collect from the power
|
|
monitor
|
|
rate: The integer frequency in Hertz at which to collect measurements from
|
|
the power monitor
|
|
Returns:
|
|
A list containing measurements from the power monitor; that has the
|
|
requested count of the number of measurements at the specified rate
|
|
"""
|
|
assert (measurementCount > 0)
|
|
decimate_by = self._native_hz / rate or 1
|
|
|
|
self._power_monitor.StartDataCollection()
|
|
sub_measurements = []
|
|
measurements = []
|
|
tries = 0
|
|
if verbose: print("")
|
|
try:
|
|
while len(measurements) < measurementCount and tries < 5:
|
|
if tries:
|
|
self._power_monitor.StopDataCollection()
|
|
self._power_monitor.StartDataCollection()
|
|
time.sleep(1.0)
|
|
tries += 1
|
|
additional = self._power_monitor.CollectData()
|
|
if additional is not None:
|
|
tries = 0
|
|
sub_measurements.extend(additional)
|
|
while len(sub_measurements) >= decimate_by:
|
|
sub_avg = sum(sub_measurements[0:decimate_by]) / decimate_by
|
|
measurements.append(sub_avg)
|
|
sub_measurements = sub_measurements[decimate_by:]
|
|
if verbose:
|
|
# "\33[1A\33[2K" is a special Linux console control
|
|
# sequence for moving to the previous line, and
|
|
# erasing it; and reprinting new text on that
|
|
# erased line.
|
|
sys.stdout.write("\33[1A\33[2K")
|
|
print ("MEASURED[%d]: %f" % (len(measurements), measurements[-1]))
|
|
finally:
|
|
self._power_monitor.StopDataCollection()
|
|
|
|
self.reportErrorRaiseExceptionIf(measurementCount > len(measurements),
|
|
"Unable to collect all requested measurements")
|
|
return measurements
|
|
|
|
def requestUserAcknowledgment(self, msg):
|
|
"""Post message to user on screen and wait for acknowledgment"""
|
|
response = self.executeOnDevice(PowerTest.REQUEST_USER_RESPONSE % msg)
|
|
self.reportErrorRaiseExceptionIf(
|
|
response != "OK", "Unable to request user acknowledgment")
|
|
|
|
def setTestResult(self, test_name, test_result, test_message):
|
|
"""
|
|
Reports the result of a test to the device
|
|
Args:
|
|
test_name: name of the test
|
|
test_result: Boolean result of the test (True means Pass)
|
|
test_message: Relevant message
|
|
"""
|
|
print ("Test %s : %s" % (test_name, test_result))
|
|
|
|
response = (
|
|
self.executeOnDevice(
|
|
PowerTest.REQUEST_SET_TEST_RESULT %
|
|
(test_name, test_result, test_message)))
|
|
self.reportErrorRaiseExceptionIf(
|
|
response != "OK", "Unable to send test status to Verifier")
|
|
|
|
def setPowerOn(self, sensor, powered_on):
|
|
response = self.executeOnDevice(PowerTest.REQUEST_SENSOR_SWITCH %
|
|
(("ON" if powered_on else "OFF"), sensor))
|
|
self.reportErrorRaiseExceptionIf(
|
|
response == "ERR", "Unable to set sensor %s state" % sensor)
|
|
logging.info("Set %s %s", sensor, ("ON" if powered_on else "OFF"))
|
|
return response
|
|
|
|
def runSensorPowerTest(
|
|
self, sensor, max_amperes_allowed, baseline_amps, user_request = None):
|
|
"""
|
|
Runs power test for a specific sensor; i.e. measures the amperes draw
|
|
of the phone using monsoon, with the specified sensor mregistered
|
|
and the phone in suspend state; and verifies that the incremental
|
|
consumed amperes is within expected bounds.
|
|
Args:
|
|
sensor: The specified sensor for which to run the power test
|
|
max_amperes_allowed: Maximum ampere draw of the device with the
|
|
sensor registered and device in suspend state
|
|
baseline_amps: The power draw of the device when it is in baseline
|
|
state (no sensors registered, display off, AP asleep)
|
|
"""
|
|
self._current_test = ("%s_Power_Test_While_%s" % (
|
|
sensor, ("Under_Motion" if user_request is not None else "Still")))
|
|
try:
|
|
print ("\n\n---------------------------------")
|
|
if user_request is not None:
|
|
print ("Running power test on %s under motion." % sensor)
|
|
else:
|
|
print ("Running power test on %s while device is still." % sensor)
|
|
print ("---------------------------------")
|
|
response = self.executeOnDevice(
|
|
PowerTest.REQUEST_SENSOR_AVAILABILITY % sensor)
|
|
if response == "UNAVAILABLE":
|
|
self.setTestResult(
|
|
self._current_test, test_result = "SKIPPED",
|
|
test_message = "Sensor %s not available on this platform" % sensor)
|
|
self.setPowerOn("ALL", False)
|
|
if response == "UNAVAILABLE":
|
|
self.setTestResult(
|
|
self._current_test, test_result = "SKIPPED",
|
|
test_message = "Sensor %s not available on this device" % sensor)
|
|
return
|
|
self.reportErrorRaiseExceptionIf(response != "OK", "Unable to set all sensor off")
|
|
self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
|
|
self.setUsbEnabled(False)
|
|
self.setUsbEnabled(True)
|
|
self.setPowerOn(sensor, True)
|
|
if user_request is not None:
|
|
print("===========================================\n" +
|
|
"==> Please follow the instructions presented on the device\n" +
|
|
"===========================================")
|
|
self.requestUserAcknowledgment(user_request)
|
|
self.executeOnDevice(PowerTest.REQUEST_SCREEN_OFF)
|
|
self.setUsbEnabled(False)
|
|
self.reportErrorRaiseExceptionIf(
|
|
response != "OK", "Unable to set sensor %s ON" % sensor)
|
|
|
|
self.waitForApSuspendMode()
|
|
print ("Collecting sensor %s measurements" % sensor)
|
|
measurements = self.collectMeasurements(PowerTest.SAMPLE_COUNT_NOMINAL,
|
|
PowerTest.RATE_NOMINAL)
|
|
|
|
if measurements and LOG_DATA_TO_FILE:
|
|
with open("/tmp/cts-power-tests-%s-%s-sensor-data.log" % (sensor,
|
|
("Under_Motion" if user_request is not None else "Still")), "w") as f:
|
|
for m in measurements:
|
|
f.write("%.4f\n" % m)
|
|
self.setUsbEnabled(True, verbose = False)
|
|
print("Saving raw data files to device...")
|
|
self.executeLocal("adb shell mkdir -p %s" % self._external_storage, False)
|
|
self.executeLocal("adb push %s %s/." % (f.name, self._external_storage))
|
|
self.setUsbEnabled(False, verbose = False)
|
|
self.reportErrorRaiseExceptionIf(
|
|
not measurements, "No measurements could be taken for %s" % sensor)
|
|
avg = sum(measurements) / len(measurements)
|
|
squared = [(m - avg) * (m - avg) for m in measurements]
|
|
|
|
stddev = math.sqrt(sum(squared) / len(squared))
|
|
current_diff = avg - baseline_amps
|
|
self.setUsbEnabled(True)
|
|
max_power = max(measurements) - avg
|
|
if current_diff <= max_amperes_allowed:
|
|
# TODO: fail the test of background > current
|
|
message = (
|
|
"Draw is within limits. Sensor delta:%f mAmp Baseline:%f "
|
|
"mAmp Sensor: %f mAmp Stddev : %f mAmp Peak: %f mAmp") % (
|
|
current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0,
|
|
stddev * 1000.0, max_power * 1000.0)
|
|
else:
|
|
message = (
|
|
"Draw is too high. Current:%f Background:%f Measured: %f "
|
|
"Stddev: %f Peak: %f") % (
|
|
current_diff * 1000.0, baseline_amps * 1000.0, avg * 1000.0,
|
|
stddev * 1000.0, max_power * 1000.0)
|
|
self.setTestResult(
|
|
self._current_test,
|
|
("PASS" if (current_diff <= max_amperes_allowed) else "FAIL"),
|
|
message)
|
|
print("Result: " + message)
|
|
except:
|
|
traceback.print_exc()
|
|
self.setTestResult(self._current_test, test_result = "FAIL",
|
|
test_message = "Exception occurred during run of test.")
|
|
raise
|
|
|
|
@staticmethod
|
|
def runTests(max_baseline_amps):
|
|
testrunner = None
|
|
try:
|
|
GENERIC_MOTION_REQUEST = ("\n===> Please press Next and when the "
|
|
"screen is off, keep the device under motion with only tiny, "
|
|
"slow movements until the screen turns on again.\nPlease "
|
|
"refrain from interacting with the screen or pressing any side "
|
|
"buttons while measurements are taken.")
|
|
USER_STEPS_REQUEST = ("\n===> Please press Next and when the "
|
|
"screen is off, then move the device to simulate step motion "
|
|
"until the screen turns on again.\nPlease refrain from "
|
|
"interacting with the screen or pressing any side buttons "
|
|
"while measurements are taken.")
|
|
testrunner = PowerTest(max_baseline_amps)
|
|
testrunner.executeOnDevice(
|
|
PowerTest.REQUEST_SHOW_MESSAGE % "Connected. Running tests...")
|
|
is_baseline_success, baseline_amps = testrunner.getBaselineState()
|
|
|
|
if is_baseline_success:
|
|
testrunner.setUsbEnabled(True)
|
|
# TODO: Enable testing a single sensor
|
|
testrunner.runSensorPowerTest(
|
|
"SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps,
|
|
user_request = GENERIC_MOTION_REQUEST)
|
|
testrunner.runSensorPowerTest(
|
|
"STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps,
|
|
user_request = USER_STEPS_REQUEST)
|
|
testrunner.runSensorPowerTest(
|
|
"STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps,
|
|
user_request = USER_STEPS_REQUEST)
|
|
testrunner.runSensorPowerTest(
|
|
"ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps,
|
|
user_request = GENERIC_MOTION_REQUEST)
|
|
testrunner.runSensorPowerTest(
|
|
"MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps,
|
|
user_request = GENERIC_MOTION_REQUEST)
|
|
testrunner.runSensorPowerTest(
|
|
"GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps,
|
|
user_request = GENERIC_MOTION_REQUEST)
|
|
testrunner.runSensorPowerTest(
|
|
"ACCELEROMETER", PowerTest.MAX_ACCEL_AMPS, baseline_amps,
|
|
user_request = None)
|
|
testrunner.runSensorPowerTest(
|
|
"MAGNETIC_FIELD", PowerTest.MAX_MAG_AMPS, baseline_amps,
|
|
user_request = None)
|
|
testrunner.runSensorPowerTest(
|
|
"GYROSCOPE", PowerTest.MAX_GYRO_AMPS, baseline_amps,
|
|
user_request = None)
|
|
testrunner.runSensorPowerTest(
|
|
"SIGNIFICANT_MOTION", PowerTest.MAX_SIGMO_AMPS, baseline_amps,
|
|
user_request = None)
|
|
testrunner.runSensorPowerTest(
|
|
"STEP_DETECTOR", PowerTest.MAX_STEP_DETECTOR_AMPS, baseline_amps,
|
|
user_request = None)
|
|
testrunner.runSensorPowerTest(
|
|
"STEP_COUNTER", PowerTest.MAX_STEP_COUNTER_AMPS, baseline_amps,
|
|
user_request = None)
|
|
else:
|
|
print("Could not get to baseline state. This is either because "
|
|
"in several trials, the monitor could not measure a set "
|
|
"of power measurements that had the specified low "
|
|
"variance or the mean measurements were below the "
|
|
"expected value. None of the sensor power measurement "
|
|
" tests were performed due to not being able to detect "
|
|
"baseline state. Please re-run the power tests.")
|
|
except KeyboardInterrupt:
|
|
print "Keyboard interrupt from user."
|
|
raise
|
|
except:
|
|
import traceback
|
|
traceback.print_exc()
|
|
finally:
|
|
logging.info("TESTS COMPLETE")
|
|
if testrunner:
|
|
try:
|
|
testrunner.finalize()
|
|
except socket.error:
|
|
sys.exit(
|
|
"===================================================\n"
|
|
"Unable to connect to device under test. Make sure \n"
|
|
"the device is connected via the usb pass-through, \n"
|
|
"the CtsVerifier app is running the SensorPowerTest on \n"
|
|
"the device, and USB pass-through is enabled.\n"
|
|
"===================================================")
|
|
|
|
def main(argv):
|
|
""" Simple command-line interface for a power test application."""
|
|
useful_flags = ["voltage", "status", "usbpassthrough",
|
|
"samples", "current", "log", "power_monitor"]
|
|
if not [f for f in useful_flags if FLAGS.get(f, None) is not None]:
|
|
print __doc__.strip()
|
|
print FLAGS.MainModuleHelp()
|
|
return
|
|
|
|
if FLAGS.avg and FLAGS.avg < 0:
|
|
logging.error("--avg must be greater than 0")
|
|
return
|
|
|
|
if FLAGS.voltage is not None:
|
|
if FLAGS.voltage > 5.5:
|
|
print("!!WARNING: Voltage higher than typical values!!!")
|
|
try:
|
|
response = raw_input(
|
|
"Voltage of %.3f requested. Confirm this is correct (Y/N)" %
|
|
FLAGS.voltage)
|
|
if response.upper() != "Y":
|
|
sys.exit("Aborting")
|
|
except:
|
|
sys.exit("Aborting.")
|
|
|
|
if not FLAGS.power_monitor:
|
|
sys.exit(
|
|
"You must specify a '--power_monitor' option to specify which power "
|
|
"monitor type " +
|
|
"you are using.\nOne of:\n \n ".join(available_monitors))
|
|
power_monitors = do_import("power_monitors.%s" % FLAGS.power_monitor)
|
|
try:
|
|
mon = power_monitors.Power_Monitor(device = FLAGS.device)
|
|
except:
|
|
import traceback
|
|
|
|
traceback.print_exc()
|
|
sys.exit("No power monitors found")
|
|
|
|
if FLAGS.voltage is not None:
|
|
|
|
if FLAGS.ramp is not None:
|
|
mon.RampVoltage(mon.start_voltage, FLAGS.voltage)
|
|
else:
|
|
mon.SetVoltage(FLAGS.voltage)
|
|
|
|
if FLAGS.current is not None:
|
|
mon.SetMaxCurrent(FLAGS.current)
|
|
|
|
if FLAGS.status:
|
|
items = sorted(mon.GetStatus().items())
|
|
print "\n".join(["%s: %s" % item for item in items])
|
|
|
|
if FLAGS.usbpassthrough:
|
|
if FLAGS.usbpassthrough == "off":
|
|
mon.SetUsbPassthrough(0)
|
|
elif FLAGS.usbpassthrough == "on":
|
|
mon.SetUsbPassthrough(1)
|
|
elif FLAGS.usbpassthrough == "auto":
|
|
mon.SetUsbPassthrough(2)
|
|
else:
|
|
mon.Close()
|
|
sys.exit("bad pass-through flag: %s" % FLAGS.usbpassthrough)
|
|
|
|
if FLAGS.samples:
|
|
# Make sure state is normal
|
|
mon.StopDataCollection()
|
|
status = mon.GetStatus()
|
|
native_hz = status["sampleRate"] * 1000
|
|
|
|
# Collect and average samples as specified
|
|
mon.StartDataCollection()
|
|
|
|
# In case FLAGS.hz doesn't divide native_hz exactly, use this invariant:
|
|
# 'offset' = (consumed samples) * FLAGS.hz - (emitted samples) * native_hz
|
|
# This is the error accumulator in a variation of Bresenham's algorithm.
|
|
emitted = offset = 0
|
|
collected = []
|
|
history_deque = collections.deque() # past n samples for rolling average
|
|
|
|
# TODO: Complicated lines of code below. Refactoring needed
|
|
try:
|
|
last_flush = time.time()
|
|
while emitted < FLAGS.samples or FLAGS.samples == -1:
|
|
# The number of raw samples to consume before emitting the next output
|
|
need = (native_hz - offset + FLAGS.hz - 1) / FLAGS.hz
|
|
if need > len(collected): # still need more input samples
|
|
samples = mon.CollectData()
|
|
if not samples: break
|
|
collected.extend(samples)
|
|
else:
|
|
# Have enough data, generate output samples.
|
|
# Adjust for consuming 'need' input samples.
|
|
offset += need * FLAGS.hz
|
|
while offset >= native_hz: # maybe multiple, if FLAGS.hz > native_hz
|
|
this_sample = sum(collected[:need]) / need
|
|
|
|
if FLAGS.timestamp: print int(time.time()),
|
|
|
|
if FLAGS.avg:
|
|
history_deque.appendleft(this_sample)
|
|
if len(history_deque) > FLAGS.avg: history_deque.pop()
|
|
print "%f %f" % (this_sample,
|
|
sum(history_deque) / len(history_deque))
|
|
else:
|
|
print "%f" % this_sample
|
|
sys.stdout.flush()
|
|
|
|
offset -= native_hz
|
|
emitted += 1 # adjust for emitting 1 output sample
|
|
collected = collected[need:]
|
|
now = time.time()
|
|
if now - last_flush >= 0.99: # flush every second
|
|
sys.stdout.flush()
|
|
last_flush = now
|
|
except KeyboardInterrupt:
|
|
print("interrupted")
|
|
return 1
|
|
finally:
|
|
mon.Close()
|
|
return 0
|
|
|
|
if FLAGS.run:
|
|
if not FLAGS.power_monitor:
|
|
sys.exit(
|
|
"When running power tests, you must specify which type of power "
|
|
"monitor to use" +
|
|
" with '--power_monitor <type of power monitor>'")
|
|
try:
|
|
PowerTest.runTests(FLAGS.max_baseline_amps)
|
|
except KeyboardInterrupt:
|
|
print "Keyboard interrupt from user"
|
|
|
|
if __name__ == "__main__":
|
|
flags.DEFINE_boolean("status", None, "Print power meter status")
|
|
flags.DEFINE_integer("avg", None,
|
|
"Also report average over last n data points")
|
|
flags.DEFINE_float("voltage", None, "Set output voltage (0 for off)")
|
|
flags.DEFINE_float("current", None, "Set max output current")
|
|
flags.DEFINE_string("usbpassthrough", None, "USB control (on, off, auto)")
|
|
flags.DEFINE_integer("samples", None, "Collect and print this many samples")
|
|
flags.DEFINE_integer("hz", 5000, "Print this many samples/sec")
|
|
flags.DEFINE_string("device", None,
|
|
"Path to the device in /dev/... (ex:/dev/ttyACM1)")
|
|
flags.DEFINE_boolean("timestamp", None,
|
|
"Also print integer (seconds) timestamp on each line")
|
|
flags.DEFINE_boolean("ramp", True, "Gradually increase voltage")
|
|
flags.DEFINE_boolean("log", False, "Log progress to a file or not")
|
|
flags.DEFINE_boolean("run", False, "Run the test suite for power")
|
|
flags.DEFINE_string("power_monitor", None, "Type of power monitor to use")
|
|
flags.DEFINE_float("max_baseline_amps", 0.005,
|
|
"Set maximum baseline current for device being tested")
|
|
sys.exit(main(FLAGS(sys.argv)))
|