You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
508 lines
19 KiB
508 lines
19 KiB
#!/usr/bin/python2
|
|
|
|
# Copyright 2016 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
|
|
"""Command line tool to analyze wave file and detect artifacts."""
|
|
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import print_function
|
|
import argparse
|
|
import collections
|
|
import json
|
|
import logging
|
|
import numpy
|
|
import pprint
|
|
import subprocess
|
|
import tempfile
|
|
import wave
|
|
from six.moves import range
|
|
|
|
# Normal autotest environment.
|
|
try:
|
|
import common
|
|
from autotest_lib.client.cros.audio import audio_analysis
|
|
from autotest_lib.client.cros.audio import audio_data
|
|
from autotest_lib.client.cros.audio import audio_quality_measurement
|
|
# Standalone execution without autotest environment.
|
|
except ImportError:
|
|
import audio_analysis
|
|
import audio_data
|
|
import audio_quality_measurement
|
|
|
|
|
|
# Immutable bundle of the tunable values that are handed to
# audio_quality_measurement.quality_measurement() for each channel.
QualityParams = collections.namedtuple(
        'QualityParams',
        ('block_size_secs',
         'frequency_error_threshold',
         'delay_amplitude_threshold',
         'noise_amplitude_threshold',
         'burst_amplitude_threshold'))
|
|
|
|
|
|
def add_args(parser):
    """Adds command line arguments.

    Registers the input file, the analysis options, the raw-file format
    options and the quality measurement thresholds on the given parser.

    @param parser: An argparse.ArgumentParser to register the arguments on.

    """
    # NOTE: adjacent string literals are concatenated verbatim, so every
    # fragment of a help text except the last one must end with a space.
    parser.add_argument('filename', metavar='FILE', type=str,
                        help='The wav or raw file to check. '
                             'The file format is determined by file '
                             'extension. '
                             'For raw format, user must also pass -b, -r, -c '
                             'for bit width, rate, and number of channels.')
    parser.add_argument('--debug', action='store_true', default=False,
                        help='Show debug message.')
    parser.add_argument('--spectral-only', action='store_true', default=False,
                        help='Only do spectral analysis on each channel.')
    parser.add_argument('--freqs', metavar='FREQ', type=float,
                        nargs='*',
                        help='Expected frequencies in the channels. '
                             'Frequencies are separated by space. '
                             'E.g.: --freqs 1000 2000. '
                             'It means only the first two '
                             'channels (1000Hz, 2000Hz) are to be checked. '
                             'Unwanted channels can be specified by 0. '
                             'E.g.: --freqs 1000 0 2000 0 3000. '
                             'It means only channels 0,2,4 are to be '
                             'examined.')
    parser.add_argument('--freq-threshold', metavar='FREQ_THRESHOLD',
                        type=float, default=5,
                        help='Frequency difference threshold in Hz. '
                             'Default is 5Hz')
    parser.add_argument('--ignore-high-freq', metavar='HIGH_FREQ_THRESHOLD',
                        type=float, default=5000,
                        help='Frequency threshold in Hz to be ignored for '
                             'high frequency. Default is 5KHz')
    parser.add_argument('--output-file', metavar='OUTPUT_FILE', type=str,
                        help='Output file to dump analysis result in JSON '
                             'format')
    parser.add_argument('-b', '--bit-width', metavar='BIT_WIDTH', type=int,
                        default=32,
                        help='For raw file. Bit width of a sample. '
                             'Assume sample format is little-endian signed '
                             'int. '
                             'Default is 32')
    parser.add_argument('-r', '--rate', metavar='RATE', type=int,
                        default=48000,
                        help='For raw file. Sampling rate. Default is 48000')
    parser.add_argument('-c', '--channel', metavar='CHANNEL', type=int,
                        default=8,
                        help='For raw file. Number of channels. '
                             'Default is 8.')

    # Arguments for quality measurement customization. The defaults come
    # from the audio_quality_measurement module.
    parser.add_argument(
            '--quality-block-size-secs',
            metavar='BLOCK_SIZE_SECS', type=float,
            default=audio_quality_measurement.DEFAULT_BLOCK_SIZE_SECS,
            help='Block size for quality measurement. '
                 'Refer to audio_quality_measurement module for detail.')
    parser.add_argument(
            '--quality-frequency-error-threshold',
            metavar='FREQ_ERR_THRESHOLD', type=float,
            default=audio_quality_measurement.DEFAULT_FREQUENCY_ERROR,
            help='Frequency error threshold for identifying sine wave '
                 'in quality measurement. '
                 'Refer to audio_quality_measurement module for detail.')
    parser.add_argument(
            '--quality-delay-amplitude-threshold',
            metavar='DELAY_AMPLITUDE_THRESHOLD', type=float,
            default=audio_quality_measurement.DEFAULT_DELAY_AMPLITUDE_THRESHOLD,
            help='Amplitude ratio threshold for identifying delay in sine '
                 'wave in quality measurement. '
                 'Refer to audio_quality_measurement module for detail.')
    parser.add_argument(
            '--quality-noise-amplitude-threshold',
            metavar='NOISE_AMPLITUDE_THRESHOLD', type=float,
            default=audio_quality_measurement.DEFAULT_NOISE_AMPLITUDE_THRESHOLD,
            help='Amplitude ratio threshold for identifying noise in sine '
                 'wave in quality measurement. '
                 'Refer to audio_quality_measurement module for detail.')
    parser.add_argument(
            '--quality-burst-amplitude-threshold',
            metavar='BURST_AMPLITUDE_THRESHOLD', type=float,
            default=audio_quality_measurement.DEFAULT_BURST_AMPLITUDE_THRESHOLD,
            help='Amplitude ratio threshold for identifying burst in sine '
                 'wave in quality measurement. '
                 'Refer to audio_quality_measurement module for detail.')
