You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

185 lines
6.1 KiB

#!/usr/bin/env python3
#
# Copyright 2019, The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""Helper util libraries for command line operations."""
import asyncio
import sys
import time
from typing import Tuple, Optional, List
import lib.print_utils as print_utils
TIMEOUT = 50
SIMULATE = False
def run_command_nofail(cmd: List[str], **kwargs) -> None:
"""Runs cmd list with default timeout.
Throws exception if the execution fails.
"""
my_kwargs = {"timeout": TIMEOUT, "shell": False, "simulate": False}
my_kwargs.update(kwargs)
passed, out = execute_arbitrary_command(cmd, **my_kwargs)
if not passed:
raise RuntimeError(
"Failed to execute %s (kwargs=%s), output=%s" % (cmd, kwargs, out))
def run_adb_shell_command(cmd: str) -> Tuple[bool, str]:
"""Runs command using adb shell.
Returns:
A tuple of running status (True=succeeded, False=failed or timed out) and
std output (string contents of stdout with trailing whitespace removed).
"""
return run_shell_command('adb shell "{}"'.format(cmd))
def run_shell_func(script_path: str,
func: str,
args: List[str]) -> Tuple[bool, str]:
"""Runs shell function with default timeout.
Returns:
A tuple of running status (True=succeeded, False=failed or timed out) and
std output (string contents of stdout with trailing whitespace removed) .
"""
if args:
cmd = 'bash -c "source {script_path}; {func} {args}"'.format(
script_path=script_path,
func=func,
args=' '.join("'{}'".format(arg) for arg in args))
else:
cmd = 'bash -c "source {script_path}; {func}"'.format(
script_path=script_path,
func=func)
print_utils.debug_print(cmd)
return run_shell_command(cmd)
def run_shell_command(cmd: str) -> Tuple[bool, str]:
"""Runs shell command with default timeout.
Returns:
A tuple of running status (True=succeeded, False=failed or timed out) and
std output (string contents of stdout with trailing whitespace removed) .
"""
return execute_arbitrary_command([cmd],
TIMEOUT,
shell=True,
simulate=SIMULATE)
def execute_arbitrary_command(cmd: List[str],
timeout: int,
shell: bool,
simulate: bool) -> Tuple[bool, str]:
"""Run arbitrary shell command with default timeout.
Mostly copy from
frameworks/base/startop/scripts/app_startup/app_startup_runner.py.
Args:
cmd: list of cmd strings.
timeout: the time limit of running cmd.
shell: indicate if the cmd is a shell command.
simulate: if it's true, do not run the command and assume the running is
successful.
Returns:
A tuple of running status (True=succeeded, False=failed or timed out) and
std output (string contents of stdout with trailing whitespace removed) .
"""
if simulate:
print(cmd)
return True, ''
print_utils.debug_print('[EXECUTE]', cmd)
# block until either command finishes or the timeout occurs.
loop = asyncio.get_event_loop()
(return_code, script_output) = loop.run_until_complete(
_run_command(*cmd, shell=shell, timeout=timeout))
script_output = script_output.decode() # convert bytes to str
passed = (return_code == 0)
print_utils.debug_print('[$?]', return_code)
if not passed:
print('[FAILED, code:%s]' % (return_code), script_output, file=sys.stderr)
return passed, script_output.rstrip()
async def _run_command(*args: List[str],
shell: bool = False,
timeout: Optional[int] = None) -> Tuple[int, bytes]:
if shell:
process = await asyncio.create_subprocess_shell(
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
else:
process = await asyncio.create_subprocess_exec(
*args, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.STDOUT)
script_output = b''
print_utils.debug_print('[PID]', process.pid)
timeout_remaining = timeout
time_started = time.time()
# read line (sequence of bytes ending with b'\n') asynchronously
while True:
try:
line = await asyncio.wait_for(process.stdout.readline(),
timeout_remaining)
print_utils.debug_print('[STDOUT]', line)
script_output += line
if timeout_remaining:
time_elapsed = time.time() - time_started
timeout_remaining = timeout - time_elapsed
except asyncio.TimeoutError:
print_utils.debug_print('[TIMEDOUT] Process ', process.pid)
print_utils.debug_print('[TIMEDOUT] Sending SIGTERM.')
process.terminate()
# 5 second timeout for process to handle SIGTERM nicely.
try:
(remaining_stdout,
remaining_stderr) = await asyncio.wait_for(process.communicate(), 5)
script_output += remaining_stdout
except asyncio.TimeoutError:
print_utils.debug_print('[TIMEDOUT] Sending SIGKILL.')
process.kill()
# 5 second timeout to finish with SIGKILL.
try:
(remaining_stdout,
remaining_stderr) = await asyncio.wait_for(process.communicate(), 5)
script_output += remaining_stdout
except asyncio.TimeoutError:
# give up, this will leave a zombie process.
print_utils.debug_print('[TIMEDOUT] SIGKILL failed for process ',
process.pid)
time.sleep(100)
return -1, script_output
else:
if not line: # EOF
break
code = await process.wait() # wait for child process to exit
return code, script_output