You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
201 lines
7.0 KiB
201 lines
7.0 KiB
# Copyright 2016 The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import logging
|
|
import os
|
|
import os.path
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
|
|
import numpy as np
|
|
|
|
import run_all_tests # from same tools directory as run_sensor_fusion.py
|
|
|
|
_CORR_DIST_THRESH_MAX = 0.005 # must match value in test_sensor_fusion.py
|
|
_NUM_RUNS = 1
|
|
_TEST_BED_SENSOR_FUSION = 'TEST_BED_SENSOR_FUSION'
|
|
_TIME_SHIFT_MATCH = 'Best correlation of '
|
|
|
|
|
|
def find_time_shift(out_file_path):
|
|
"""Search through a test run's test_log.DEBUG for the best time shift.
|
|
|
|
Args:
|
|
out_file_path: File path for stdout logs to search through
|
|
|
|
Returns:
|
|
Float num of best time shift, if one is found. Otherwise, None.
|
|
"""
|
|
line = find_matching_line(out_file_path, _TIME_SHIFT_MATCH)
|
|
if line is None:
|
|
return None
|
|
else:
|
|
words = line.split(' ')
|
|
time_shift = float(words[-1][:-3]) # strip off 'ms'
|
|
fit_corr = float(words[-5])
|
|
return {'time_shift': time_shift, 'corr': fit_corr}
|
|
|
|
|
|
def find_matching_line(file_path, match_string):
|
|
"""Search each line in the file at 'file_path' for match_string.
|
|
|
|
Args:
|
|
file_path: File path for file being searched
|
|
match_string: Sting used to match against lines
|
|
|
|
Returns:
|
|
The first matching line. If none exists, returns None.
|
|
"""
|
|
with open(file_path) as f:
|
|
for line in f:
|
|
if match_string in line:
|
|
return line
|
|
return None
|
|
|
|
|
|
def main():
|
|
"""Run the sensor_fusion test for stastical purposes.
|
|
|
|
Script should be run from the top-level CameraITS directory.
|
|
All parameters expect 'num_runs' are defined in config.yml.
|
|
num_runs is defined at run time with 'num_runs=<int>'
|
|
'camera_id' can be over-written at command line to allow different
|
|
camera_ids facing the same direction to be tested.
|
|
|
|
ie. python tools/run_all_tests.py num_runs=10 # n=10 w/ config.yml cam
|
|
python tools/run_all_tests.py camera=0 num_runs=10 # n=10 w/ cam[0]
|
|
python tools/run_all_tests.py camera=0.4 num_runs=10 # n=10 w/ cam[0.4]
|
|
|
|
Command line arguments:
|
|
camera_id: camera_id or list of camera_ids.
|
|
num_runs: integer number of runs to get statistical values
|
|
|
|
All other config values are stored in config.yml file.
|
|
"""
|
|
logging.basicConfig(level=logging.INFO)
|
|
# Make output directories to hold the generated files.
|
|
topdir = tempfile.mkdtemp(prefix='CameraITS_')
|
|
subprocess.call(['chmod', 'g+rx', topdir])
|
|
|
|
camera_id_combos = []
|
|
|
|
# Override camera with cmd line values if available
|
|
num_runs = _NUM_RUNS
|
|
get_argv_vals = lambda x: x.split('=')[1]
|
|
for s in list(sys.argv[1:]):
|
|
if 'camera=' in s:
|
|
camera_id_combos = str(get_argv_vals(s)).split(',')
|
|
elif 'num_runs=' in s:
|
|
num_runs = int(get_argv_vals(s))
|
|
|
|
# Read config file and extract relevant TestBed
|
|
config_file_contents = run_all_tests.get_config_file_contents()
|
|
for i in config_file_contents['TestBeds']:
|
|
if i['Name'] != _TEST_BED_SENSOR_FUSION:
|
|
config_file_contents['TestBeds'].remove(i)
|
|
|
|
# Get test parameters from config file
|
|
test_params_content = run_all_tests.get_test_params(config_file_contents)
|
|
if not camera_id_combos:
|
|
camera_id_combos = test_params_content['camera'].split(',')
|
|
debug = test_params_content['debug_mode']
|
|
fps = test_params_content['fps']
|
|
img_size = test_params_content['img_size']
|
|
|
|
# Get dut id
|
|
device_id = run_all_tests.get_device_serial_number(
|
|
'dut', config_file_contents)
|
|
|
|
# Log run info
|
|
logging.info('Running sensor_fusion on device: %s, camera: %s',
|
|
device_id, camera_id_combos)
|
|
logging.info('Saving output files to: %s', topdir)
|
|
|
|
for camera_id in camera_id_combos:
|
|
time_shifts = []
|
|
# A subdir in topdir will be created for each camera_id.
|
|
test_params_content['camera'] = camera_id
|
|
test_params_content['scene'] = 'sensor_fusion'
|
|
config_file_contents['TestBeds'][0]['TestParams'] = test_params_content
|
|
os.mkdir(os.path.join(topdir, camera_id))
|
|
|
|
# Add the MoblyParams to config.yml file store camera_id test results.
|
|
mobly_output_logs_path = os.path.join(topdir, camera_id)
|
|
mobly_scene_output_logs_path = os.path.join(
|
|
mobly_output_logs_path, 'sensor_fusion')
|
|
mobly_params_dict = {
|
|
'MoblyParams': {
|
|
'LogPath': mobly_scene_output_logs_path
|
|
}
|
|
}
|
|
config_file_contents.update(mobly_params_dict)
|
|
logging.debug('Config file contents: %s', config_file_contents)
|
|
tmp_yml_file_name = run_all_tests.get_updated_yml_file(config_file_contents)
|
|
logging.info('Using %s as temporary config yml file', tmp_yml_file_name)
|
|
|
|
# Run tests
|
|
logging.info('%d runs for test_sensor_fusion with camera %s',
|
|
num_runs, camera_id)
|
|
logging.info('FPS: %d, img size: %s', fps, img_size)
|
|
for _ in range(num_runs):
|
|
cmd = ['python',
|
|
os.path.join(os.environ['CAMERA_ITS_TOP'], 'tests',
|
|
'sensor_fusion', 'test_sensor_fusion.py'),
|
|
'-c',
|
|
f'{tmp_yml_file_name}'
|
|
]
|
|
# pylint: disable=subprocess-run-check
|
|
with open(run_all_tests.MOBLY_TEST_SUMMARY_TXT_FILE, 'w') as fp:
|
|
output = subprocess.run(cmd, stdout=fp)
|
|
# pylint: enable=subprocess-run-check
|
|
|
|
with open(run_all_tests.MOBLY_TEST_SUMMARY_TXT_FILE, 'r') as _:
|
|
if output.returncode == 0:
|
|
return_string = 'PASS'
|
|
else:
|
|
return_string = 'FAIL'
|
|
|
|
os.remove(run_all_tests.MOBLY_TEST_SUMMARY_TXT_FILE)
|
|
file_name = os.path.join(
|
|
mobly_scene_output_logs_path, _TEST_BED_SENSOR_FUSION, 'latest',
|
|
'test_log.DEBUG')
|
|
time_shift = find_time_shift(file_name)
|
|
logging.info('%s time_shift: %.4f ms, corr: %.6f', return_string,
|
|
time_shift['time_shift'], time_shift['corr'])
|
|
if time_shift['corr'] < _CORR_DIST_THRESH_MAX:
|
|
time_shifts.append(time_shift)
|
|
else:
|
|
logging.info('Correlation distance too large. Not used for stats.')
|
|
|
|
# Summarize results with stats
|
|
times = [t['time_shift'] for t in time_shifts]
|
|
logging.info('runs: %d, time_shift mean: %.4f, sigma: %.4f',
|
|
len(times), np.mean(times), np.std(times))
|
|
|
|
# Delete temporary yml file after run.
|
|
tmp_yml_file = os.path.join(run_all_tests.YAML_FILE_DIR, tmp_yml_file_name)
|
|
os.remove(tmp_yml_file)
|
|
|
|
# Delete temporary image files after run.
|
|
if debug == 'False':
|
|
logging.info('Removing tmp dir %s to save space.', topdir)
|
|
shutil.rmtree(topdir)
|
|
|
|
logging.info('Test completed.')
|
|
if __name__ == '__main__':
|
|
main()
|
|
|