You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
743 lines
27 KiB
743 lines
27 KiB
#!/usr/bin/env python2
|
|
|
|
# Copyright (c) 2012 The Chromium OS Authors. All rights reserved.
|
|
# Use of this source code is governed by a BSD-style license that can be
|
|
# found in the LICENSE file.
|
|
"""An implementation of the ModemManager1 DBUS interface.
|
|
|
|
This modem mimics a GSM (eventually LTE & CDMA) modem and allows a
|
|
user to test shill and UI behaviors when a supported SIM is inserted
|
|
into the device. Invoked with the proper flags it can test that SMS
|
|
messages are deliver to the UI.
|
|
|
|
This program creates a virtual network interface to simulate the
|
|
network interface of a modem. It depends on modemmanager-next to
|
|
set the dbus permissions properly.
|
|
|
|
TODO:
|
|
* Use more appropriate values for many of the properties
|
|
* Support all ModemManager1 interfaces
|
|
* implement LTE modems
|
|
* implement CDMA modems
|
|
"""
|
|
|
|
from optparse import OptionParser
|
|
import logging
|
|
import os
|
|
import signal
|
|
import string
|
|
import subprocess
|
|
import sys
|
|
import time
|
|
|
|
import dbus
|
|
from dbus.exceptions import DBusException
|
|
import dbus.mainloop.glib
|
|
import dbus.service
|
|
from dbus.types import Int32
|
|
from dbus.types import ObjectPath
|
|
from dbus.types import Struct
|
|
from dbus.types import UInt32
|
|
import glib
|
|
import gobject
|
|
import mm1
|
|
|
|
|
|
# Miscellaneous delays to simulate a modem
|
|
DEFAULT_CONNECT_DELAY_MS = 1500
|
|
|
|
DEFAULT_CARRIER = 'att'
|
|
|
|
|
|
class DBusObjectWithProperties(dbus.service.Object):
|
|
"""Implements the org.freedesktop.DBus.Properties interface.
|
|
|
|
Implements the org.freedesktop.DBus.Properties interface, specifically
|
|
the Get and GetAll methods. Class which inherit from this class must
|
|
implement the InterfacesAndProperties function which will return a
|
|
dictionary of all interfaces and the properties defined on those interfaces.
|
|
"""
|
|
|
|
def __init__(self, bus, path):
|
|
dbus.service.Object.__init__(self, bus, path)
|
|
|
|
@dbus.service.method(dbus.PROPERTIES_IFACE,
|
|
in_signature='ss', out_signature='v')
|
|
def Get(self, interface, property_name, *args, **kwargs):
|
|
"""Returns: The value of property_name on interface."""
|
|
logging.info('%s: Get %s, %s', self.path, interface, property_name)
|
|
interfaces = self.InterfacesAndProperties()
|
|
properties = interfaces.get(interface, None)
|
|
if property_name in properties:
|
|
return properties[property_name]
|
|
raise dbus.exceptions.DBusException(
|
|
mm1.MODEM_MANAGER_INTERFACE + '.UnknownProperty',
|
|
'Property %s not defined for interface %s' %
|
|
(property_name, interface))
|
|
|
|
@dbus.service.method(dbus.PROPERTIES_IFACE,
|
|
in_signature='s', out_signature='a{sv}')
|
|
def GetAll(self, interface, *args, **kwargs):
|
|
"""Returns: A dictionary. The properties on interface."""
|
|
logging.info('%s: GetAll %s', self.path, interface)
|
|
interfaces = self.InterfacesAndProperties()
|
|
properties = interfaces.get(interface, None)
|
|
if properties is not None:
|
|
return properties
|
|
raise dbus.exceptions.DBusException(
|
|
mm1.MODEM_MANAGER_INTERFACE + '.UnknownInterface',
|
|
'Object does not implement the %s interface' % interface)
|
|
|
|
def InterfacesAndProperties(self):
|
|
"""Subclasses must implement this function.
|
|
|
|
Returns:
|
|
A dictionary of interfaces where the values are dictionaries
|
|
of dbus properties.
|
|
"""
|
|
pass
|
|
|
|
|
|
class SIM(DBusObjectWithProperties):
|
|
"""SIM Object.
|
|
|
|
Mock SIM Card and the typical information it might contain.
|
|
SIM cards of different carriers can be created by providing
|
|
the MCC, MNC, operator name, imsi, and msin. SIM objects are
|
|
passed to the Modem during Modem initialization.
|
|
"""
|
|
|
|
DEFAULT_MCC = '310'
|
|
DEFAULT_MNC = '090'
|
|
DEFAULT_OPERATOR = 'AT&T'
|
|
DEFAULT_MSIN = '1234567890'
|
|
DEFAULT_IMSI = '888999111'
|
|
MCC_LIST = {
|
|
'us': '310',
|
|
'de': '262',
|
|
'es': '214',
|
|
'fr': '208',
|
|
'gb': '234',
|
|
'it': '222',
|
|
'nl': '204',
|
|
}
|
|
CARRIERS = {
|
|
'att': ('us', '090', 'AT&T'),
|
|
'tmobile': ('us', '026', 'T-Mobile'),
|
|
'simyo': ('de', '03', 'simyo'),
|
|
'movistar': ('es', '07', 'Movistar'),
|
|
'sfr': ('fr', '10', 'SFR'),
|
|
'three': ('gb', '20', '3'),
|
|
'threeita': ('it', '99', '3ITA'),
|
|
'kpn': ('nl', '08', 'KPN')
|
|
}
|
|
|
|
def __init__(self,
|
|
manager,
|
|
mcc_country='us',
|
|
mnc=DEFAULT_MNC,
|
|
operator_name=DEFAULT_OPERATOR,
|
|
msin=DEFAULT_MSIN,
|
|
imsi=None,
|
|
mcc=None,
|
|
name='/Sim/0'):
|
|
self.manager = manager
|
|
self.name = name
|
|
self.path = manager.path + name
|
|
self.mcc = mcc or SIM.MCC_LIST.get(mcc_country, '000')
|
|
self.mnc = mnc
|
|
self.operator_name = operator_name
|
|
self.msin = msin
|
|
self.imsi = imsi or (self.mcc + self.mnc + SIM.DEFAULT_IMSI)
|
|
DBusObjectWithProperties.__init__(self, manager.bus, self.path)
|
|
|
|
@staticmethod
|
|
def FromCarrier(carrier, manager):
|
|
"""Creates a SIM card object for a given carrier."""
|
|
args = SIM.CARRIERS.get(carrier, [])
|
|
return SIM(manager, *args)
|
|
|
|
def Properties(self):
|
|
return {
|
|
'SimIdentifier': self.msin,
|
|
'Imsi': self.imsi,
|
|
'OperatorIdentifier': self.mcc + self.mnc,
|
|
'OperatorName': self.operator_name
|
|
}
|
|
|
|
def InterfacesAndProperties(self):
|
|
return {mm1.SIM_INTERFACE: self.Properties()}
|
|
|
|
class SMS(DBusObjectWithProperties):
|
|
"""SMS Object.
|
|
|
|
Mock SMS message.
|
|
"""
|
|
|
|
def __init__(self, manager, name='/SMS/0', text='test',
|
|
number='123', timestamp='12:00', smsc=''):
|
|
self.manager = manager
|
|
self.name = name
|
|
self.path = manager.path + name
|
|
self.text = text or 'test sms at %s' % name
|
|
self.number = number
|
|
self.timestamp = timestamp
|
|
self.smsc = smsc
|
|
DBusObjectWithProperties.__init__(self, manager.bus, self.path)
|
|
|
|
def Properties(self):
|
|
# TODO(jglasgow): State, Validity, Class, Storage are also defined
|
|
return {
|
|
'Text': self.text,
|
|
'Number': self.number,
|
|
'Timestamp': self.timestamp,
|
|
'SMSC': self.smsc
|
|
}
|
|
|
|
def InterfacesAndProperties(self):
|
|
return {mm1.SMS_INTERFACE: self.Properties()}
|
|
|
|
|
|
class PseudoNetworkInterface(object):
|
|
"""A Pseudo network interface.
|
|
|
|
This uses a pair of network interfaces and dnsmasq to simulate the
|
|
network device normally associated with a modem.
|
|
"""
|
|
|
|
# Any interface that shill manages will get its own routing
|
|
# table. Routes added to the main routing table with RTPROT_BOOT (the
|
|
# default proto value) will be sent to the corresponding interface's
|
|
# routing table. We want to prevent that in this case, so we use
|
|
# proto 5, as shill currently ignores proto values greater than 4.
|
|
ROUTE_PROTO = 'proto 5'
|
|
|
|
def __init__(self, interface, base):
|
|
self.interface = interface
|
|
self.peer = self.interface + 'p'
|
|
self.base = base
|
|
self.lease_file = '/tmp/dnsmasq.%s.leases' % self.interface
|
|
self.dnsmasq = None
|
|
|
|
def __enter__(self):
|
|
"""Make usable with "with" statement."""
|
|
self.CreateInterface()
|
|
return self
|
|
|
|
def __exit__(self, exception, value, traceback):
|
|
"""Make usable with "with" statement."""
|
|
self.DestroyInterface()
|
|
return False
|
|
|
|
def CreateInterface(self):
|
|
"""Creates a virtual interface.
|
|
|
|
Creates the virtual interface self.interface as well as a peer
|
|
interface. Runs dnsmasq on the peer interface so that a DHCP
|
|
service can offer ip addresses to the virtual interface.
|
|
"""
|
|
os.system('ip link add name %s type veth peer name %s' % (
|
|
self.interface, self.peer))
|
|
|
|
os.system('ifconfig %s %s.1/24' % (self.peer, self.base))
|
|
os.system('ifconfig %s up' % self.peer)
|
|
|
|
os.system('ifconfig %s up' % self.interface)
|
|
os.system('ip route add 255.255.255.255 dev %s %s' %
|
|
(self.peer, self.ROUTE_PROTO))
|
|
os.close(os.open(self.lease_file, os.O_CREAT | os.O_TRUNC))
|
|
self.dnsmasq = subprocess.Popen(
|
|
['/usr/local/sbin/dnsmasq',
|
|
'--pid-file',
|
|
'-k',
|
|
'--dhcp-leasefile=%s' % self.lease_file,
|
|
'--dhcp-range=%s.2,%s.254' % (self.base, self.base),
|
|
'--port=0',
|
|
'--interface=%s' % self.peer,
|
|
'--bind-interfaces'
|
|
])
|
|
# iptables default policy is to reject packets. Add ACCEPT as the
|
|
# target for the virtual and peer interfaces. Note that this currently
|
|
# only accepts v4 traffic.
|
|
os.system('iptables -I INPUT -i %s -j ACCEPT' % self.peer)
|
|
os.system('iptables -I INPUT -i %s -j ACCEPT' % self.interface)
|
|
|
|
def DestroyInterface(self):
|
|
"""Destroys the virtual interface.
|
|
|
|
Stops dnsmasq and cleans up all on disk state.
|
|
"""
|
|
if self.dnsmasq:
|
|
self.dnsmasq.terminate()
|
|
try:
|
|
os.system('ip route del 255.255.255.255 %s' % self.ROUTE_PROTO)
|
|
except:
|
|
pass
|
|
try:
|
|
os.system('ip link del %s' % self.interface)
|
|
except:
|
|
pass
|
|
os.system('iptables -D INPUT -i %s -j ACCEPT' % self.peer)
|
|
os.system('iptables -D INPUT -i %s -j ACCEPT' % self.interface)
|
|
if os.path.exists(self.lease_file):
|
|
os.remove(self.lease_file)
|
|
|
|
|
|
class Modem(DBusObjectWithProperties):
|
|
"""A Modem object that implements the ModemManager DBUS API."""
|
|
|
|
def __init__(self, manager, name='/Modem/0',
|
|
device='pseudomodem0',
|
|
mdn='0000001234',
|
|
meid='A100000DCE2CA0',
|
|
carrier='CrCarrier',
|
|
esn='EDD1EDD1',
|
|
sim=None):
|
|
"""Instantiates a Modem with some options.
|
|
|
|
Args:
|
|
manager: a ModemManager object.
|
|
name: string, a dbus path name.
|
|
device: string, the network device to use.
|
|
mdn: string, the mobile directory number.
|
|
meid: string, the mobile equipment id (CDMA only?).
|
|
carrier: string, the name of the carrier.
|
|
esn: string, the electronic serial number.
|
|
sim: a SIM object.
|
|
"""
|
|
self.state = mm1.MM_MODEM_STATE_DISABLED
|
|
self.manager = manager
|
|
self.name = name
|
|
self.path = manager.path + name
|
|
self.device = device
|
|
self.mdn = mdn
|
|
self.meid = meid
|
|
self.carrier = carrier
|
|
self.operator_name = carrier
|
|
self.operator_code = '123'
|
|
self.esn = esn
|
|
self.registration_state = mm1.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE
|
|
self.sim = sim
|
|
DBusObjectWithProperties.__init__(self, manager.bus, self.path)
|
|
self.pseudo_interface = PseudoNetworkInterface(self.device, '192.168.7')
|
|
self.smses = {}
|
|
|
|
def __enter__(self):
|
|
"""Make usable with "with" statement."""
|
|
self.pseudo_interface.__enter__()
|
|
# Add the device to the manager only after the pseudo
|
|
# interface has been created.
|
|
self.manager.Add(self)
|
|
return self
|
|
|
|
def __exit__(self, exception, value, traceback):
|
|
"""Make usable with "with" statement."""
|
|
self.manager.Remove(self)
|
|
return self.pseudo_interface.__exit__(exception, value, traceback)
|
|
|
|
def DiscardModem(self):
|
|
"""Discard this DBUS Object.
|
|
|
|
Send a message that a modem has disappeared and deregister from DBUS.
|
|
"""
|
|
logging.info('DiscardModem')
|
|
self.remove_from_connection()
|
|
self.manager.Remove(self)
|
|
|
|
def ModemProperties(self):
|
|
"""Return the properties of the modem object."""
|
|
properties = {
|
|
# 'Sim': type='o'
|
|
'ModemCapabilities': UInt32(0),
|
|
'CurrentCapabilities': UInt32(0),
|
|
'MaxBearers': UInt32(2),
|
|
'MaxActiveBearers': UInt32(2),
|
|
'Manufacturer': 'Foo Electronics',
|
|
'Model': 'Super Foo Modem',
|
|
'Revision': '1.0',
|
|
'DeviceIdentifier': '123456789',
|
|
'Device': self.device,
|
|
'Driver': 'fake',
|
|
'Plugin': 'Foo Plugin',
|
|
'EquipmentIdentifier': self.meid,
|
|
'UnlockRequired': UInt32(0),
|
|
#'UnlockRetries' type='a{uu}'
|
|
mm1.MM_MODEM_PROPERTY_STATE: Int32(self.state),
|
|
'AccessTechnologies': UInt32(self.state),
|
|
'SignalQuality': Struct([UInt32(90), True], signature='ub'),
|
|
'OwnNumbers': ['6175551212'],
|
|
'SupportedModes': UInt32(0),
|
|
'AllowedModes': UInt32(0),
|
|
'PreferredMode': UInt32(0),
|
|
'SupportedBands': [UInt32(0)],
|
|
'Bands': [UInt32(0)]
|
|
}
|
|
if self.sim:
|
|
properties['Sim'] = ObjectPath(self.sim.path)
|
|
return properties
|
|
|
|
def InterfacesAndProperties(self):
|
|
"""Return all supported interfaces and their properties."""
|
|
return {
|
|
mm1.MODEM_INTERFACE: self.ModemProperties(),
|
|
}
|
|
|
|
def ChangeState(self, new_state,
|
|
why=mm1.MM_MODEM_STATE_CHANGE_REASON_UNKNOWN):
|
|
logging.info('Change state from %s to %s', self.state, new_state)
|
|
self.StateChanged(Int32(self.state), Int32(new_state), UInt32(why))
|
|
self.PropertiesChanged(mm1.MODEM_INTERFACE,
|
|
{mm1.MM_MODEM_PROPERTY_STATE: Int32(new_state)},
|
|
[])
|
|
self.state = new_state
|
|
|
|
@dbus.service.method(mm1.MODEM_INTERFACE,
|
|
in_signature='b', out_signature='')
|
|
def Enable(self, on, *args, **kwargs):
|
|
"""Enables the Modem."""
|
|
logging.info('Modem: Enable %s', str(on))
|
|
if on:
|
|
if self.state <= mm1.MM_MODEM_STATE_ENABLING:
|
|
self.ChangeState(mm1.MM_MODEM_STATE_ENABLING)
|
|
if self.state <= mm1.MM_MODEM_STATE_ENABLED:
|
|
self.ChangeState(mm1.MM_MODEM_STATE_ENABLED)
|
|
if self.state <= mm1.MM_MODEM_STATE_SEARCHING:
|
|
self.ChangeState(mm1.MM_MODEM_STATE_SEARCHING)
|
|
glib.timeout_add(250, self.OnRegistered)
|
|
else:
|
|
if self.state >= mm1.MM_MODEM_STATE_DISABLING:
|
|
self.ChangeState(mm1.MM_MODEM_STATE_DISABLING)
|
|
if self.state >= mm1.MM_MODEM_STATE_DISABLED:
|
|
self.ChangeState(mm1.MM_MODEM_STATE_DISABLED)
|
|
self.ChangeRegistrationState(
|
|
mm1.MM_MODEM_3GPP_REGISTRATION_STATE_IDLE)
|
|
return None
|
|
|
|
def ChangeRegistrationState(self, new_state):
|
|
"""Updates the registration state of the modem.
|
|
|
|
Updates the registration state of the modem and broadcasts a
|
|
DBUS signal.
|
|
|
|
Args:
|
|
new_state: the new registation state of the modem.
|
|
"""
|
|
if new_state != self.registration_state:
|
|
self.registration_state = new_state
|
|
self.PropertiesChanged(
|
|
mm1.MODEM_MODEM3GPP_INTERFACE,
|
|
{mm1.MM_MODEM3GPP_PROPERTY_REGISTRATION_STATE:
|
|
UInt32(new_state)},
|
|
[])
|
|
|
|
def OnRegistered(self):
|
|
"""Called when the Modem is Registered."""
|
|
if (self.state >= mm1.MM_MODEM_STATE_ENABLED and
|
|
self.state <= mm1.MM_MODEM_STATE_REGISTERED):
|
|
logging.info('Modem: Marking Registered')
|
|
self.ChangeRegistrationState(
|
|
mm1.MM_MODEM_3GPP_REGISTRATION_STATE_HOME)
|
|
self.ChangeState(mm1.MM_MODEM_STATE_REGISTERED)
|
|
|
|
@dbus.service.method(mm1.MODEM_SIMPLE_INTERFACE, in_signature='',
|
|
out_signature='a{sv}')
|
|
def GetStatus(self, *args, **kwargs):
|
|
"""Gets the general modem status.
|
|
|
|
Returns:
|
|
A dictionary of properties.
|
|
"""
|
|
logging.info('Modem: GetStatus')
|
|
properties = {
|
|
'state': UInt32(self.state),
|
|
'signal-quality': UInt32(99),
|
|
'bands': self.carrier,
|
|
'access-technology': UInt32(0),
|
|
'm3gpp-registration-state': UInt32(self.registration_state),
|
|
'm3gpp-operator-code': '123',
|
|
'm3gpp-operator-name': '123',
|
|
'cdma-cdma1x-registration-state': UInt32(99),
|
|
'cdma-evdo-registration-state': UInt32(99),
|
|
'cdma-sid': '123',
|
|
'cdma-nid': '123',
|
|
}
|
|
if self.state >= mm1.MM_MODEM_STATE_ENABLED:
|
|
properties['carrier'] = 'Test Network'
|
|
return properties
|
|
|
|
@dbus.service.signal(mm1.MODEM_INTERFACE, signature='iiu')
|
|
def StateChanged(self, old_state, new_state, why):
|
|
pass
|
|
|
|
@dbus.service.method(mm1.MODEM_SIMPLE_INTERFACE, in_signature='a{sv}',
|
|
out_signature='o',
|
|
async_callbacks=('return_cb', 'raise_cb'))
|
|
def Connect(self, unused_props, return_cb, raise_cb, **kwargs):
|
|
"""Connect the modem to the network.
|
|
|
|
Args:
|
|
unused_props: connection properties. See ModemManager documentation.
|
|
return_cb: function to call to return result asynchronously.
|
|
raise_cb: function to call to raise an error asynchronously.
|
|
"""
|
|
|
|
def ConnectDone(new, why):
|
|
logging.info('Modem: ConnectDone %s -> %s because %s',
|
|
str(self.state), str(new), str(why))
|
|
if self.state == mm1.MM_MODEM_STATE_CONNECTING:
|
|
self.ChangeState(new, why)
|
|
# TODO(jglasgow): implement a bearer object
|
|
bearer_path = '/Bearer/0'
|
|
return_cb(bearer_path)
|
|
else:
|
|
raise_cb(mm1.ConnectionUnknownError())
|
|
|
|
logging.info('Modem: Connect')
|
|
if self.state != mm1.MM_MODEM_STATE_REGISTERED:
|
|
logging.info(
|
|
'Modem: Connect fails on unregistered modem. State = %s',
|
|
self.state)
|
|
raise mm1.NoNetworkError()
|
|
delay_ms = kwargs.get('connect_delay_ms', DEFAULT_CONNECT_DELAY_MS)
|
|
time.sleep(delay_ms / 1000.0)
|
|
self.ChangeState(mm1.MM_MODEM_STATE_CONNECTING)
|
|
glib.timeout_add(50, lambda: ConnectDone(
|
|
mm1.MM_MODEM_STATE_CONNECTED,
|
|
mm1.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED))
|
|
|
|
@dbus.service.method(mm1.MODEM_SIMPLE_INTERFACE, in_signature='o',
|
|
async_callbacks=('return_cb', 'raise_cb'))
|
|
def Disconnect(self, bearer, return_cb, raise_cb, **kwargs):
|
|
"""Disconnect the modem from the network."""
|
|
|
|
def DisconnectDone(old, new, why):
|
|
logging.info('Modem: DisconnectDone %s -> %s because %s',
|
|
str(old), str(new), str(why))
|
|
if self.state == mm1.MM_MODEM_STATE_DISCONNECTING:
|
|
logging.info('Modem: State is DISCONNECTING, changing to %s',
|
|
str(new))
|
|
self.ChangeState(new)
|
|
return_cb()
|
|
elif self.state == mm1.MM_MODEM_STATE_DISABLED:
|
|
logging.info('Modem: State is DISABLED, not changing state')
|
|
return_cb()
|
|
else:
|
|
raise_cb(mm1.ConnectionUnknownError())
|
|
|
|
logging.info('Modem: Disconnect')
|
|
self.ChangeState(mm1.MM_MODEM_STATE_DISCONNECTING)
|
|
glib.timeout_add(
|
|
500,
|
|
lambda: DisconnectDone(
|
|
self.state,
|
|
mm1.MM_MODEM_STATE_REGISTERED,
|
|
mm1.MM_MODEM_STATE_CHANGE_REASON_USER_REQUESTED))
|
|
|
|
@dbus.service.signal(dbus.PROPERTIES_IFACE, signature='sa{sv}as')
|
|
def PropertiesChanged(self, interface, changed_properties,
|
|
invalidated_properties):
|
|
pass
|
|
|
|
def AddSMS(self, sms):
|
|
logging.info('Adding SMS %s to list', sms.path)
|
|
self.smses[sms.path] = sms
|
|
self.Added(self.path, True)
|
|
|
|
@dbus.service.method(mm1.MODEM_MESSAGING_INTERFACE, in_signature='',
|
|
out_signature='ao')
|
|
def List(self, *args, **kwargs):
|
|
logging.info('Modem.Messaging: List: %s',
|
|
', '.join(self.smses.keys()))
|
|
return self.smses.keys()
|
|
|
|
@dbus.service.method(mm1.MODEM_MESSAGING_INTERFACE, in_signature='o',
|
|
out_signature='')
|
|
def Delete(self, sms_path, *args, **kwargs):
|
|
logging.info('Modem.Messaging: Delete %s', sms_path)
|
|
del self.smses[sms_path]
|
|
|
|
@dbus.service.signal(mm1.MODEM_MESSAGING_INTERFACE, signature='ob')
|
|
def Added(self, sms_path, complete):
|
|
pass
|
|
|
|
|
|
class GSMModem(Modem):
|
|
"""A GSMModem implements the mm1.MODEM_MODEM3GPP_INTERFACE interface."""
|
|
|
|
def __init__(self, manager, imei='00112342342', **kwargs):
|
|
self.imei = imei
|
|
Modem.__init__(self, manager, **kwargs)
|
|
|
|
@dbus.service.method(mm1.MODEM_MODEM3GPP_INTERFACE,
|
|
in_signature='s', out_signature='')
|
|
def Register(self, operator_id, *args, **kwargs):
|
|
"""Register the modem on the network."""
|
|
pass
|
|
|
|
def Modem3GPPProperties(self):
|
|
"""Return the 3GPP Properties of the modem object."""
|
|
return {
|
|
'Imei': self.imei,
|
|
mm1.MM_MODEM3GPP_PROPERTY_REGISTRATION_STATE:
|
|
UInt32(self.registration_state),
|
|
'OperatorCode': self.operator_code,
|
|
'OperatorName': self.operator_name,
|
|
'EnabledFacilityLocks': UInt32(0)
|
|
}
|
|
|
|
def InterfacesAndProperties(self):
|
|
"""Return all supported interfaces and their properties."""
|
|
return {
|
|
mm1.MODEM_INTERFACE: self.ModemProperties(),
|
|
mm1.MODEM_MODEM3GPP_INTERFACE: self.Modem3GPPProperties()
|
|
}
|
|
|
|
@dbus.service.method(mm1.MODEM_MODEM3GPP_INTERFACE, in_signature='',
|
|
out_signature='aa{sv}')
|
|
def Scan(self, *args, **kwargs):
|
|
"""Scan for networks."""
|
|
raise mm1.CoreUnsupportedError()
|
|
|
|
|
|
class ModemManager(dbus.service.Object):
|
|
"""Implements the org.freedesktop.DBus.ObjectManager interface."""
|
|
|
|
def __init__(self, bus, path):
|
|
self.devices = []
|
|
self.bus = bus
|
|
self.path = path
|
|
dbus.service.Object.__init__(self, bus, path)
|
|
|
|
def Add(self, device):
|
|
"""Adds a modem device to the list of devices that are managed."""
|
|
logging.info('ModemManager: add %s', device.name)
|
|
self.devices.append(device)
|
|
interfaces = device.InterfacesAndProperties()
|
|
logging.info('Add: %s', interfaces)
|
|
self.InterfacesAdded(device.path, interfaces)
|
|
|
|
def Remove(self, device):
|
|
"""Removes a modem device from the list of managed devices."""
|
|
logging.info('ModemManager: remove %s', device.name)
|
|
self.devices.remove(device)
|
|
interfaces = device.InterfacesAndProperties().keys()
|
|
self.InterfacesRemoved(device.path, interfaces)
|
|
|
|
@dbus.service.method(mm1.OFDOM, out_signature='a{oa{sa{sv}}}')
|
|
def GetManagedObjects(self):
|
|
"""Returns the list of managed objects and their properties."""
|
|
results = {}
|
|
for device in self.devices:
|
|
results[device.path] = device.InterfacesAndProperties()
|
|
logging.info('GetManagedObjects: %s', ', '.join(results.keys()))
|
|
return results
|
|
|
|
@dbus.service.signal(mm1.OFDOM, signature='oa{sa{sv}}')
|
|
def InterfacesAdded(self, object_path, interfaces_and_properties):
|
|
pass
|
|
|
|
@dbus.service.signal(mm1.OFDOM, signature='oas')
|
|
def InterfacesRemoved(self, object_path, interfaces):
|
|
pass
|
|
|
|
|
|
def main():
|
|
usage = """
|
|
Run pseudo_modem to simulate a GSM modem using the modemmanager-next
|
|
DBUS interfaces. This can be used for the following:
|
|
- to simpilify the verification process of UI features that use of
|
|
overseas SIM cards
|
|
- to test shill on a virtual machine without a physical modem
|
|
- to test that Chrome property displays SMS messages
|
|
|
|
To use on a test image you use test_that to run
|
|
network_3GModemControl which will cause pseudo_modem.py to be
|
|
installed in /usr/local/autotests/cros/cellular. Then stop
|
|
modemmanager and start the pseudo modem with the commands:
|
|
|
|
stop modemmanager
|
|
/usr/local/autotest/cros/cellular/pseudo_modem.py
|
|
|
|
When done, use Control-C to stop the process and restart modem manager:
|
|
start modemmanager
|
|
|
|
Additional help documentation is available by invoking pseudo_modem.py
|
|
--help.
|
|
|
|
SMS testing can be accomnplished by supplying the -s flag to simulate
|
|
the receipt of a number of SMS messages. The message text can be
|
|
specified with the --text option on the command line, or read from a
|
|
file by using the --file option. If the messages are located in a
|
|
file, then each line corresponds to a single SMS message.
|
|
|
|
Chrome should display the SMS messages as soon as a user logs in to
|
|
the Chromebook, or if the user is already logged in, then shortly
|
|
after the pseudo modem is recognized by shill.
|
|
"""
|
|
parser = OptionParser(usage=usage)
|
|
parser.add_option('-c', '--carrier', dest='carrier_name',
|
|
metavar='<carrier name>',
|
|
help='<carrier name> := %s' % ' | '.join(
|
|
SIM.CARRIERS.keys()))
|
|
parser.add_option('-s', '--smscount', dest='sms_count',
|
|
default=0,
|
|
metavar='<smscount>',
|
|
help='<smscount> := integer')
|
|
parser.add_option('-l', '--logfile', dest='logfile',
|
|
default='',
|
|
metavar='<filename>',
|
|
help='<filename> := filename for logging output')
|
|
parser.add_option('-t', '--text', dest='sms_text',
|
|
default=None,
|
|
metavar='<text>',
|
|
help='<text> := text for sms messages')
|
|
parser.add_option('-f', '--file', dest='filename',
|
|
default=None,
|
|
metavar='<filename>',
|
|
help='<filename> := file with text for sms messages')
|
|
|
|
(options, args) = parser.parse_args()
|
|
|
|
kwargs = {}
|
|
if options.logfile:
|
|
kwargs['filename'] = options.logfile
|
|
logging.basicConfig(format='%(asctime)-15s %(message)s',
|
|
level=logging.DEBUG,
|
|
**kwargs)
|
|
|
|
if not options.carrier_name:
|
|
options.carrier_name = DEFAULT_CARRIER
|
|
|
|
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
|
|
bus = dbus.SystemBus()
|
|
name = dbus.service.BusName(mm1.MODEM_MANAGER_INTERFACE, bus)
|
|
manager = ModemManager(bus, mm1.OMM)
|
|
sim_card = SIM.FromCarrier(string.lower(options.carrier_name),
|
|
manager)
|
|
with GSMModem(manager, sim=sim_card) as modem:
|
|
if options.filename:
|
|
f = open(options.filename, 'r')
|
|
for index, line in enumerate(f.readlines()):
|
|
line = line.strip()
|
|
if line:
|
|
sms = SMS(manager, name='/SMS/%s' % index, text=line)
|
|
modem.AddSMS(sms)
|
|
else:
|
|
for index in xrange(int(options.sms_count)):
|
|
sms = SMS(manager, name='/SMS/%s' % index,
|
|
text=options.sms_text)
|
|
modem.AddSMS(sms)
|
|
|
|
mainloop = gobject.MainLoop()
|
|
|
|
def SignalHandler(signum, frame):
|
|
logging.info('Signal handler called with signal: %s', signum)
|
|
mainloop.quit()
|
|
|
|
signal.signal(signal.SIGTERM, SignalHandler)
|
|
|
|
mainloop.run()
|
|
|
|
if __name__ == '__main__':
|
|
main()
|