You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
632 lines
24 KiB
632 lines
24 KiB
#!/usr/bin/env python
|
|
# Copyright 2018 The Android Open Source Project
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may not
|
|
# use this file except in compliance with the License. You may obtain a copy of
|
|
# the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations under
|
|
# the License.
|
|
#
|
|
#
|
|
#
|
|
#
|
|
# This script extracts Hearing Aid audio data from btsnoop.
|
|
# Generates a valid audio file which can be played using player like smplayer.
|
|
#
|
|
# Audio File Name Format:
|
|
# [PEER_ADDRESS]-[START_TIMESTAMP]-[AUDIO_TYPE]-[SAMPLE_RATE].[CODEC]
|
|
#
|
|
# Debug Infomation File Name Format:
|
|
# debug_ver_[DEBUG_VERSION]-[PEER_ADDRESS]-[START_TIMESTAMP]-[AUDIO_TYPE]-[SAMPLE_RATE].txt
|
|
#
|
|
# Player:
|
|
# smplayer
|
|
#
|
|
# NOTE:
|
|
# Please make sure you HCI Snoop data file includes the following frames:
|
|
# HearingAid "LE Enhanced Connection Complete", GATT write for Audio Control
|
|
# Point with "Start cmd", and the data frames.
|
|
|
|
import argparse
|
|
import os
|
|
import struct
|
|
import sys
|
|
import time
|
|
|
|
IS_SENT = "IS_SENT"
|
|
PEER_ADDRESS = "PEER_ADDRESS"
|
|
CONNECTION_HANDLE = "CONNECTION_HANDLE"
|
|
AUDIO_CONTROL_ATTR_HANDLE = "AUDIO_CONTROL_ATTR_HANDLE"
|
|
START = "START"
|
|
TIMESTAMP_STR_FORMAT = "TIMESTAMP_STR_FORMAT"
|
|
TIMESTAMP_TIME_FORMAT = "TIMESTAMP_TIME_FORMAT"
|
|
CODEC = "CODEC"
|
|
SAMPLE_RATE = "SAMPLE_RATE"
|
|
AUDIO_TYPE = "AUDIO_TYPE"
|
|
DEBUG_VERSION = "DEBUG_VERSION"
|
|
DEBUG_DATA = "DEBUG_DATA"
|
|
AUDIO_DATA_B = "AUDIO_DATA_B"
|
|
|
|
# Debug packet header struct
|
|
header_list_str = ["Event Processed", "Number Packet Nacked By Peripheral", "Number Packet Nacked By Central"]
|
|
# Debug frame information structs
|
|
data_list_str = [
|
|
"Event Number", "Overrun", "Underrun", "Skips", "Rendered Audio Frame", "First PDU Option", "Second PDU Option",
|
|
"Third PDU Option"
|
|
]
|
|
|
|
AUDIO_CONTROL_POINT_UUID = "f0d4de7e4a88476c9d9f1937b0996cc0"
|
|
SEC_CONVERT = 1000000
|
|
folder = None
|
|
full_debug = False
|
|
simple_debug = False
|
|
|
|
force_audio_control_attr_handle = None
|
|
default_audio_control_attr_handle = 0x0079
|
|
|
|
audio_data = {}
|
|
|
|
#=======================================================================
|
|
# Parse ACL Data Function
|
|
#=======================================================================
|
|
|
|
#-----------------------------------------------------------------------
|
|
# Parse Hearing Aid Packet
|
|
#-----------------------------------------------------------------------
|
|
|
|
|
|
def parse_acl_ha_debug_buffer(data, result):
|
|
"""This function extracts HA debug buffer"""
|
|
if len(data) < 5:
|
|
return
|
|
|
|
version, data = unpack_data(data, 1)
|
|
update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], DEBUG_VERSION, str(version))
|
|
|
|
debug_str = result[TIMESTAMP_TIME_FORMAT]
|
|
for p in range(3):
|
|
byte_data, data = unpack_data(data, 1)
|
|
debug_str = debug_str + ", " + header_list_str[p] + "=" + str(byte_data).rjust(3)
|
|
|
|
if full_debug:
|
|
debug_str = debug_str + "\n" + "|".join(data_list_str) + "\n"
|
|
while True:
|
|
if len(data) < 7:
|
|
break
|
|
base = 0
|
|
data_list_content = []
|
|
for counter in range(6):
|
|
p = base + counter
|
|
byte_data, data = unpack_data(data, 1)
|
|
if p == 1:
|
|
data_list_content.append(str(byte_data & 0x03).rjust(len(data_list_str[p])))
|
|
data_list_content.append(str((byte_data >> 2) & 0x03).rjust(len(data_list_str[p + 1])))
|
|
data_list_content.append(str((byte_data >> 4) & 0x0f).rjust(len(data_list_str[p + 2])))
|
|
base = 2
|
|
else:
|
|
data_list_content.append(str(byte_data).rjust(len(data_list_str[p])))
|
|
debug_str = debug_str + "|".join(data_list_content) + "\n"
|
|
|
|
update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], DEBUG_DATA, debug_str)
|
|
|
|
|
|
def parse_acl_ha_audio_data(data, result):
|
|
"""This function extracts HA audio data."""
|
|
if len(data) < 2:
|
|
return
|
|
# Remove audio packet number
|
|
audio_data_b = data[1:]
|
|
update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], AUDIO_DATA_B, audio_data_b)
|
|
|
|
|
|
def parse_acl_ha_audio_type(data, result):
|
|
"""This function parses HA audio control cmd audio type."""
|
|
audio_type, data = unpack_data(data, 1)
|
|
if audio_type is None:
|
|
return
|
|
elif audio_type == 0x01:
|
|
audio_type = "Ringtone"
|
|
elif audio_type == 0x02:
|
|
audio_type = "Phonecall"
|
|
elif audio_type == 0x03:
|
|
audio_type = "Media"
|
|
else:
|
|
audio_type = "Unknown"
|
|
update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], AUDIO_TYPE, audio_type)
|
|
|
|
|
|
def parse_acl_ha_codec(data, result):
|
|
"""This function parses HA audio control cmd codec and sample rate."""
|
|
codec, data = unpack_data(data, 1)
|
|
if codec == 0x01:
|
|
codec = "G722"
|
|
sample_rate = "16KHZ"
|
|
elif codec == 0x02:
|
|
codec = "G722"
|
|
sample_rate = "24KHZ"
|
|
else:
|
|
codec = "Unknown"
|
|
sample_rate = "Unknown"
|
|
update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], CODEC, codec)
|
|
update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], SAMPLE_RATE, sample_rate)
|
|
parse_acl_ha_audio_type(data, result)
|
|
|
|
|
|
def parse_acl_ha_audio_control_cmd(data, result):
|
|
"""This function parses HA audio control cmd is start/stop."""
|
|
control_cmd, data = unpack_data(data, 1)
|
|
if control_cmd == 0x01:
|
|
update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], START, True)
|
|
update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], TIMESTAMP_STR_FORMAT,
|
|
result[TIMESTAMP_STR_FORMAT])
|
|
parse_acl_ha_codec(data, result)
|
|
elif control_cmd == 0x02:
|
|
update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], START, False)
|
|
|
|
|
|
#-----------------------------------------------------------------------
|
|
# Parse ACL Packet
|
|
#-----------------------------------------------------------------------
|
|
|
|
|
|
def parse_acl_att_long_uuid(data, result):
|
|
"""This function parses ATT long UUID to get attr_handle."""
|
|
# len (1 byte) + start_attr_handle (2 bytes) + properties (1 byte) +
|
|
# attr_handle (2 bytes) + long_uuid (16 bytes) = 22 bytes
|
|
if len(data) < 22:
|
|
return
|
|
# skip unpack len, start_attr_handle, properties.
|
|
data = data[4:]
|
|
attr_handle, data = unpack_data(data, 2)
|
|
long_uuid_list = []
|
|
for p in range(0, 16):
|
|
long_uuid_list.append("{0:02x}".format(struct.unpack(">B", data[p])[0]))
|
|
long_uuid_list.reverse()
|
|
long_uuid = "".join(long_uuid_list)
|
|
# Check long_uuid is AUDIO_CONTROL_POINT uuid to get the attr_handle.
|
|
if long_uuid == AUDIO_CONTROL_POINT_UUID:
|
|
update_audio_data(CONNECTION_HANDLE, result[CONNECTION_HANDLE], AUDIO_CONTROL_ATTR_HANDLE, attr_handle)
|
|
|
|
|
|
def parse_acl_opcode(data, result):
|
|
"""This function parses acl data opcode."""
|
|
# opcode (1 byte) = 1 bytes
|
|
if len(data) < 1:
|
|
return
|
|
opcode, data = unpack_data(data, 1)
|
|
# Check opcode is 0x12 (write request) and attr_handle is
|
|
# audio_control_attr_handle for check it is HA audio control cmd.
|
|
if result[IS_SENT] and opcode == 0x12:
|
|
if len(data) < 2:
|
|
return
|
|
attr_handle, data = unpack_data(data, 2)
|
|
if attr_handle == \
|
|
get_audio_control_attr_handle(result[CONNECTION_HANDLE]):
|
|
parse_acl_ha_audio_control_cmd(data, result)
|
|
# Check opcode is 0x09 (read response) to parse ATT long UUID.
|
|
elif not result[IS_SENT] and opcode == 0x09:
|
|
parse_acl_att_long_uuid(data, result)
|
|
|
|
|
|
def parse_acl_handle(data, result):
|
|
"""This function parses acl data handle."""
|
|
# connection_handle (2 bytes) + total_len (2 bytes) + pdu (2 bytes)
|
|
# + channel_id (2 bytes) = 8 bytes
|
|
if len(data) < 8:
|
|
return
|
|
connection_handle, data = unpack_data(data, 2)
|
|
connection_handle = connection_handle & 0x0FFF
|
|
# skip unpack total_len
|
|
data = data[2:]
|
|
pdu, data = unpack_data(data, 2)
|
|
channel_id, data = unpack_data(data, 2)
|
|
|
|
# Check ATT packet or "Coc Data Packet" to get ATT information and audio
|
|
# data.
|
|
if connection_handle <= 0x0EFF:
|
|
if channel_id <= 0x003F:
|
|
result[CONNECTION_HANDLE] = connection_handle
|
|
parse_acl_opcode(data, result)
|
|
elif channel_id >= 0x0040 and channel_id <= 0x007F:
|
|
result[CONNECTION_HANDLE] = connection_handle
|
|
sdu, data = unpack_data(data, 2)
|
|
if pdu - 2 == sdu:
|
|
if result[IS_SENT]:
|
|
parse_acl_ha_audio_data(data, result)
|
|
else:
|
|
if simple_debug:
|
|
parse_acl_ha_debug_buffer(data, result)
|
|
|
|
|
|
#=======================================================================
|
|
# Parse HCI EVT Function
|
|
#=======================================================================
|
|
|
|
|
|
def parse_hci_evt_peer_address(data, result):
|
|
"""This function parses peer address from hci event."""
|
|
peer_address_list = []
|
|
address_empty_list = ["00", "00", "00", "00", "00", "00"]
|
|
for n in range(0, 3):
|
|
if len(data) < 6:
|
|
return
|
|
for p in range(0, 6):
|
|
peer_address_list.append("{0:02x}".format(struct.unpack(">B", data[p])[0]))
|
|
# Check the address is empty or not.
|
|
if peer_address_list == address_empty_list:
|
|
del peer_address_list[:]
|
|
data = data[6:]
|
|
else:
|
|
break
|
|
peer_address_list.reverse()
|
|
peer_address = "_".join(peer_address_list)
|
|
update_audio_data("", "", PEER_ADDRESS, peer_address)
|
|
update_audio_data(PEER_ADDRESS, peer_address, CONNECTION_HANDLE, result[CONNECTION_HANDLE])
|
|
|
|
|
|
def parse_hci_evt_code(data, result):
|
|
"""This function parses hci event content."""
|
|
# hci_evt (1 byte) + param_total_len (1 byte) + sub_event (1 byte)
|
|
# + status (1 byte) + connection_handle (2 bytes) + role (1 byte)
|
|
# + address_type (1 byte) = 8 bytes
|
|
if len(data) < 8:
|
|
return
|
|
hci_evt, data = unpack_data(data, 1)
|
|
# skip unpack param_total_len.
|
|
data = data[1:]
|
|
sub_event, data = unpack_data(data, 1)
|
|
status, data = unpack_data(data, 1)
|
|
connection_handle, data = unpack_data(data, 2)
|
|
connection_handle = connection_handle & 0x0FFF
|
|
# skip unpack role, address_type.
|
|
data = data[2:]
|
|
# We will directly check it is LE Enhanced Connection Complete or not
|
|
# for get Connection Handle and Address.
|
|
if not result[IS_SENT] and hci_evt == 0x3E and sub_event == 0x0A \
|
|
and status == 0x00 and connection_handle <= 0x0EFF:
|
|
result[CONNECTION_HANDLE] = connection_handle
|
|
parse_hci_evt_peer_address(data, result)
|
|
|
|
|
|
#=======================================================================
|
|
# Common Parse Function
|
|
#=======================================================================
|
|
|
|
|
|
def parse_packet_data(data, result):
|
|
"""This function parses packet type."""
|
|
packet_type, data = unpack_data(data, 1)
|
|
if packet_type == 0x02:
|
|
# Try to check HearingAid audio control packet and data packet.
|
|
parse_acl_handle(data, result)
|
|
elif packet_type == 0x04:
|
|
# Try to check HearingAid connection successful packet.
|
|
parse_hci_evt_code(data, result)
|
|
|
|
|
|
def parse_packet(btsnoop_file):
|
|
"""This function parses packet len, timestamp."""
|
|
packet_result = {}
|
|
|
|
# ori_len (4 bytes) + include_len (4 bytes) + packet_flag (4 bytes)
|
|
# + drop (4 bytes) + timestamp (8 bytes) = 24 bytes
|
|
packet_header = btsnoop_file.read(24)
|
|
if len(packet_header) != 24:
|
|
return False
|
|
|
|
ori_len, include_len, packet_flag, drop, timestamp = \
|
|
struct.unpack(">IIIIq", packet_header)
|
|
|
|
if ori_len == include_len:
|
|
packet_data = btsnoop_file.read(ori_len)
|
|
if len(packet_data) != ori_len:
|
|
return False
|
|
if packet_flag != 2 and drop == 0:
|
|
packet_result[IS_SENT] = (packet_flag == 0)
|
|
packet_result[TIMESTAMP_STR_FORMAT], packet_result[TIMESTAMP_TIME_FORMAT] = convert_time_str(timestamp)
|
|
parse_packet_data(packet_data, packet_result)
|
|
else:
|
|
return False
|
|
|
|
return True
|
|
|
|
|
|
#=======================================================================
|
|
# Update and DumpData Function
|
|
#=======================================================================
|
|
|
|
|
|
def dump_audio_data(data):
|
|
"""This function dumps audio data into file."""
|
|
file_type = "." + data[CODEC]
|
|
file_name_list = []
|
|
file_name_list.append(data[PEER_ADDRESS])
|
|
file_name_list.append(data[TIMESTAMP_STR_FORMAT])
|
|
file_name_list.append(data[AUDIO_TYPE])
|
|
file_name_list.append(data[SAMPLE_RATE])
|
|
if folder is not None:
|
|
if not os.path.exists(folder):
|
|
os.makedirs(folder)
|
|
audio_file_name = os.path.join(folder, "-".join(file_name_list) + file_type)
|
|
if data.has_key(DEBUG_VERSION):
|
|
file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-"
|
|
debug_file_name = os.path.join(folder, file_prefix + "-".join(file_name_list) + ".txt")
|
|
else:
|
|
audio_file_name = "-".join(file_name_list) + file_type
|
|
if data.has_key(DEBUG_VERSION):
|
|
file_prefix = "debug_ver_" + data[DEBUG_VERSION] + "-"
|
|
debug_file_name = file_prefix + "-".join(file_name_list) + ".txt"
|
|
|
|
sys.stdout.write("Start to dump Audio File : %s\n" % audio_file_name)
|
|
if data.has_key(AUDIO_DATA_B):
|
|
with open(audio_file_name, "wb+") as audio_file:
|
|
audio_file.write(data[AUDIO_DATA_B])
|
|
sys.stdout.write("Finished to dump Audio File: %s\n\n" % audio_file_name)
|
|
else:
|
|
sys.stdout.write("Fail to dump Audio File: %s\n" % audio_file_name)
|
|
sys.stdout.write("There isn't any Hearing Aid audio data.\n\n")
|
|
|
|
if simple_debug:
|
|
sys.stdout.write("Start to dump audio %s Debug File\n" % audio_file_name)
|
|
if data.has_key(DEBUG_DATA):
|
|
with open(debug_file_name, "wb+") as debug_file:
|
|
debug_file.write(data[DEBUG_DATA])
|
|
sys.stdout.write("Finished to dump Debug File: %s\n\n" % debug_file_name)
|
|
else:
|
|
sys.stdout.write("Fail to dump audio %s Debug File\n" % audio_file_name)
|
|
sys.stdout.write("There isn't any Hearing Aid debug data.\n\n")
|
|
|
|
|
|
def update_audio_data(relate_key, relate_value, key, value):
|
|
"""
|
|
This function records the dump audio file related information.
|
|
audio_data = {
|
|
PEER_ADDRESS:{
|
|
PEER_ADDRESS: PEER_ADDRESS,
|
|
CONNECTION_HANDLE: CONNECTION_HANDLE,
|
|
AUDIO_CONTROL_ATTR_HANDLE: AUDIO_CONTROL_ATTR_HANDLE,
|
|
START: True or False,
|
|
TIMESTAMP_STR_FORMAT: START_TIMESTAMP_STR_FORMAT,
|
|
CODEC: CODEC,
|
|
SAMPLE_RATE: SAMPLE_RATE,
|
|
AUDIO_TYPE: AUDIO_TYPE,
|
|
DEBUG_VERSION: DEBUG_VERSION,
|
|
DEBUG_DATA: DEBUG_DATA,
|
|
AUDIO_DATA_B: AUDIO_DATA_B
|
|
},
|
|
PEER_ADDRESS_2:{
|
|
PEER_ADDRESS: PEER_ADDRESS,
|
|
CONNECTION_HANDLE: CONNECTION_HANDLE,
|
|
AUDIO_CONTROL_ATTR_HANDLE: AUDIO_CONTROL_ATTR_HANDLE,
|
|
START: True or False,
|
|
TIMESTAMP_STR_FORMAT: START_TIMESTAMP_STR_FORMAT,
|
|
CODEC: CODEC,
|
|
SAMPLE_RATE: SAMPLE_RATE,
|
|
AUDIO_TYPE: AUDIO_TYPE,
|
|
DEBUG_VERSION: DEBUG_VERSION,
|
|
DEBUG_DATA: DEBUG_DATA,
|
|
AUDIO_DATA_B: AUDIO_DATA_B
|
|
}
|
|
}
|
|
"""
|
|
if key == PEER_ADDRESS:
|
|
if audio_data.has_key(value):
|
|
# Dump audio data and clear previous data.
|
|
update_audio_data(key, value, START, False)
|
|
# Extra clear CONNECTION_HANDLE due to new connection create.
|
|
if audio_data[value].has_key(CONNECTION_HANDLE):
|
|
audio_data[value].pop(CONNECTION_HANDLE, "")
|
|
else:
|
|
device_audio_data = {key: value}
|
|
temp_audio_data = {value: device_audio_data}
|
|
audio_data.update(temp_audio_data)
|
|
else:
|
|
for i in audio_data:
|
|
if audio_data[i].has_key(relate_key) \
|
|
and audio_data[i][relate_key] == relate_value:
|
|
if key == START:
|
|
if audio_data[i].has_key(key) and audio_data[i][key]:
|
|
dump_audio_data(audio_data[i])
|
|
# Clear data except PEER_ADDRESS, CONNECTION_HANDLE and
|
|
# AUDIO_CONTROL_ATTR_HANDLE.
|
|
audio_data[i].pop(key, "")
|
|
audio_data[i].pop(TIMESTAMP_STR_FORMAT, "")
|
|
audio_data[i].pop(CODEC, "")
|
|
audio_data[i].pop(SAMPLE_RATE, "")
|
|
audio_data[i].pop(AUDIO_TYPE, "")
|
|
audio_data[i].pop(DEBUG_VERSION, "")
|
|
audio_data[i].pop(DEBUG_DATA, "")
|
|
audio_data[i].pop(AUDIO_DATA_B, "")
|
|
elif key == AUDIO_DATA_B or key == DEBUG_DATA:
|
|
if audio_data[i].has_key(START) and audio_data[i][START]:
|
|
if audio_data[i].has_key(key):
|
|
ori_data = audio_data[i].pop(key, "")
|
|
value = ori_data + value
|
|
else:
|
|
# Audio doesn't start, don't record.
|
|
return
|
|
device_audio_data = {key: value}
|
|
audio_data[i].update(device_audio_data)
|
|
|
|
|
|
#=======================================================================
|
|
# Tool Function
|
|
#=======================================================================
|
|
|
|
|
|
def get_audio_control_attr_handle(connection_handle):
|
|
"""This function gets audio_control_attr_handle."""
|
|
# If force_audio_control_attr_handle is set, will use it first.
|
|
if force_audio_control_attr_handle is not None:
|
|
return force_audio_control_attr_handle
|
|
|
|
# Try to check the audio_control_attr_handle is record into audio_data.
|
|
for i in audio_data:
|
|
if audio_data[i].has_key(CONNECTION_HANDLE) \
|
|
and audio_data[i][CONNECTION_HANDLE] == connection_handle:
|
|
if audio_data[i].has_key(AUDIO_CONTROL_ATTR_HANDLE):
|
|
return audio_data[i][AUDIO_CONTROL_ATTR_HANDLE]
|
|
|
|
# Return default attr_handle if audio_data doesn't record it.
|
|
return default_audio_control_attr_handle
|
|
|
|
|
|
def unpack_data(data, byte):
|
|
"""This function unpacks data."""
|
|
if byte == 1:
|
|
value = struct.unpack(">B", data[0])[0]
|
|
elif byte == 2:
|
|
value = struct.unpack(">H", data[1] + data[0])[0]
|
|
else:
|
|
value = ""
|
|
data = data[byte:]
|
|
return value, data
|
|
|
|
|
|
def convert_time_str(timestamp):
|
|
"""This function converts time to string format."""
|
|
really_timestamp = float(timestamp) / SEC_CONVERT
|
|
local_timestamp = time.localtime(really_timestamp)
|
|
dt = really_timestamp - long(really_timestamp)
|
|
ms_str = "{0:06}".format(int(round(dt * 1000000)))
|
|
|
|
str_format = time.strftime("%m_%d__%H_%M_%S", local_timestamp)
|
|
full_str_format = str_format + "_" + ms_str
|
|
|
|
time_format = time.strftime("%m-%d %H:%M:%S", local_timestamp)
|
|
full_time_format = time_format + "." + ms_str
|
|
return full_str_format, full_time_format
|
|
|
|
|
|
def set_config():
|
|
"""This function is for set config by flag and check the argv is correct."""
|
|
argv_parser = argparse.ArgumentParser(description="Extracts Hearing Aid audio data from BTSNOOP.")
|
|
argv_parser.add_argument("BTSNOOP", help="BLUETOOTH BTSNOOP file.")
|
|
argv_parser.add_argument("-f", "--folder", help="select output folder.", dest="folder")
|
|
argv_parser.add_argument(
|
|
"-c1",
|
|
"--connection-handle1",
|
|
help="set a fake connection handle 1 to capture \
|
|
audio dump.",
|
|
dest="connection_handle1",
|
|
type=int)
|
|
argv_parser.add_argument(
|
|
"-c2",
|
|
"--connection-handle2",
|
|
help="set a fake connection handle 2 to capture \
|
|
audio dump.",
|
|
dest="connection_handle2",
|
|
type=int)
|
|
argv_parser.add_argument(
|
|
"-ns",
|
|
"--no-start",
|
|
help="No audio 'Start' cmd is \
|
|
needed before extracting audio data.",
|
|
dest="no_start",
|
|
default="False")
|
|
argv_parser.add_argument(
|
|
"-dc",
|
|
"--default-codec",
|
|
help="set a default \
|
|
codec.",
|
|
dest="codec",
|
|
default="G722")
|
|
argv_parser.add_argument(
|
|
"-a",
|
|
"--attr-handle",
|
|
help="force to select audio control attr handle.",
|
|
dest="audio_control_attr_handle",
|
|
type=int)
|
|
argv_parser.add_argument(
|
|
"-d", "--debug", help="dump full debug buffer content.", dest="full_debug", default="False")
|
|
argv_parser.add_argument(
|
|
"-sd", "--simple-debug", help="dump debug buffer header content.", dest="simple_debug", default="False")
|
|
arg = argv_parser.parse_args()
|
|
|
|
if arg.folder is not None:
|
|
global folder
|
|
folder = arg.folder
|
|
|
|
if arg.connection_handle1 is not None and arg.connection_handle2 is not None \
|
|
and arg.connection_handle1 == arg.connection_handle2:
|
|
argv_parser.error("connection_handle1 can't be same with \
|
|
connection_handle2")
|
|
exit(1)
|
|
|
|
if not (arg.no_start.lower() == "true" or arg.no_start.lower() == "false"):
|
|
argv_parser.error("-ns/--no-start arg is invalid, it should be true/false.")
|
|
exit(1)
|
|
|
|
if arg.connection_handle1 is not None:
|
|
fake_name = "ConnectionHandle" + str(arg.connection_handle1)
|
|
update_audio_data("", "", PEER_ADDRESS, fake_name)
|
|
update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE, arg.connection_handle1)
|
|
if arg.no_start.lower() == "true":
|
|
update_audio_data(PEER_ADDRESS, fake_name, START, True)
|
|
update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT, "Unknown")
|
|
update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec)
|
|
update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown")
|
|
update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown")
|
|
|
|
if arg.connection_handle2 is not None:
|
|
fake_name = "ConnectionHandle" + str(arg.connection_handle2)
|
|
update_audio_data("", "", PEER_ADDRESS, fake_name)
|
|
update_audio_data(PEER_ADDRESS, fake_name, CONNECTION_HANDLE, arg.connection_handle2)
|
|
if arg.no_start.lower() == "true":
|
|
update_audio_data(PEER_ADDRESS, fake_name, START, True)
|
|
update_audio_data(PEER_ADDRESS, fake_name, TIMESTAMP_STR_FORMAT, "Unknown")
|
|
update_audio_data(PEER_ADDRESS, fake_name, CODEC, arg.codec)
|
|
update_audio_data(PEER_ADDRESS, fake_name, SAMPLE_RATE, "Unknown")
|
|
update_audio_data(PEER_ADDRESS, fake_name, AUDIO_TYPE, "Unknown")
|
|
|
|
if arg.audio_control_attr_handle is not None:
|
|
global force_audio_control_attr_handle
|
|
force_audio_control_attr_handle = arg.audio_control_attr_handle
|
|
|
|
global full_debug
|
|
global simple_debug
|
|
if arg.full_debug.lower() == "true":
|
|
full_debug = True
|
|
simple_debug = True
|
|
elif arg.simple_debug.lower() == "true":
|
|
simple_debug = True
|
|
|
|
if os.path.isfile(arg.BTSNOOP):
|
|
return arg.BTSNOOP
|
|
else:
|
|
argv_parser.error("BTSNOOP file not found: %s" % arg.BTSNOOP)
|
|
exit(1)
|
|
|
|
|
|
def main():
|
|
btsnoop_file_name = set_config()
|
|
|
|
with open(btsnoop_file_name, "rb") as btsnoop_file:
|
|
identification = btsnoop_file.read(8)
|
|
if identification != "btsnoop\0":
|
|
sys.stderr.write("Check identification fail. It is not correct btsnoop file.")
|
|
exit(1)
|
|
|
|
ver, data_link = struct.unpack(">II", btsnoop_file.read(4 + 4))
|
|
if (ver != 1) or (data_link != 1002):
|
|
sys.stderr.write("Check ver or dataLink fail. It is not correct btsnoop file.")
|
|
exit(1)
|
|
|
|
while True:
|
|
if not parse_packet(btsnoop_file):
|
|
break
|
|
|
|
for i in audio_data:
|
|
if audio_data[i].get(START, False):
|
|
dump_audio_data(audio_data[i])
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|