|
|
|
|
|
|
def parse_args(parser):
    """Runs the parser against the process command line.

    @param parser: An argparse.ArgumentParser with arguments registered.

    @returns: The namespace returned by parser.parse_args().

    """
    return parser.parse_args()
|
|
|
|
|
|
class WaveFileException(Exception):
    """Raised by WaveFile when a wave file can not be handled."""
|
|
|
|
|
|
class WaveFormatExtensibleException(Exception):
    """Raised for WAVE_FORMAT_EXTENSIBLE files, which are not supported."""
|
|
|
|
|
|
class WaveFile(object):
    """Class which handles wave file reading.

    Properties:
        raw_data: audio_data.AudioRawData object for data in wave file.
        rate: sampling rate.

    """
    def __init__(self, filename):
        """Inits a wave file.

        Reads the file directly first. If it is rejected as
        WAVE_FORMAT_EXTENSIBLE, the file is converted with sox and the
        converted copy is read instead.

        @param filename: file name of the wave file.

        @raises WaveFileException: Wave file format is not supported.
        @raises RuntimeError: Conversion is needed but sox can not be run.

        """
        self.raw_data = None
        self.rate = None

        self._wave_reader = None
        self._n_channels = None
        self._sample_width_bits = None
        self._n_frames = None
        self._binary = None

        try:
            self._read_wave_file(filename)
        except WaveFormatExtensibleException:
            logging.warning(
                    'WAVE_FORMAT_EXTENSIBLE is not supported. '
                    'Try command "sox in.wav -t wavpcm out.wav" to convert '
                    'the file to WAVE_FORMAT_PCM format.')
            self._convert_and_read_wav_file(filename)


    def _convert_and_read_wav_file(self, filename):
        """Converts the wav file and read it.

        Converts the file into WAVE_FORMAT_PCM format using sox command and
        reads its content.

        @param filename: The wave file to be read.

        @raises RuntimeError: sox is not installed.
        @raises subprocess.CalledProcessError: sox failed to convert the file.

        """
        # Checks if sox is installed. Only the failures of launching or
        # running the probe are translated; anything else (for example
        # KeyboardInterrupt) must propagate untouched.
        try:
            subprocess.check_output(['sox', '--version'])
        except (OSError, subprocess.CalledProcessError):
            raise RuntimeError('sox command is not installed. '
                               'Try sudo apt-get install sox')

        # The temporary file is removed when the with block exits, so the
        # converted content has to be read inside it.
        with tempfile.NamedTemporaryFile(suffix='.wav') as converted_file:
            command = ['sox', filename, '-t', 'wavpcm', converted_file.name]
            logging.debug('Convert the file using sox: %s', command)
            subprocess.check_call(command)
            self._read_wave_file(converted_file.name)


    def _read_wave_file(self, filename):
        """Reads wave file header and samples.

        @param filename: The wave file to be read.

        @raises WaveFormatExtensibleException: Wave file is in
                                               WAVE_FORMAT_EXTENSIBLE format.
        @raises WaveFileException: Wave file format is not supported.

        """
        # Forget any reader left from a previous attempt so that the finally
        # clause below only ever closes the reader opened by this call.
        self._wave_reader = None
        try:
            self._wave_reader = wave.open(filename, 'r')
            self._read_wave_header()
            self._read_wave_binary()
        except wave.Error as e:
            # 65534 (0xFFFE) is the format tag the wave module reports for
            # WAVE_FORMAT_EXTENSIBLE files.
            if 'unknown format: 65534' in str(e):
                raise WaveFormatExtensibleException()
            logging.exception('Unsupported wave format')
            raise WaveFileException(
                    'Unsupported wave format in %s: %s' % (filename, e))
        finally:
            if self._wave_reader:
                self._wave_reader.close()


    def _read_wave_header(self):
        """Reads wave file header.

        Fills in the number of channels, sample width in bits, sampling rate
        and number of frames from the opened wave reader.

        @raises WaveFileException: wave file is compressed.

        """
        # Header is a tuple of
        # (nchannels, sampwidth, framerate, nframes, comptype, compname).
        header = self._wave_reader.getparams()
        logging.debug('Wave header: %s', header)

        (self._n_channels, sample_width_bytes, self.rate, self._n_frames,
         comptype, compname) = header
        self._sample_width_bits = sample_width_bytes * 8

        if comptype != 'NONE' or compname != 'not compressed':
            raise WaveFileException('Can not support compressed wav file.')


    def _read_wave_binary(self):
        """Reads in samples in wave file.

        Requires the header to be read first, since the number of frames,
        channels and the sample width are taken from it.

        """
        self._binary = self._wave_reader.readframes(self._n_frames)
        # Sample format name is built as S<bits>_LE, e.g. S16_LE.
        format_str = 'S%d_LE' % self._sample_width_bits
        self.raw_data = audio_data.AudioRawData(
                binary=self._binary,
                channel=self._n_channels,
                sample_format=format_str)


    def get_number_frames(self):
        """Get the number of frames in the wave file.

        @returns: The frame count read from the wave header, or None if the
                  header has not been read.

        """
        return self._n_frames
