You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
295 lines
10 KiB
295 lines
10 KiB
#!/usr/bin/env python3
|
|
###############################################################################################################
|
|
#
|
|
# Copyright (C) 2019 Motorola Mobility LLC
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without modification, are permitted provided that
|
|
# the following conditions are met:
|
|
#
|
|
# 1. Redistributions of source code must retain the above copyright notice, this list of conditions and the
|
|
# following disclaimer.
|
|
#
|
|
# 2. Redistributions in binary form must reproduce the above copyright notice, this list of conditions and
|
|
# the following disclaimer in the documentation and/or other materials provided with the distribution.
|
|
#
|
|
# 3. Neither the name of the copyright holder nor the names of its contributors may be used to endorse or
|
|
# promote products derived from this software without specific prior written permission.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
|
|
# WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
|
|
# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR
|
|
# ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED
|
|
# TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
|
|
# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING
|
|
# NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
#
|
|
###############################################################################################################
|
|
###############################################################################################################
|
|
#
|
|
# Bluetooth Virtual Sniffing for Android
|
|
#
|
|
# This script supports Bluetooth Virtual Sniffing via Live Import Feature of Frontline Bluetooth Sniffer(FTS)
|
|
#
|
|
# It extracts the HCI packets from Snoop logs and redirect it to FTS for live HCI sniffing.
|
|
#
|
|
# It uses liveimport.ini and LiveImportAPI.dll from FTS path to communicate with FTS sniffer software.
|
|
#
|
|
# It works on both Windows and Ubuntu. For Ubuntu, both FTS and Python should be installed on Wine.
|
|
#
|
|
# FTS_INI_PATH & FTS_DLL_PATH should be set to absolute path of liveimport.ini and LiveImportAPI.dll of FTS
|
|
#
|
|
# Example below - This may change per machine per FTS version in Windows vs Ubuntu (Wine), set accordingly.
|
|
# For Windows
|
|
# FTS_INI_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 13.2\\'
|
|
# FTS_DLL_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 13.2\\Executables\\Core\\'
|
|
#
|
|
# For Ubuntu - FTS path recognized by Wine (not Ubuntu path)
|
|
# FTS_INI_PATH = 'C:\\Program Files\\Frontline Test System II\\Frontline 13.2\\'
|
|
# FTS_DLL_PATH = 'C:\\Program Files\\Frontline Test System II\\Frontline 13.2\\Executables\\Core\\'
|
|
#
|
|
###############################################################################################################
|
|
|
|
import os
|
|
import platform
|
|
import socket
|
|
import struct
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
if sys.version_info[0] >= 3:
|
|
import configparser
|
|
else:
|
|
import ConfigParser as configparser
|
|
|
|
from calendar import timegm
|
|
from ctypes import byref, c_bool, c_longlong, CDLL
|
|
from _ctypes import FreeLibrary
|
|
from datetime import datetime
|
|
|
|
# Update below to right path corresponding to your machine, FTS version and OS used.
|
|
FTS_INI_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\'
|
|
FTS_DLL_PATH = 'C:\\Program Files (x86)\\Frontline Test System II\\Frontline 15.12\\Executables\\Core\\'
|
|
|
|
iniName = 'liveimport.ini'
|
|
if (platform.architecture()[0] == '32bit'):
|
|
dllName = 'LiveImportAPI.dll'
|
|
else:
|
|
dllName = 'LiveImportAPI_x64.dll'
|
|
|
|
launchFtsCmd = '\"' + FTS_DLL_PATH + 'fts.exe\"' + ' \"/ComProbe Protocol Analysis System=Generic\"' + ' \"/oemkey=Virtual\"'
|
|
|
|
# Unix Epoch delta since 01/01/1970
|
|
FILETIME_EPOCH_DELTA = 116444736000000000
|
|
HUNDREDS_OF_NANOSECONDS = 10000000
|
|
|
|
HOST = 'localhost'
|
|
PORT = 8872
|
|
SNOOP_ID = 16
|
|
SNOOP_HDR = 24
|
|
|
|
|
|
def get_file_time():
|
|
"""
|
|
Obtain current time in file time format for display
|
|
"""
|
|
date_time = datetime.now()
|
|
file_time = FILETIME_EPOCH_DELTA + (timegm(date_time.timetuple()) * HUNDREDS_OF_NANOSECONDS)
|
|
file_time = file_time + (date_time.microsecond * 10)
|
|
return file_time
|
|
|
|
|
|
def get_connection_string():
|
|
"""
|
|
Read ConnectionString from liveimport.ini
|
|
"""
|
|
config = configparser.ConfigParser()
|
|
config.read(FTS_INI_PATH + iniName)
|
|
try:
|
|
conn_str = config.get('General', 'ConnectionString')
|
|
except (configparser.NoSectionError, configparser.NoOptionError):
|
|
return None
|
|
|
|
return conn_str
|
|
|
|
|
|
def get_configuration_string():
|
|
"""
|
|
Read Configuration string from liveimport.ini
|
|
"""
|
|
config_str = ''
|
|
config = configparser.ConfigParser()
|
|
config.read(FTS_INI_PATH + iniName)
|
|
try:
|
|
config_items = config.items('Configuration')
|
|
except (configparser.NoSectionError, configparser.NoOptionError):
|
|
return None
|
|
|
|
if config_items is not None:
|
|
for item in config_items:
|
|
key, value = item
|
|
config_str += ("%s=%s\n" % (key, value))
|
|
return config_str
|
|
else:
|
|
return None
|
|
|
|
|
|
def check_live_import_connection(live_import):
|
|
"""
|
|
Launch FTS app in Virtual Sniffing Mode
|
|
Check if FTS App is ready to start receiving the data.
|
|
If not, wait until 1 min and exit if FTS didn't start.
|
|
"""
|
|
is_connection_running = c_bool()
|
|
count = 0
|
|
|
|
status = live_import.IsAppReady(byref(is_connection_running))
|
|
if (is_connection_running.value == True):
|
|
print("FTS is already launched, Start capture if not already started")
|
|
return True
|
|
|
|
print("Launching FTS Virtual Sniffing")
|
|
try:
|
|
ftsProcess = subprocess.Popen((launchFtsCmd), stdout=subprocess.PIPE)
|
|
except:
|
|
print("Error in Launching FTS.. exiting")
|
|
return False
|
|
|
|
while (is_connection_running.value == False and count < 12):
|
|
status = live_import.IsAppReady(byref(is_connection_running))
|
|
if (status < 0):
|
|
print("Live Import Internal Error %d" % (status))
|
|
return False
|
|
if (is_connection_running.value == False):
|
|
print("Waiting for 5 sec.. Open FTS Virtual Sniffing")
|
|
time.sleep(5)
|
|
count += 1
|
|
if (is_connection_running.value == True):
|
|
print("FTS is ready to receive the data, Start capture now")
|
|
return True
|
|
else:
|
|
print("FTS Virtual Sniffing didn't start until 1 min.. exiting")
|
|
return False
|
|
|
|
|
|
def init_live_import(conn_str, config_str):
|
|
"""
|
|
Load DLL and Initialize the LiveImport module for FTS.
|
|
"""
|
|
success = c_bool()
|
|
try:
|
|
live_import = CDLL(FTS_DLL_PATH + dllName)
|
|
except:
|
|
return None
|
|
|
|
if live_import is None:
|
|
print("Error: Path to LiveImportAPI.dll is incorrect.. exiting")
|
|
return None
|
|
|
|
print(dllName + " loaded successfully")
|
|
result = live_import.InitializeLiveImport(
|
|
conn_str.encode('ascii', 'ignore'), config_str.encode('ascii', 'ignore'), byref(success))
|
|
if (result < 0):
|
|
print("Live Import Init failed")
|
|
return None
|
|
else:
|
|
print("Live Import Init success")
|
|
return live_import
|
|
|
|
|
|
def release_live_import(live_import):
|
|
"""
|
|
Cleanup and exit live import module.
|
|
"""
|
|
if live_import is not None:
|
|
live_import.ReleaseLiveImport()
|
|
FreeLibrary(live_import._handle)
|
|
|
|
|
|
def main():
|
|
|
|
print("Bluetooth Virtual Sniffing for Fluoride")
|
|
connection_str = get_connection_string()
|
|
if connection_str is None:
|
|
print("Error: path to liveimport.ini is incorrect.. exiting")
|
|
exit(0)
|
|
|
|
configuration_str = get_configuration_string()
|
|
if configuration_str is None:
|
|
print("Error: path to liveimport.ini is incorrect.. exiting")
|
|
exit(0)
|
|
|
|
live_import = init_live_import(connection_str, configuration_str)
|
|
if live_import is None:
|
|
print("Error: Path to LiveImportAPI.dll is incorrect.. exiting")
|
|
exit(0)
|
|
|
|
if (check_live_import_connection(live_import) == False):
|
|
release_live_import(live_import)
|
|
exit(0)
|
|
|
|
# Wait until the forward socket is ready
|
|
print("Waiting until adb is ready")
|
|
os.system('adb wait-for-device forward tcp:8872 tcp:8872')
|
|
|
|
btsnoop_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
btsnoop_sock.connect((HOST, PORT))
|
|
snoop_id = btsnoop_sock.recv(SNOOP_ID)
|
|
if not snoop_id.startswith(b"btsnoop"):
|
|
print("Error: Snoop ID wasn't received.. exiting")
|
|
release_live_import(live_import)
|
|
exit(0)
|
|
|
|
while True:
|
|
try:
|
|
snoop_hdr = btsnoop_sock.recv(SNOOP_HDR)
|
|
if snoop_hdr is not None:
|
|
try:
|
|
olen, ilen, flags = struct.unpack(">LLL", snoop_hdr[0:12])
|
|
except struct.error:
|
|
print("Error: Invalid data", repr(snoop_hdr))
|
|
continue
|
|
|
|
file_time = get_file_time()
|
|
timestamp = c_longlong(file_time)
|
|
|
|
snoop_data = b''
|
|
while (len(snoop_data) < olen):
|
|
data_frag = btsnoop_sock.recv(olen - len(snoop_data))
|
|
if data_frag is not None:
|
|
snoop_data += data_frag
|
|
|
|
print("Bytes received %d Olen %d ilen %d flags %d" % (len(snoop_data), olen, ilen, flags))
|
|
packet_type = struct.unpack(">B", snoop_data[0:1])[0]
|
|
if packet_type == 1:
|
|
drf = 1
|
|
isend = 0
|
|
elif packet_type == 2:
|
|
drf = 2
|
|
if (flags & 0x01):
|
|
isend = 1
|
|
else:
|
|
isend = 0
|
|
elif packet_type == 3:
|
|
drf = 4
|
|
if (flags & 0x01):
|
|
isend = 1
|
|
else:
|
|
isend = 0
|
|
elif packet_type == 4:
|
|
drf = 8
|
|
isend = 1
|
|
|
|
result = live_import.SendFrame(olen - 1, olen - 1, snoop_data[1:olen], drf, isend, timestamp)
|
|
if (result < 0):
|
|
print("Send frame failed")
|
|
except KeyboardInterrupt:
|
|
print("Cleanup and exit")
|
|
release_live_import(live_import)
|
|
btsnoop_sock.close()
|
|
exit(0)
|
|
|
|
|
|
if __name__ == '__main__':
|
|
main()
|