You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

267 lines
11 KiB

# Lint as: python2, python3
# Copyright (c) 2016 The Chromium Authors. All rights reserved.
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Server side audio utilities functions for Brillo."""
from __future__ import absolute_import
from __future__ import division
from __future__ import print_function
import contextlib
import logging
import numpy
import os
import struct
import subprocess
import tempfile
import wave
from autotest_lib.client.common_lib import error
from six.moves import map
from six.moves import range
_BITS_PER_BYTE=8
# Thresholds used when comparing files.
#
# The frequency threshold used when comparing files. The frequency of the
# recorded audio has to be within _FREQUENCY_THRESHOLD percent of the frequency
# of the original audio.
_FREQUENCY_THRESHOLD = 0.01
# Noise threshold controls how much noise is allowed as a fraction of the
# magnitude of the peak frequency after taking an FFT. The power of all the
# other frequencies in the signal should be within _FFT_NOISE_THRESHOLD percent
# of the power of the main frequency.
_FFT_NOISE_THRESHOLD = 0.05
# Command used to encode audio. If you want to test with something different,
# this should be changed.
_ENCODING_CMD = 'sox'
def extract_wav_frames(wave_file):
"""Extract all frames from a WAV file.
@param wave_file: A Wave_read object representing a WAV file opened for
reading.
@return: A list containing the frames in the WAV file.
"""
num_frames = wave_file.getnframes()
sample_width = wave_file.getsampwidth()
if sample_width == 1:
fmt = '%iB' # Read 1 byte.
elif sample_width == 2:
fmt = '%ih' # Read 2 bytes.
elif sample_width == 4:
fmt = '%ii' # Read 4 bytes.
else:
raise ValueError('Unsupported sample width')
frames = list(struct.unpack(fmt % num_frames * wave_file.getnchannels(),
wave_file.readframes(num_frames)))
# Since 8-bit PCM is unsigned with an offset of 128, we subtract the offset
# to make it signed since the rest of the code assumes signed numbers.
if sample_width == 1:
frames = [val - 128 for val in frames]
return frames
def check_wav_file(filename, num_channels=None, sample_rate=None,
sample_width=None):
"""Checks a WAV file and returns its peak PCM values.
@param filename: Input WAV file to analyze.
@param num_channels: Number of channels to expect (None to not check).
@param sample_rate: Sample rate to expect (None to not check).
@param sample_width: Sample width to expect (None to not check).
@return A list of the absolute maximum PCM values for each channel in the
WAV file.
@raise ValueError: Failed to process the WAV file or validate an attribute.
"""
chk_file = None
try:
chk_file = wave.open(filename, 'r')
if num_channels is not None and chk_file.getnchannels() != num_channels:
raise ValueError('Expected %d channels but got %d instead.',
num_channels, chk_file.getnchannels())
if sample_rate is not None and chk_file.getframerate() != sample_rate:
raise ValueError('Expected sample rate %d but got %d instead.',
sample_rate, chk_file.getframerate())
if sample_width is not None and chk_file.getsampwidth() != sample_width:
raise ValueError('Expected sample width %d but got %d instead.',
sample_width, chk_file.getsampwidth())
frames = extract_wav_frames(chk_file)
except wave.Error as e:
raise ValueError('Error processing WAV file: %s' % e)
finally:
if chk_file is not None:
chk_file.close()
peaks = []
for i in range(chk_file.getnchannels()):
peaks.append(max(list(map(abs, frames[i::chk_file.getnchannels()]))))
return peaks;
def generate_sine_file(host, num_channels, sample_rate, sample_width,
duration_secs, sine_frequency, temp_dir,
file_format='wav'):
"""Generate a sine file and push it to the DUT.
@param host: An object representing the DUT.
@param num_channels: Number of channels to use.
@param sample_rate: Sample rate to use for sine wave generation.
@param sample_width: Sample width to use for sine wave generation.
@param duration_secs: Duration in seconds to generate sine wave for.
@param sine_frequency: Frequency to generate sine wave at.
@param temp_dir: A temporary directory on the host.
@param file_format: A string representing the encoding for the audio file.
@return A tuple of the filename on the server and the DUT.
""";
_, local_filename = tempfile.mkstemp(
prefix='sine-', suffix='.' + file_format, dir=temp_dir)
if sample_width == 1:
byte_format = '-e unsigned'
else:
byte_format = '-e signed'
gen_file_cmd = ('sox -n -t wav -c %d %s -b %d -r %d %s synth %d sine %d '
'vol 0.9' % (num_channels, byte_format,
sample_width * _BITS_PER_BYTE, sample_rate,
local_filename, duration_secs, sine_frequency))
logging.info('Command to generate sine wave: %s', gen_file_cmd)
subprocess.call(gen_file_cmd, shell=True)
if file_format != 'wav':
# Convert the file to the appropriate format.
logging.info('Converting file to %s', file_format)
_, local_encoded_filename = tempfile.mkstemp(
prefix='sine-', suffix='.' + file_format, dir=temp_dir)
cvt_file_cmd = '%s %s %s' % (_ENCODING_CMD, local_filename,
local_encoded_filename)
logging.info('Command to convert file: %s', cvt_file_cmd)
subprocess.call(cvt_file_cmd, shell=True)
else:
local_encoded_filename = local_filename
dut_tmp_dir = '/data'
remote_filename = os.path.join(dut_tmp_dir, 'sine.' + file_format)
logging.info('Send file to DUT.')
# TODO(ralphnathan): Find a better place to put this file once the SELinux
# issues are resolved.
logging.info('remote_filename %s', remote_filename)
host.send_file(local_encoded_filename, remote_filename)
return local_filename, remote_filename
def _is_outside_frequency_threshold(freq_reference, freq_rec):
"""Compares the frequency of the recorded audio with the reference audio.
This function checks to see if the frequencies corresponding to the peak
FFT values are similiar meaning that the dominant frequency in the audio
signal is the same for the recorded audio as that in the audio played.
@param req_reference: The dominant frequency in the reference audio file.
@param freq_rec: The dominant frequency in the recorded audio file.
@return: True is freq_rec is with _FREQUENCY_THRESHOLD percent of
freq_reference.
"""
ratio = float(freq_rec) / freq_reference
if ratio > 1 + _FREQUENCY_THRESHOLD or ratio < 1 - _FREQUENCY_THRESHOLD:
return True
return False
def _compare_frames(reference_file_frames, rec_file_frames, num_channels,
sample_rate):
"""Compares audio frames from the reference file and the recorded file.
This method checks for two things:
1. That the main frequency is the same in both the files. This is done
using the FFT and observing the frequency corresponding to the
peak.
2. That there is no other dominant frequency in the recorded file.
This is done by sweeping the frequency domain and checking that the
frequency is always less than _FFT_NOISE_THRESHOLD percentage of
the peak.
The key assumption here is that the reference audio file contains only
one frequency.
@param reference_file_frames: Audio frames from the reference file.
@param rec_file_frames: Audio frames from the recorded file.
@param num_channels: Number of channels in the files.
@param sample_rate: Sample rate of the files.
@raise error.TestFail: The frequency of the recorded signal doesn't
match that of the reference signal.
@raise error.TestFail: There is too much noise in the recorded signal.
"""
for channel in range(num_channels):
reference_data = reference_file_frames[channel::num_channels]
rec_data = rec_file_frames[channel::num_channels]
# Get fft and frequencies corresponding to the fft values.
fft_reference = numpy.fft.rfft(reference_data)
fft_rec = numpy.fft.rfft(rec_data)
fft_freqs_reference = numpy.fft.rfftfreq(len(reference_data),
1.0 / sample_rate)
fft_freqs_rec = numpy.fft.rfftfreq(len(rec_data), 1.0 / sample_rate)
# Get frequency at highest peak.
freq_reference = fft_freqs_reference[
numpy.argmax(numpy.abs(fft_reference))]
abs_fft_rec = numpy.abs(fft_rec)
freq_rec = fft_freqs_rec[numpy.argmax(abs_fft_rec)]
# Compare the two frequencies.
logging.info('Golden frequency of channel %i is %f', channel,
freq_reference)
logging.info('Recorded frequency of channel %i is %f', channel,
freq_rec)
if _is_outside_frequency_threshold(freq_reference, freq_rec):
raise error.TestFail('The recorded audio frequency does not match '
'that of the audio played.')
# Check for noise in the frequency domain.
fft_rec_peak_val = numpy.max(abs_fft_rec)
noise_detected = False
for fft_index, fft_val in enumerate(abs_fft_rec):
if _is_outside_frequency_threshold(freq_reference, freq_rec):
# If the frequency exceeds _FFT_NOISE_THRESHOLD, then fail.
if fft_val > _FFT_NOISE_THRESHOLD * fft_rec_peak_val:
logging.warning('Unexpected frequency peak detected at %f '
'Hz.', fft_freqs_rec[fft_index])
noise_detected = True
if noise_detected:
raise error.TestFail('Signal is noiser than expected.')
def compare_file(reference_audio_filename, test_audio_filename):
"""Compares the recorded audio file to the reference audio file.
@param reference_audio_filename : Reference audio file containing the
reference signal.
@param test_audio_filename: Audio file containing audio captured from
the test.
"""
with contextlib.closing(wave.open(reference_audio_filename,
'rb')) as reference_file:
with contextlib.closing(wave.open(test_audio_filename,
'rb')) as rec_file:
# Extract data from files.
reference_file_frames = extract_wav_frames(reference_file)
rec_file_frames = extract_wav_frames(rec_file)
num_channels = reference_file.getnchannels()
_compare_frames(reference_file_frames, rec_file_frames,
reference_file.getnchannels(),
reference_file.getframerate())