|
|
|
|
|
|
class QualityCheckerError(Exception):
    """Base error raised by QualityChecker."""
|
|
|
|
|
|
class CompareFailure(QualityCheckerError):
    """Raised when a dominant frequency does not match the expected one."""
|
|
|
|
|
|
class QualityFailure(QualityCheckerError):
    """Raised when the quality check finds artifacts."""
|
|
|
|
|
|
class QualityChecker(object):
    """Quality checker controls the flow of checking quality of raw data."""
    def __init__(self, raw_data, rate):
        """Inits a quality checker.

        @param raw_data: An audio_data.AudioRawData object.
        @param rate: Sampling rate.

        """
        self._raw_data = raw_data
        self._rate = rate
        # Results are appended per analyzed channel by do_spectral_analysis.
        self._spectrals = []
        self._quality_result = []


    def do_spectral_analysis(self, ignore_high_freq, check_quality,
                             quality_params):
        """Gets the spectral_analysis result.

        Stores one spectral result (and, if requested, one quality result)
        for every channel that carries data.

        @param ignore_high_freq: Ignore high frequencies above this threshold.
        @param check_quality: Check quality of each channel.
        @param quality_params: A QualityParams object for quality measurement.

        @raises QualityCheckerError: if data or rate is not set yet, or if
                quality is to be checked on a channel that has no frequency
                left below ignore_high_freq.

        """
        self.has_data()
        for channel_idx in range(self._raw_data.channel):
            signal = self._raw_data.channel_data[channel_idx]
            max_abs = numpy.max(numpy.abs(signal))
            logging.debug('Channel %d max abs signal: %f', channel_idx, max_abs)
            if max_abs == 0:
                # NOTE(review): nothing is appended for a skipped channel, so
                # positions in self._spectrals and self._quality_result stop
                # matching channel indices once a silent channel precedes
                # them, while check_freqs indexes by position.
                logging.info('No data on channel %d, skip this channel',
                             channel_idx)
                continue

            saturate_value = audio_data.get_maximum_value_from_sample_format(
                    self._raw_data.sample_format)
            normalized_signal = audio_analysis.normalize_signal(
                    signal, saturate_value)
            logging.debug('saturate_value: %f', saturate_value)
            logging.debug('max signal after normalized: %f',
                          numpy.max(normalized_signal))
            spectral = audio_analysis.spectral_analysis(
                    normalized_signal, self._rate)

            logging.debug('Channel %d spectral:\n%s', channel_idx,
                          pprint.pformat(spectral))

            # Ignore high frequencies above the threshold.
            spectral = [(f, c) for (f, c) in spectral if f < ignore_high_freq]

            logging.info('Channel %d spectral after ignoring high frequencies '
                         'above %f:\n%s', channel_idx, ignore_high_freq,
                         pprint.pformat(spectral))

            if check_quality:
                # The first remaining entry is used as the dominant
                # frequency; fail clearly instead of with an IndexError when
                # the filter above removed everything.
                if not spectral:
                    raise QualityCheckerError(
                            'Channel %d has no frequency below %f to use as '
                            'dominant frequency' % (
                                    channel_idx, ignore_high_freq))
                quality = audio_quality_measurement.quality_measurement(
                        signal=normalized_signal,
                        rate=self._rate,
                        dominant_frequency=spectral[0][0],
                        block_size_secs=quality_params.block_size_secs,
                        frequency_error_threshold=(
                                quality_params.frequency_error_threshold),
                        delay_amplitude_threshold=(
                                quality_params.delay_amplitude_threshold),
                        noise_amplitude_threshold=(
                                quality_params.noise_amplitude_threshold),
                        burst_amplitude_threshold=(
                                quality_params.burst_amplitude_threshold))

                logging.debug('Channel %d quality:\n%s', channel_idx,
                              pprint.pformat(quality))
                self._quality_result.append(quality)

            self._spectrals.append(spectral)


    def has_data(self):
        """Checks if data has been set.

        @raises QualityCheckerError: if data or rate is not set yet.

        """
        if not self._raw_data or not self._rate:
            raise QualityCheckerError('Data and rate is not set yet')


    def check_freqs(self, expected_freqs, freq_threshold):
        """Checks the dominant frequencies in the channels.

        @param expected_freqs: A list of frequencies. If frequency is 0, it
                               means this channel should be ignored.
        @param freq_threshold: The difference threshold to compare two
                               frequencies.

        @raises CompareFailure: if a checked entry has no spectral result or
                its dominant frequency is further than freq_threshold from
                the expected frequency.

        """
        logging.debug('expected_freqs: %s', expected_freqs)
        for idx, expected_freq in enumerate(expected_freqs):
            if expected_freq == 0:
                continue
            # More expected frequencies than analyzed channels is reported
            # as a comparison failure rather than an IndexError.
            if idx >= len(self._spectrals) or not self._spectrals[idx]:
                raise CompareFailure(
                        'Failed at channel %d: no dominant frequency' % idx)
            dominant_freq = self._spectrals[idx][0][0]
            if abs(dominant_freq - expected_freq) > freq_threshold:
                raise CompareFailure(
                        'Failed at channel %d: %f is too far away from %f' % (
                                idx, dominant_freq, expected_freq))


    def check_quality(self):
        """Checks the quality measurement results on each channel.

        @raises QualityFailure: when there is artifact.

        """
        # (key in the 'artifacts' dict, message template) in report order.
        artifact_messages = (
                ('noise_before_playback', 'Found noise before playback: %s'),
                ('noise_after_playback', 'Found noise after playback: %s'),
                ('delay_during_playback', 'Found delay during playback: %s'),
                ('burst_during_playback', 'Found burst during playback: %s'),
        )
        error_msgs = []

        for quality_res in self._quality_result:
            artifacts = quality_res['artifacts']
            for key, template in artifact_messages:
                found = artifacts[key]
                if found:
                    # One-element tuple keeps the value a single format
                    # argument whatever its type is.
                    error_msgs.append(template % (found,))

        if error_msgs:
            raise QualityFailure(
                    'Found bad quality: %s' % '\n'.join(error_msgs))


    def dump(self, output_file):
        """Dumps the result into a file in json format.

        @param output_file: A file path to dump spectral and quality
                            measurement result of each channel.

        """
        dump_dict = {
            'spectrals': self._spectrals,
            'quality_result': self._quality_result
        }
        with open(output_file, 'w') as f:
            json.dump(dump_dict, f)
