You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
167 lines
5.3 KiB
167 lines
5.3 KiB
import csv
|
|
import django.http
|
|
import common
|
|
from autotest_lib.frontend.afe import rpc_utils
|
|
|
|
class CsvEncoder(object):
|
|
def __init__(self, request, response):
|
|
self._request = request
|
|
self._response = response
|
|
self._output_rows = []
|
|
|
|
|
|
def _append_output_row(self, row):
|
|
self._output_rows.append(row)
|
|
|
|
|
|
def _build_response(self):
|
|
response = django.http.HttpResponse(mimetype='text/csv')
|
|
response['Content-Disposition'] = (
|
|
'attachment; filename=tko_query.csv')
|
|
writer = csv.writer(response)
|
|
writer.writerows(self._output_rows)
|
|
return response
|
|
|
|
|
|
def encode(self):
|
|
raise NotImplementedError
|
|
|
|
|
|
class UnhandledMethodEncoder(CsvEncoder):
|
|
def encode(self):
|
|
return rpc_utils.raw_http_response(
|
|
'Unhandled method %s (this indicates a bug)\r\n' %
|
|
self._request['method'])
|
|
|
|
|
|
class SpreadsheetCsvEncoder(CsvEncoder):
|
|
def _total_index(self, group, num_columns):
|
|
row_index, column_index = group['header_indices']
|
|
return row_index * num_columns + column_index
|
|
|
|
|
|
def _group_string(self, group):
|
|
result = '%s / %s' % (group['pass_count'], group['complete_count'])
|
|
if group['incomplete_count'] > 0:
|
|
result += ' (%s incomplete)' % group['incomplete_count']
|
|
if 'extra_info' in group:
|
|
result = '\n'.join([result] + group['extra_info'])
|
|
return result
|
|
|
|
|
|
def _build_value_table(self):
|
|
value_table = [''] * self._num_rows * self._num_columns
|
|
for group in self._response['groups']:
|
|
total_index = self._total_index(group, self._num_columns)
|
|
value_table[total_index] = self._group_string(group)
|
|
return value_table
|
|
|
|
|
|
def _header_string(self, header_value):
|
|
return '/'.join(header_value)
|
|
|
|
|
|
def _process_value_table(self, value_table, row_headers):
|
|
total_index = 0
|
|
for row_index in xrange(self._num_rows):
|
|
row_header = self._header_string(row_headers[row_index])
|
|
row_end_index = total_index + self._num_columns
|
|
row_values = value_table[total_index:row_end_index]
|
|
self._append_output_row([row_header] + row_values)
|
|
total_index += self._num_columns
|
|
|
|
|
|
def encode(self):
|
|
header_values = self._response['header_values']
|
|
assert len(header_values) == 2
|
|
row_headers, column_headers = header_values
|
|
self._num_rows, self._num_columns = (len(row_headers),
|
|
len(column_headers))
|
|
|
|
value_table = self._build_value_table()
|
|
|
|
first_line = [''] + [self._header_string(header_value)
|
|
for header_value in column_headers]
|
|
self._append_output_row(first_line)
|
|
self._process_value_table(value_table, row_headers)
|
|
|
|
return self._build_response()
|
|
|
|
|
|
class TableCsvEncoder(CsvEncoder):
|
|
def __init__(self, request, response):
|
|
super(TableCsvEncoder, self).__init__(request, response)
|
|
self._column_specs = request['columns']
|
|
|
|
|
|
def _format_row(self, row_object):
|
|
"""Extract data from a row object into a list of strings"""
|
|
return [row_object.get(field) for field, name in self._column_specs]
|
|
|
|
|
|
def _encode_table(self, row_objects):
|
|
self._append_output_row([column_spec[1] # header row
|
|
for column_spec in self._column_specs])
|
|
for row_object in row_objects:
|
|
self._append_output_row(self._format_row(row_object))
|
|
return self._build_response()
|
|
|
|
|
|
def encode(self):
|
|
return self._encode_table(self._response)
|
|
|
|
|
|
class GroupedTableCsvEncoder(TableCsvEncoder):
|
|
def encode(self):
|
|
return self._encode_table(self._response['groups'])
|
|
|
|
|
|
class StatusCountTableCsvEncoder(GroupedTableCsvEncoder):
|
|
_PASS_RATE_FIELD = '_test_pass_rate'
|
|
|
|
def __init__(self, request, response):
|
|
super(StatusCountTableCsvEncoder, self).__init__(request, response)
|
|
# inject a more sensible field name for test pass rate
|
|
for column_spec in self._column_specs:
|
|
field, name = column_spec
|
|
if name == 'Test pass rate':
|
|
column_spec[0] = self._PASS_RATE_FIELD
|
|
break
|
|
|
|
|
|
def _format_pass_rate(self, row_object):
|
|
result = '%s / %s' % (row_object['pass_count'],
|
|
row_object['complete_count'])
|
|
incomplete_count = row_object['incomplete_count']
|
|
if incomplete_count:
|
|
result += ' (%s incomplete)' % incomplete_count
|
|
return result
|
|
|
|
|
|
def _format_row(self, row_object):
|
|
row_object[self._PASS_RATE_FIELD] = self._format_pass_rate(row_object)
|
|
return super(StatusCountTableCsvEncoder, self)._format_row(row_object)
|
|
|
|
|
|
_ENCODER_MAP = {
|
|
'get_latest_tests' : SpreadsheetCsvEncoder,
|
|
'get_test_views' : TableCsvEncoder,
|
|
'get_group_counts' : GroupedTableCsvEncoder,
|
|
}
|
|
|
|
|
|
def _get_encoder_class(request):
|
|
method = request['method']
|
|
if method in _ENCODER_MAP:
|
|
return _ENCODER_MAP[method]
|
|
if method == 'get_status_counts':
|
|
if 'columns' in request:
|
|
return StatusCountTableCsvEncoder
|
|
return SpreadsheetCsvEncoder
|
|
return UnhandledMethodEncoder
|
|
|
|
|
|
def encoder(request, response):
|
|
EncoderClass = _get_encoder_class(request)
|
|
return EncoderClass(request, response)
|