#!/usr/bin/env python3
# Copyright (C) 2019 The Android Open Source Project
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import logging
import mimetypes
import mmap
import os
import signal
import subprocess
import sys
import threading
import time
from multiprocessing.pool import ThreadPool

import httplib2
from oauth2client.client import GoogleCredentials

from common_utils import init_logging
from config import GCS_ARTIFACTS

CUR_DIR = os.path.dirname(__file__)
RESCAN_PERIOD_SEC = 5  # Scan for new artifact directories every X seconds.
# Watchdog timeout: main() arms a SIGALRM with this value, so a stuck upload
# cannot keep the process alive for more than 6 minutes.
WATCHDOG_SEC = 60 * 6

# Thread-local storage. Each uploader thread caches its own authorized
# httplib2.Http object here (see get_http_obj()), so worker threads never
# share one.
tls = threading.local()

# Uploads the artifacts directory of one CI job ($ARTIFACTS_DIR/<job-id>) to
# the GCS_ARTIFACTS bucket, then forwards the files under its perf/ subfolder
# to perf_metrics_uploader.py.


def get_http_obj():
  """Returns the calling thread's authorized httplib2.Http object.

  The object is created and authorized with the application-default Google
  credentials (cloud-platform scope) on first use in each thread, then cached
  in thread-local storage for subsequent calls from the same thread.
  """
  http = getattr(tls, 'http', None)
  if http is not None:
    return http
  http = httplib2.Http()
  scopes = ['https://www.googleapis.com/auth/cloud-platform']
  creds = GoogleCredentials.get_application_default().create_scoped(scopes)
  creds.authorize(http)
  # Cache only after authorization succeeded, so that a failure above does not
  # leave an unauthorized Http object behind for later calls.
  tls.http = http
  return http


def upload_one_file(fpath):
  """Uploads one file to the GCS_ARTIFACTS bucket with an HTTP PUT.

  The object name is the path of the file relative to $ARTIFACTS_DIR (or to
  the current directory when the variable is unset).

  Args:
    fpath: path of an existing file.

  Returns:
    The size of the file in bytes on success, -1 if the server replied with a
    status other than 200.
  """
  http = get_http_obj()
  relpath = os.path.relpath(fpath, os.getenv('ARTIFACTS_DIR',
                                             default=os.curdir))
  logging.debug('Uploading %s', relpath)
  assert (os.path.exists(fpath))
  fsize = os.path.getsize(fpath)
  mime_type = mimetypes.guess_type(fpath)[0] or 'application/octet-stream'
  hdr = {'Content-Length': fsize, 'Content-type': mime_type}
  uri = 'https://%s.storage.googleapis.com/%s' % (GCS_ARTIFACTS, relpath)
  mm = None
  try:
    body = ''
    # A zero-length file cannot be mmap-ed; it is sent as an empty body.
    if fsize > 0:
      with open(fpath, 'rb') as f:
        mm = mmap.mmap(f.fileno(), fsize, access=mmap.ACCESS_READ)
      body = mm
    resp, res = http.request(uri, method='PUT', headers=hdr, body=body)
  finally:
    # Release the mapping even if the request raised.
    if mm is not None:
      mm.close()
  if resp.status != 200:
    logging.error('HTTP request failed with code %d : %s', resp.status, res)
    return -1
  return fsize


def upload_one_file_with_retries(fpath,
                                 retry_delays=(0.5, 1.5, 3),
                                 upload_fn=None):
  """Uploads one file, retrying on failure.

  One attempt is made per entry of |retry_delays|; after a failed attempt that
  is not the last one, the function sleeps for the corresponding delay.

  Args:
    fpath: path of the file to upload.
    retry_delays: sequence of delays in seconds. Its length is the maximum
      number of attempts; the last entry is never slept on. An empty sequence
      means no attempt is made.
    upload_fn: callable taking the path and returning a size >= 0 on success
      or a negative number on failure. Defaults to upload_one_file.

  Returns:
    The value returned by the first successful attempt (the uploaded size in
    bytes for upload_one_file), or -1 if every attempt failed.
  """
  upload = upload_one_file if upload_fn is None else upload_fn
  last_attempt = len(retry_delays) - 1
  for attempt, delay in enumerate(retry_delays):
    res = upload(fpath)
    if res >= 0:
      return res
    if attempt < last_attempt:
      logging.warning('Upload of %s failed, retrying in %s seconds', fpath,
                      delay)
      time.sleep(delay)
  logging.error('Upload of %s failed after %d attempts', fpath,
                len(retry_delays))
  # Always return an int: main() compares the result with 0.
  return -1


def list_files(path):
  """Yields the path of every regular file found recursively under |path|."""
  for root, _, files in os.walk(path):
    for fname in files:
      fpath = os.path.join(root, fname)
      if os.path.isfile(fpath):
        yield fpath


def scan_and_upload_perf_folder(job_id, dirpath):
  """Runs perf_metrics_uploader.py on each file under |dirpath|/perf.

  Does nothing if the perf folder does not exist. The uploader script is
  looked up next to this file and invoked once per file.

  Args:
    job_id: CI job id, passed to the uploader through --job-id.
    dirpath: artifacts directory of the job.
  """
  perf_folder = os.path.join(dirpath, 'perf')
  if not os.path.isdir(perf_folder):
    return

  uploader = os.path.join(CUR_DIR, 'perf_metrics_uploader.py')
  for path in list_files(perf_folder):
    subprocess.call([uploader, '--job-id', job_id, path])


def main():
  """Uploads the artifacts of the job given by --job-id.

  Returns:
    1 if the job directory does not exist, 0 otherwise (individual upload
    failures are only logged).
  """
  init_logging()
  signal.alarm(WATCHDOG_SEC)
  mimetypes.add_type('application/wasm', '.wasm')

  parser = argparse.ArgumentParser()
  parser.add_argument(
      '--rm', action='store_true', help='Removes the directory')
  parser.add_argument(
      '--job-id',
      type=str,
      required=True,
      help='The Perfetto CI job ID to tie this upload to')
  args = parser.parse_args()
  job_id = args.job_id
  dirpath = os.path.join(os.getenv('ARTIFACTS_DIR', default=os.curdir), job_id)
  if not os.path.isdir(dirpath):
    logging.error('Directory not found: %s', dirpath)
    return 1

  # Make all artifacts readable by our user. Some of them are extracted as
  # rw-rw--- and owned by a different user (whatever the "sandbox" docker
  # container uid ends up mapping to).
  subprocess.call(['sudo', 'chown', '-R', str(os.geteuid()), dirpath])

  total_size = 0
  uploads = 0
  failures = 0
  files = list_files(dirpath)
  # The loop below drains every result before the pool is torn down on exit
  # from the with-block.
  with ThreadPool(processes=10) as pool:
    for upl_size in pool.imap_unordered(upload_one_file_with_retries, files):
      if upl_size >= 0:
        uploads += 1
        total_size += upl_size
      else:
        failures += 1

  logging.info('Uploaded artifacts for %s: %d files, %s failures, %d KB',
               job_id, uploads, failures, total_size / 1e3)

  scan_and_upload_perf_folder(job_id, dirpath)

  if args.rm:
    subprocess.call(['sudo', 'rm', '-rf', dirpath])

  return 0


if __name__ == '__main__':
  sys.exit(main())