|
|
|
|
|
|
class CheckQualityError(Exception):
    """Raised by the check_quality command line flow itself."""
|
|
|
|
|
|
def read_audio_file(args):
    """Reads audio file.

    The file type is chosen from the extension of args.filename: '.wav' is
    read through WaveFile, '.raw' is read as plain samples described by
    args.channel, args.bit_width and args.rate.

    @param args: The namespace parsed from command line arguments.

    @returns: A tuple (raw_data, rate) where raw_data is
              audio_data.AudioRawData, rate is sampling rate.

    @raises CheckQualityError: The file extension is not supported.

    """
    if args.filename.endswith('.wav'):
        wavefile = WaveFile(args.filename)
        raw_data = wavefile.raw_data
        rate = wavefile.rate
    elif args.filename.endswith('.raw'):
        # Raw samples are binary data; text mode may decode or translate
        # the bytes, so the file must be opened with 'rb'.
        with open(args.filename, 'rb') as f:
            binary = f.read()

        raw_data = audio_data.AudioRawData(
                binary=binary,
                channel=args.channel,
                sample_format='S%d_LE' % args.bit_width)
        rate = args.rate
    else:
        raise CheckQualityError(
                'File format for %s is not supported' % args.filename)

    return raw_data, rate
|
|
|
|
|
|
def get_quality_params(args):
    """Collects the quality measurement options into one object.

    Each QualityParams field is taken from the attribute of args that has
    the same name prefixed with 'quality_'.

    @param args: The namespace parsed from command line arguments.

    @returns: A QualityParams object.

    """
    values = dict(
            (field, getattr(args, 'quality_' + field))
            for field in QualityParams._fields)
    return QualityParams(**values)
|
|
|
|
|
|
if __name__ == "__main__":
    # Command line flow: parse arguments, read the audio file, analyze every
    # channel, then run the requested checks. A failed check raises and so
    # ends the script with an error.
    parser = argparse.ArgumentParser(
        description='Check signal quality of a wave file. Each channel should'
                    ' either be all zeros, or sine wave of a fixed frequency.')
    add_args(parser)
    args = parse_args(parser)

    level = logging.DEBUG if args.debug else logging.INFO
    # Named log_format so that the builtin format() is not shadowed.
    log_format = (
        '%(asctime)-15s:%(levelname)s:%(pathname)s:%(lineno)d: %(message)s')
    logging.basicConfig(format=log_format, level=level)

    raw_data, rate = read_audio_file(args)

    checker = QualityChecker(raw_data, rate)

    quality_params = get_quality_params(args)

    checker.do_spectral_analysis(ignore_high_freq=args.ignore_high_freq,
                                 check_quality=(not args.spectral_only),
                                 quality_params=quality_params)

    # Dump before running the checks so the result file is written even
    # when a check below raises.
    if args.output_file:
        checker.dump(args.output_file)

    if args.freqs:
        checker.check_freqs(args.freqs, args.freq_threshold)

    if not args.spectral_only:
        checker.check_quality()
|